source: fedd/federation/experiment_control.py @ 80b1e82

compt_changesinfo-ops
Last change on this file since 80b1e82 was 80b1e82, checked in by Ted Faber <faber@…>, 12 years ago

Info stuff all works. SEER will need to add a legacy parameter to info.

  • Property mode set to 100644
File size: 83.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info
39
40import topdl
41import list_log
42from ip_allocator import ip_allocator
43from ip_addr import ip_addr
44
45
46class nullHandler(logging.Handler):
47    def emit(self, record): pass
48
49fl = logging.getLogger("fedd.experiment_control")
50fl.addHandler(nullHandler())
51
52
53# Right now, no support for composition.
54class federated_service:
55    def __init__(self, name, exporter=None, importers=None, params=None, 
56            reqs=None, portal=None):
57        self.name=name
58        self.exporter=exporter
59        if importers is None: self.importers = []
60        else: self.importers=importers
61        if params is None: self.params = { }
62        else: self.params = params
63        if reqs is None: self.reqs = []
64        else: self.reqs = reqs
65
66        if portal is not None:
67            self.portal = portal
68        else:
69            self.portal = (name in federated_service.needs_portal)
70
71    def __str__(self):
72        return "name %s export %s import %s params %s reqs %s" % \
73                (self.name, self.exporter, self.importers, self.params,
74                        [ (r['name'], r['visibility']) for r in self.reqs] )
75
76    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
77
78class experiment_control_local(experiment_control_legacy):
79    """
80    Control of experiments that this system can directly access.
81
82    Includes experiment creation, termination and information dissemination.
83    Thred safe.
84    """
85
86    class ssh_cmd_timeout(RuntimeError): pass
87   
88    call_RequestAccess = service_caller('RequestAccess')
89    call_ReleaseAccess = service_caller('ReleaseAccess')
90    call_StartSegment = service_caller('StartSegment')
91    call_TerminateSegment = service_caller('TerminateSegment')
92    call_Ns2Topdl = service_caller('Ns2Topdl')
93
94    def __init__(self, config=None, auth=None):
95        """
96        Intialize the various attributes, most from the config object
97        """
98
99        def parse_tarfile_list(tf):
100            """
101            Parse a tarfile list from the configuration.  This is a set of
102            paths and tarfiles separated by spaces.
103            """
104            rv = [ ]
105            if tf is not None:
106                tl = tf.split()
107                while len(tl) > 1:
108                    p, t = tl[0:2]
109                    del tl[0:2]
110                    rv.append((p, t))
111            return rv
112
113        self.list_log = list_log.list_log
114
115        self.cert_file = config.get("experiment_control", "cert_file")
116        if self.cert_file:
117            self.cert_pwd = config.get("experiment_control", "cert_pwd")
118        else:
119            self.cert_file = config.get("globals", "cert_file")
120            self.cert_pwd = config.get("globals", "cert_pwd")
121
122        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
123                or config.get("globals", "trusted_certs")
124
125        self.repodir = config.get("experiment_control", "repodir")
126        self.repo_url = config.get("experiment_control", "repo_url", 
127                "https://users.isi.deterlab.net:23235");
128
129        self.exp_stem = "fed-stem"
130        self.log = logging.getLogger("fedd.experiment_control")
131        set_log_level(config, "experiment_control", self.log)
132        self.muxmax = 2
133        self.nthreads = 10
134        self.randomize_experiments = False
135
136        self.splitter = None
137        self.ssh_keygen = "/usr/bin/ssh-keygen"
138        self.ssh_identity_file = None
139
140
141        self.debug = config.getboolean("experiment_control", "create_debug")
142        self.cleanup = not config.getboolean("experiment_control", 
143                "leave_tmpfiles")
144        self.state_filename = config.get("experiment_control", 
145                "experiment_state")
146        self.store_filename = config.get("experiment_control", 
147                "synch_store")
148        self.store_url = config.get("experiment_control", "store_url")
149        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
150        self.fedkit = parse_tarfile_list(\
151                config.get("experiment_control", "fedkit"))
152        self.gatewaykit = parse_tarfile_list(\
153                config.get("experiment_control", "gatewaykit"))
154        accessdb_file = config.get("experiment_control", "accessdb")
155
156        dt = config.get("experiment_control", "direct_transit")
157        self.auth_type = config.get('experiment_control', 'auth_type') \
158                or 'legacy'
159        self.auth_dir = config.get('experiment_control', 'auth_dir')
160        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
161        else: self.direct_transit = [ ]
162        # NB for internal master/slave ops, not experiment setup
163        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
164
165        self.overrides = set([])
166        ovr = config.get('experiment_control', 'overrides')
167        if ovr:
168            for o in ovr.split(","):
169                o = o.strip()
170                if o.startswith('fedid:'): o = o[len('fedid:'):]
171                self.overrides.add(fedid(hexstr=o))
172
173        self.state = { }
174        self.state_lock = Lock()
175        self.tclsh = "/usr/local/bin/otclsh"
176        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
177                config.get("experiment_control", "tcl_splitter",
178                        "/usr/testbed/lib/ns2ir/parse.tcl")
179        mapdb_file = config.get("experiment_control", "mapdb")
180        self.trace_file = sys.stderr
181
182        self.def_expstart = \
183                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
184                "/tmp/federate";
185        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
186                "FEDDIR/hosts";
187        self.def_gwstart = \
188                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
189                "/tmp/bridge.log";
190        self.def_mgwstart = \
191                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
192                "/tmp/bridge.log";
193        self.def_gwimage = "FBSD61-TUNNEL2";
194        self.def_gwtype = "pc";
195        self.local_access = { }
196
197        if self.auth_type == 'legacy':
198            if auth:
199                self.auth = auth
200            else:
201                self.log.error( "[access]: No authorizer initialized, " +\
202                        "creating local one.")
203                auth = authorizer()
204            self.get_access = self.legacy_get_access
205        elif self.auth_type == 'abac':
206            self.auth = abac_authorizer(load=self.auth_dir)
207        else:
208            raise service_error(service_error.internal, 
209                    "Unknown auth_type: %s" % self.auth_type)
210
211        if mapdb_file:
212            self.read_mapdb(mapdb_file)
213        else:
214            self.log.warn("[experiment_control] No testbed map, using defaults")
215            self.tbmap = { 
216                    'deter':'https://users.isi.deterlab.net:23235',
217                    'emulab':'https://users.isi.deterlab.net:23236',
218                    'ucb':'https://users.isi.deterlab.net:23237',
219                    }
220
221        if accessdb_file:
222                self.read_accessdb(accessdb_file)
223        else:
224            raise service_error(service_error.internal,
225                    "No accessdb specified in config")
226
227        # Grab saved state.  OK to do this w/o locking because it's read only
228        # and only one thread should be in existence that can see self.state at
229        # this point.
230        if self.state_filename:
231            self.read_state()
232
233        if self.store_filename:
234            self.read_store()
235        else:
236            self.log.warning("No saved synch store")
237            self.synch_store = synch_store
238
239        # Dispatch tables
240        self.soap_services = {\
241                'New': soap_handler('New', self.new_experiment),
242                'Create': soap_handler('Create', self.create_experiment),
243                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
244                'Vis': soap_handler('Vis', self.get_vis),
245                'Info': soap_handler('Info', self.get_info),
246                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
247                'Terminate': soap_handler('Terminate', 
248                    self.terminate_experiment),
249                'GetValue': soap_handler('GetValue', self.GetValue),
250                'SetValue': soap_handler('SetValue', self.SetValue),
251        }
252
253        self.xmlrpc_services = {\
254                'New': xmlrpc_handler('New', self.new_experiment),
255                'Create': xmlrpc_handler('Create', self.create_experiment),
256                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
257                'Vis': xmlrpc_handler('Vis', self.get_vis),
258                'Info': xmlrpc_handler('Info', self.get_info),
259                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
260                'Terminate': xmlrpc_handler('Terminate',
261                    self.terminate_experiment),
262                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
263                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
264        }
265
266    # Call while holding self.state_lock
267    def write_state(self):
268        """
269        Write a new copy of experiment state after copying the existing state
270        to a backup.
271
272        State format is a simple pickling of the state dictionary.
273        """
274        if os.access(self.state_filename, os.W_OK):
275            copy_file(self.state_filename, \
276                    "%s.bak" % self.state_filename)
277        try:
278            f = open(self.state_filename, 'w')
279            pickle.dump(self.state, f)
280        except EnvironmentError, e:
281            self.log.error("Can't write file %s: %s" % \
282                    (self.state_filename, e))
283        except pickle.PicklingError, e:
284            self.log.error("Pickling problem: %s" % e)
285        except TypeError, e:
286            self.log.error("Pickling problem (TypeError): %s" % e)
287
288    @staticmethod
289    def get_alloc_ids(exp):
290        """
291        Used by read_store and read state.  This used to be worse.
292        """
293
294        return [ a.allocID for a in exp.get_all_allocations() ]
295
296
297    # Call while holding self.state_lock
298    def read_state(self):
299        """
300        Read a new copy of experiment state.  Old state is overwritten.
301
302        State format is a simple pickling of the state dictionary.
303        """
304       
305        try:
306            f = open(self.state_filename, "r")
307            self.state = pickle.load(f)
308            self.log.debug("[read_state]: Read state from %s" % \
309                    self.state_filename)
310        except EnvironmentError, e:
311            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
312                    % (self.state_filename, e))
313        except pickle.UnpicklingError, e:
314            self.log.warning(("[read_state]: No saved state: " + \
315                    "Unpickling failed: %s") % e)
316       
317        for s in self.state.values():
318            try:
319
320                eid = s.fedid
321                if eid : 
322                    if self.auth_type == 'legacy':
323                        # XXX: legacy
324                        # Give the owner rights to the experiment
325                        #self.auth.set_attribute(s['owner'], eid)
326                        # And holders of the eid as well
327                        self.auth.set_attribute(eid, eid)
328                        # allow overrides to control experiments as well
329                        for o in self.overrides:
330                            self.auth.set_attribute(o, eid)
331                        # Set permissions to allow reading of the software
332                        # repo, if any, as well.
333                        for a in self.get_alloc_ids(s):
334                            self.auth.set_attribute(a, 'repo/%s' % eid)
335                else:
336                    raise KeyError("No experiment id")
337            except KeyError, e:
338                self.log.warning("[read_state]: State ownership or identity " +\
339                        "misformatted in %s: %s" % (self.state_filename, e))
340
341
342    def read_accessdb(self, accessdb_file):
343        """
344        Read the mapping from fedids that can create experiments to their name
345        in the 3-level access namespace.  All will be asserted from this
346        testbed and can include the local username and porject that will be
347        asserted on their behalf by this fedd.  Each fedid is also added to the
348        authorization system with the "create" attribute.
349        """
350        self.accessdb = {}
351        # These are the regexps for parsing the db
352        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
353        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
354                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
355        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
356                "\s*->\s*(" + name_expr + ")\s*$")
357        lineno = 0
358
359        # Parse the mappings and store in self.authdb, a dict of
360        # fedid -> (proj, user)
361        try:
362            f = open(accessdb_file, "r")
363            for line in f:
364                lineno += 1
365                line = line.strip()
366                if len(line) == 0 or line.startswith('#'):
367                    continue
368                m = project_line.match(line)
369                if m:
370                    fid = fedid(hexstr=m.group(1))
371                    project, user = m.group(2,3)
372                    if not self.accessdb.has_key(fid):
373                        self.accessdb[fid] = []
374                    self.accessdb[fid].append((project, user))
375                    continue
376
377                m = user_line.match(line)
378                if m:
379                    fid = fedid(hexstr=m.group(1))
380                    project = None
381                    user = m.group(2)
382                    if not self.accessdb.has_key(fid):
383                        self.accessdb[fid] = []
384                    self.accessdb[fid].append((project, user))
385                    continue
386                self.log.warn("[experiment_control] Error parsing access " +\
387                        "db %s at line %d" %  (accessdb_file, lineno))
388        except EnvironmentError:
389            raise service_error(service_error.internal,
390                    ("Error opening/reading %s as experiment " +\
391                            "control accessdb") %  accessdb_file)
392        f.close()
393
394        # Initialize the authorization attributes
395        # XXX: legacy
396        if self.auth_type == 'legacy':
397            for fid in self.accessdb.keys():
398                self.auth.set_attribute(fid, 'create')
399                self.auth.set_attribute(fid, 'new')
400
401    def read_mapdb(self, file):
402        """
403        Read a simple colon separated list of mappings for the
404        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
405        """
406
407        self.tbmap = { }
408        lineno =0
409        try:
410            f = open(file, "r")
411            for line in f:
412                lineno += 1
413                line = line.strip()
414                if line.startswith('#') or len(line) == 0:
415                    continue
416                try:
417                    label, url = line.split(':', 1)
418                    self.tbmap[label] = url
419                except ValueError, e:
420                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
421                            "map db: %s %s" % (lineno, line, e))
422        except EnvironmentError, e:
423            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
424                    "open %s: %s" % (file, e))
425        else:
426            f.close()
427
428    def read_store(self):
429        try:
430            self.synch_store = synch_store()
431            self.synch_store.load(self.store_filename)
432            self.log.debug("[read_store]: Read store from %s" % \
433                    self.store_filename)
434        except EnvironmentError, e:
435            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
436                    % (self.state_filename, e))
437            self.synch_store = synch_store()
438
439        # Set the initial permissions on data in the store.  XXX: This ad hoc
440        # authorization attribute initialization is getting out of hand.
441        # XXX: legacy
442        if self.auth_type == 'legacy':
443            for k in self.synch_store.all_keys():
444                try:
445                    if k.startswith('fedid:'):
446                        fid = fedid(hexstr=k[6:46])
447                        if self.state.has_key(fid):
448                            for a in self.get_alloc_ids(self.state[fid]):
449                                self.auth.set_attribute(a, k)
450                except ValueError, e:
451                    self.log.warn("Cannot deduce permissions for %s" % k)
452
453
454    def write_store(self):
455        """
456        Write a new copy of synch_store after writing current state
457        to a backup.  We use the internal synch_store pickle method to avoid
458        incinsistent data.
459
460        State format is a simple pickling of the store.
461        """
462        if os.access(self.store_filename, os.W_OK):
463            copy_file(self.store_filename, \
464                    "%s.bak" % self.store_filename)
465        try:
466            self.synch_store.save(self.store_filename)
467        except EnvironmentError, e:
468            self.log.error("Can't write file %s: %s" % \
469                    (self.store_filename, e))
470        except TypeError, e:
471            self.log.error("Pickling problem (TypeError): %s" % e)
472
473
474    def remove_dirs(self, dir):
475        """
476        Remove the directory tree and all files rooted at dir.  Log any errors,
477        but continue.
478        """
479        self.log.debug("[removedirs]: removing %s" % dir)
480        try:
481            for path, dirs, files in os.walk(dir, topdown=False):
482                for f in files:
483                    os.remove(os.path.join(path, f))
484                for d in dirs:
485                    os.rmdir(os.path.join(path, d))
486            os.rmdir(dir)
487        except EnvironmentError, e:
488            self.log.error("Error deleting directory tree in %s" % e);
489
490    @staticmethod
491    def make_temp_certfile(expcert, tmpdir):
492        """
493        make a protected copy of the access certificate so the experiment
494        controller can act as the experiment principal.  mkstemp is the most
495        secure way to do that. The directory should be created by
496        mkdtemp.  Return the filename.
497        """
498        if expcert and tmpdir:
499            try:
500                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
501                f = os.fdopen(certf, 'w')
502                print >> f, expcert
503                f.close()
504            except EnvironmentError, e:
505                raise service_error(service_error.internal, 
506                        "Cannot create temp cert file?")
507            return certfn
508        else:
509            return None
510
511       
512    def generate_ssh_keys(self, dest, type="rsa" ):
513        """
514        Generate a set of keys for the gateways to use to talk.
515
516        Keys are of type type and are stored in the required dest file.
517        """
518        valid_types = ("rsa", "dsa")
519        t = type.lower();
520        if t not in valid_types: raise ValueError
521        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
522
523        try:
524            trace = open("/dev/null", "w")
525        except EnvironmentError:
526            raise service_error(service_error.internal,
527                    "Cannot open /dev/null??");
528
529        # May raise CalledProcessError
530        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
531        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
532        if rv != 0:
533            raise service_error(service_error.internal, 
534                    "Cannot generate nonce ssh keys.  %s return code %d" \
535                            % (self.ssh_keygen, rv))
536
537    def generate_seer_certs(self, destdir):
538        '''
539        Create a SEER ca cert and a node cert in destdir/ca.pem and
540        destdir/node.pem respectively.  These will be distributed throughout
541        the federated experiment.  This routine reports errors via
542        service_errors.
543        '''
544        openssl = '/usr/bin/openssl'
545        # All the filenames and parameters we need for openssl calls below
546        ca_key =os.path.join(destdir, 'ca.key') 
547        ca_pem = os.path.join(destdir, 'ca.pem')
548        node_key =os.path.join(destdir, 'node.key') 
549        node_pem = os.path.join(destdir, 'node.pem')
550        node_req = os.path.join(destdir, 'node.req')
551        node_signed = os.path.join(destdir, 'node.signed')
552        days = '%s' % (365 * 10)
553        serial = '%s' % random.randint(0, 1<<16)
554
555        try:
556            # Sequence of calls to create a CA key, create a ca cert, create a
557            # node key, node signing request, and finally a signed node
558            # certificate.
559            sequence = (
560                    (openssl, 'genrsa', '-out', ca_key, '1024'),
561                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
562                        ca_pem, '-days', days, '-subj', 
563                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
564                    (openssl, 'genrsa', '-out', node_key, '1024'),
565                    (openssl, 'req', '-new', '-key', node_key, '-out', 
566                        node_req, '-days', days, '-subj', 
567                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
568                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
569                        '-set_serial', serial, '-req', '-in', node_req, 
570                        '-out', node_signed, '-days', days),
571                )
572            # Do all that stuff; bail if there's an error, and push all the
573            # output to dev/null.
574            for cmd in sequence:
575                trace = open("/dev/null", "w")
576                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
577                if rv != 0:
578                    raise service_error(service_error.internal, 
579                            "Cannot generate SEER certs.  %s return code %d" \
580                                    % (' '.join(cmd), rv))
581            # Concatinate the node key and signed certificate into node.pem
582            f = open(node_pem, 'w')
583            for comp in (node_signed, node_key):
584                g = open(comp, 'r')
585                f.write(g.read())
586                g.close()
587            f.close()
588
589            # Throw out intermediaries.
590            for fn in (ca_key, node_key, node_req, node_signed):
591                os.unlink(fn)
592
593        except EnvironmentError, e:
594            # Any difficulties with the file system wind up here
595            raise service_error(service_error.internal,
596                    "File error on  %s while creating SEER certs: %s" % \
597                            (e.filename, e.strerror))
598
599
600
601    def gentopo(self, str):
602        """
603        Generate the topology data structure from the splitter's XML
604        representation of it.
605
606        The topology XML looks like:
607            <experiment>
608                <nodes>
609                    <node><vname></vname><ips>ip1:ip2</ips></node>
610                </nodes>
611                <lans>
612                    <lan>
613                        <vname></vname><vnode></vnode><ip></ip>
614                        <bandwidth></bandwidth><member>node:port</member>
615                    </lan>
616                </lans>
617        """
618        class topo_parse:
619            """
620            Parse the topology XML and create the dats structure.
621            """
622            def __init__(self):
623                # Typing of the subelements for data conversion
624                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
625                self.int_subelements = ( 'bandwidth',)
626                self.float_subelements = ( 'delay',)
627                # The final data structure
628                self.nodes = [ ]
629                self.lans =  [ ]
630                self.topo = { \
631                        'node': self.nodes,\
632                        'lan' : self.lans,\
633                    }
634                self.element = { }  # Current element being created
635                self.chars = ""     # Last text seen
636
637            def end_element(self, name):
638                # After each sub element the contents is added to the current
639                # element or to the appropriate list.
640                if name == 'node':
641                    self.nodes.append(self.element)
642                    self.element = { }
643                elif name == 'lan':
644                    self.lans.append(self.element)
645                    self.element = { }
646                elif name in self.str_subelements:
647                    self.element[name] = self.chars
648                    self.chars = ""
649                elif name in self.int_subelements:
650                    self.element[name] = int(self.chars)
651                    self.chars = ""
652                elif name in self.float_subelements:
653                    self.element[name] = float(self.chars)
654                    self.chars = ""
655
656            def found_chars(self, data):
657                self.chars += data.rstrip()
658
659
660        tp = topo_parse();
661        parser = xml.parsers.expat.ParserCreate()
662        parser.EndElementHandler = tp.end_element
663        parser.CharacterDataHandler = tp.found_chars
664
665        parser.Parse(str)
666
667        return tp.topo
668       
669
670    def genviz(self, topo):
671        """
672        Generate the visualization the virtual topology
673        """
674
675        neato = "/usr/local/bin/neato"
676        # These are used to parse neato output and to create the visualization
677        # file.
678        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
679        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
680                "%s</type></node>"
681
682        try:
683            # Node names
684            nodes = [ n['vname'] for n in topo['node'] ]
685            topo_lans = topo['lan']
686        except KeyError, e:
687            raise service_error(service_error.internal, "Bad topology: %s" %e)
688
689        lans = { }
690        links = { }
691
692        # Walk through the virtual topology, organizing the connections into
693        # 2-node connections (links) and more-than-2-node connections (lans).
694        # When a lan is created, it's added to the list of nodes (there's a
695        # node in the visualization for the lan).
696        for l in topo_lans:
697            if links.has_key(l['vname']):
698                if len(links[l['vname']]) < 2:
699                    links[l['vname']].append(l['vnode'])
700                else:
701                    nodes.append(l['vname'])
702                    lans[l['vname']] = links[l['vname']]
703                    del links[l['vname']]
704                    lans[l['vname']].append(l['vnode'])
705            elif lans.has_key(l['vname']):
706                lans[l['vname']].append(l['vnode'])
707            else:
708                links[l['vname']] = [ l['vnode'] ]
709
710
711        # Open up a temporary file for dot to turn into a visualization
712        try:
713            df, dotname = tempfile.mkstemp()
714            dotfile = os.fdopen(df, 'w')
715        except EnvironmentError:
716            raise service_error(service_error.internal,
717                    "Failed to open file in genviz")
718
719        try:
720            dnull = open('/dev/null', 'w')
721        except EnvironmentError:
722            service_error(service_error.internal,
723                    "Failed to open /dev/null in genviz")
724
725        # Generate a dot/neato input file from the links, nodes and lans
726        try:
727            print >>dotfile, "graph G {"
728            for n in nodes:
729                print >>dotfile, '\t"%s"' % n
730            for l in links.keys():
731                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
732            for l in lans.keys():
733                for n in lans[l]:
734                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
735            print >>dotfile, "}"
736            dotfile.close()
737        except TypeError:
738            raise service_error(service_error.internal,
739                    "Single endpoint link in vtopo")
740        except EnvironmentError:
741            raise service_error(service_error.internal, "Cannot write dot file")
742
743        # Use dot to create a visualization
744        try:
745            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
746                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
747                stderr=dnull, close_fds=True)
748        except EnvironmentError:
749            raise service_error(service_error.internal, 
750                    "Cannot generate visualization: is graphviz available?")
751        dnull.close()
752
753        # Translate dot to vis format
754        vis_nodes = [ ]
755        vis = { 'node': vis_nodes }
756        for line in dot.stdout:
757            m = vis_re.match(line)
758            if m:
759                vn = m.group(1)
760                vis_node = {'name': vn, \
761                        'x': float(m.group(2)),\
762                        'y' : float(m.group(3)),\
763                    }
764                if vn in links.keys() or vn in lans.keys():
765                    vis_node['type'] = 'lan'
766                else:
767                    vis_node['type'] = 'node'
768                vis_nodes.append(vis_node)
769        rv = dot.wait()
770
771        os.remove(dotname)
772        # XXX: graphviz seems to use low return codes for warnings, like
773        # "couldn't find font"
774        if rv < 2 : return vis
775        else: return None
776
777
778    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
779            cert_pwd=None):
780        """
781        Release access to testbed through fedd
782        """
783
784        if not uri and tbmap:
785            uri = tbmap.get(tb, None)
786        if not uri:
787            raise service_error(service_error.server_config, 
788                    "Unknown testbed: %s" % tb)
789
790        if self.local_access.has_key(uri):
791            resp = self.local_access[uri].ReleaseAccess(\
792                    { 'ReleaseAccessRequestBody' : 
793                        {'allocID': {'fedid': aid}},}, 
794                    fedid(file=cert_file))
795            resp = { 'ReleaseAccessResponseBody': resp } 
796        else:
797            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
798                    cert_file, cert_pwd, self.trusted_certs)
799
800        # better error coding
801
802    def remote_ns2topdl(self, uri, desc):
803
804        req = {
805                'description' : { 'ns2description': desc },
806            }
807
808        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
809                self.trusted_certs)
810
811        if r.has_key('Ns2TopdlResponseBody'):
812            r = r['Ns2TopdlResponseBody']
813            ed = r.get('experimentdescription', None)
814            if ed.has_key('topdldescription'):
815                return topdl.Topology(**ed['topdldescription'])
816            else:
817                raise service_error(service_error.protocol, 
818                        "Bad splitter response (no output)")
819        else:
820            raise service_error(service_error.protocol, "Bad splitter response")
821
822    class start_segment:
823        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
824                cert_pwd=None, trusted_certs=None, caller=None,
825                log_collector=None):
826            self.log = log
827            self.debug = debug
828            self.cert_file = cert_file
829            self.cert_pwd = cert_pwd
830            self.trusted_certs = None
831            self.caller = caller
832            self.testbed = testbed
833            self.log_collector = log_collector
834            self.response = None
835            self.node = { }
836            self.proof = None
837
838        def make_map(self, resp):
839            if 'segmentdescription' not in resp  or \
840                    'topdldescription' not in resp['segmentdescription']:
841                self.log.warn('No topology returned from startsegment')
842                return 
843
844            top = topdl.Topology(
845                    **resp['segmentdescription']['topdldescription'])
846
847            for e in top.elements:
848                if isinstance(e, topdl.Computer):
849                    self.node[e.name] = \
850                            (e.localname, e.status, e.service, e.operation)
851
852        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
853            req = {
854                    'allocID': { 'fedid' : aid }, 
855                    'segmentdescription': { 
856                        'topdldescription': topo.to_dict(),
857                    },
858                }
859
860            if connInfo:
861                req['connection'] = connInfo
862
863            import_svcs = [ s for m in masters.values() \
864                    for s in m if self.testbed in s.importers]
865
866            if import_svcs or self.testbed in masters:
867                req['service'] = []
868
869            for s in import_svcs:
870                for r in s.reqs:
871                    sr = copy.deepcopy(r)
872                    sr['visibility'] = 'import';
873                    req['service'].append(sr)
874
875            for s in masters.get(self.testbed, []):
876                for r in s.reqs:
877                    sr = copy.deepcopy(r)
878                    sr['visibility'] = 'export';
879                    req['service'].append(sr)
880
881            if attrs:
882                req['fedAttr'] = attrs
883
884            try:
885                self.log.debug("Calling StartSegment at %s " % uri)
886                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
887                        self.trusted_certs)
888                if r.has_key('StartSegmentResponseBody'):
889                    lval = r['StartSegmentResponseBody'].get('allocationLog',
890                            None)
891                    if lval and self.log_collector:
892                        for line in  lval.splitlines(True):
893                            self.log_collector.write(line)
894                    self.make_map(r['StartSegmentResponseBody'])
895                    if 'proof' in r: self.proof = r['proof']
896                    self.response = r
897                else:
898                    raise service_error(service_error.internal, 
899                            "Bad response!?: %s" %r)
900                return True
901            except service_error, e:
902                self.log.error("Start segment failed on %s: %s" % \
903                        (self.testbed, e))
904                return False
905
906
907
908    class terminate_segment:
909        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
910                cert_pwd=None, trusted_certs=None, caller=None):
911            self.log = log
912            self.debug = debug
913            self.cert_file = cert_file
914            self.cert_pwd = cert_pwd
915            self.trusted_certs = None
916            self.caller = caller
917            self.testbed = testbed
918
919        def __call__(self, uri, aid ):
920            req = {
921                    'allocID': {'fedid': aid }, 
922                }
923            try:
924                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
925                        self.trusted_certs)
926                return True
927            except service_error, e:
928                self.log.error("Terminate segment failed on %s: %s" % \
929                        (self.testbed, e))
930                return False
931
932    def allocate_resources(self, allocated, masters, eid, expid, 
933            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
934            attrs=None, connInfo={}, tbmap=None, expcert=None):
935
936        started = { }           # Testbeds where a sub-experiment started
937                                # successfully
938
939        # XXX
940        fail_soft = False
941
942        if tbmap is None: tbmap = { }
943
944        log = alloc_log or self.log
945
946        tp = thread_pool(self.nthreads)
947        threads = [ ]
948        starters = [ ]
949
950        if expcert:
951            cert = expcert
952            pw = None
953        else:
954            cert = self.cert_file
955            pw = self.cert_pwd
956
957        for tb in allocated.keys():
958            # Create and start a thread to start the segment, and save it
959            # to get the return value later
960            tb_attrs = copy.copy(attrs)
961            tp.wait_for_slot()
962            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
963            base, suffix = split_testbed(tb)
964            if suffix:
965                tb_attrs.append({'attribute': 'experiment_name', 
966                    'value': "%s-%s" % (eid, suffix)})
967            else:
968                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
969            if not uri:
970                raise service_error(service_error.internal, 
971                        "Unknown testbed %s !?" % tb)
972
973            if tbparams[tb].has_key('allocID') and \
974                    tbparams[tb]['allocID'].has_key('fedid'):
975                aid = tbparams[tb]['allocID']['fedid']
976            else:
977                raise service_error(service_error.internal, 
978                        "No alloc id for testbed %s !?" % tb)
979
980            s = self.start_segment(log=log, debug=self.debug,
981                    testbed=tb, cert_file=cert,
982                    cert_pwd=pw, trusted_certs=self.trusted_certs,
983                    caller=self.call_StartSegment,
984                    log_collector=log_collector)
985            starters.append(s)
986            t  = pooled_thread(\
987                    target=s, name=tb,
988                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
989                    pdata=tp, trace_file=self.trace_file)
990            threads.append(t)
991            t.start()
992
993        # Wait until all finish (keep pinging the log, though)
994        mins = 0
995        revoked = False
996        while not tp.wait_for_all_done(60.0):
997            mins += 1
998            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
999                    % mins)
1000            if not revoked and \
1001                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1002                # a testbed has failed.  Revoke this experiment's
1003                # synchronizarion values so that sub experiments will not
1004                # deadlock waiting for synchronization that will never happen
1005                self.log.info("A subexperiment has failed to swap in, " + \
1006                        "revoking synch keys")
1007                var_key = "fedid:%s" % expid
1008                for k in self.synch_store.all_keys():
1009                    if len(k) > 45 and k[0:46] == var_key:
1010                        self.synch_store.revoke_key(k)
1011                revoked = True
1012
1013        failed = [ t.getName() for t in threads if not t.rv ]
1014        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1015
1016        # If one failed clean up, unless fail_soft is set
1017        if failed:
1018            if not fail_soft:
1019                tp.clear()
1020                for tb in succeeded:
1021                    # Create and start a thread to stop the segment
1022                    tp.wait_for_slot()
1023                    uri = tbparams[tb]['uri']
1024                    t  = pooled_thread(\
1025                            target=self.terminate_segment(log=log,
1026                                testbed=tb,
1027                                cert_file=cert, 
1028                                cert_pwd=pw,
1029                                trusted_certs=self.trusted_certs,
1030                                caller=self.call_TerminateSegment),
1031                            args=(uri, tbparams[tb]['federant']['allocID']),
1032                            name=tb,
1033                            pdata=tp, trace_file=self.trace_file)
1034                    t.start()
1035                # Wait until all finish (if any are being stopped)
1036                if succeeded:
1037                    tp.wait_for_all_done()
1038
1039                # release the allocations
1040                for tb in tbparams.keys():
1041                    try:
1042                        self.release_access(tb, tbparams[tb]['allocID'], 
1043                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1044                                cert_file=cert, cert_pwd=pw)
1045                    except service_error, e:
1046                        self.log.warn("Error releasing access: %s" % e.desc)
1047                # Remove the placeholder
1048                self.state_lock.acquire()
1049                self.state[eid].status = 'failed'
1050                if self.state_filename: self.write_state()
1051                self.state_lock.release()
1052                # Remove the repo dir
1053                self.remove_dirs("%s/%s" %(self.repodir, expid))
1054                # Walk up tmpdir, deleting as we go
1055                if self.cleanup:
1056                    self.remove_dirs(tmpdir)
1057                else:
1058                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1059
1060
1061                log.error("Swap in failed on %s" % ",".join(failed))
1062                return
1063        else:
1064            # Walk through the successes and gather the proofs
1065            proofs = { }
1066            for s in starters:
1067                if s.proof:
1068                    proofs[s.testbed] = s.proof
1069
1070            # Annotate the topology with embedding info
1071            for e in top.elements:
1072                if isinstance(e, topdl.Computer):
1073                    for s in starters:
1074                        ann = s.node.get(e.name, None)
1075                        if ann is not None:
1076                            e.localname.extend(ann[0])
1077                            e.status = ann[1]
1078                            e.service.extend(ann[2])
1079                            e.operation.extend(ann[3])
1080                            break
1081
1082            log.info("[start_segment]: Experiment %s active" % eid)
1083
1084
1085        # Walk up tmpdir, deleting as we go
1086        if self.cleanup:
1087            self.remove_dirs(tmpdir)
1088        else:
1089            log.debug("[start_experiment]: not removing %s" % tmpdir)
1090
1091        # Insert the experiment into our state and update the disk copy.
1092        self.state_lock.acquire()
1093        self.state[expid].status = 'active'
1094        self.state[eid] = self.state[expid]
1095        self.state[eid].top = top
1096        # Append startup proofs
1097        for f in self.state[eid].get_all_allocations():
1098            if f.tb in proofs:
1099                f.proof.append(proofs[f.tb])
1100
1101        if self.state_filename: self.write_state()
1102        self.state_lock.release()
1103        return
1104
1105
1106    def add_kit(self, e, kit):
1107        """
1108        Add a Software object created from the list of (install, location)
1109        tuples passed as kit  to the software attribute of an object e.  We
1110        do this enough to break out the code, but it's kind of a hack to
1111        avoid changing the old tuple rep.
1112        """
1113
1114        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1115
1116        if isinstance(e.software, list): e.software.extend(s)
1117        else: e.software = s
1118
1119    def append_experiment_authorization(self, expid, attrs, 
1120            need_state_lock=True):
1121        """
1122        Append the authorization information to system state
1123        """
1124
1125        for p, a in attrs:
1126            self.auth.set_attribute(p, a)
1127        self.auth.save()
1128
1129        if need_state_lock: self.state_lock.acquire()
1130        # XXX: really a no op?
1131        #self.state[expid]['auth'].update(attrs)
1132        if self.state_filename: self.write_state()
1133        if need_state_lock: self.state_lock.release()
1134
1135    def clear_experiment_authorization(self, expid, need_state_lock=True):
1136        """
1137        Attrs is a set of attribute principal pairs that need to be removed
1138        from the authenticator.  Remove them and save the authenticator.
1139        """
1140
1141        if need_state_lock: self.state_lock.acquire()
1142        # XXX: should be a no-op
1143        #if expid in self.state and 'auth' in self.state[expid]:
1144            #for p, a in self.state[expid]['auth']:
1145                #self.auth.unset_attribute(p, a)
1146            #self.state[expid]['auth'] = set()
1147        if self.state_filename: self.write_state()
1148        if need_state_lock: self.state_lock.release()
1149        self.auth.save()
1150
1151
1152    def create_experiment_state(self, fid, req, expid, expcert,
1153            state='starting'):
1154        """
1155        Create the initial entry in the experiment's state.  The expid and
1156        expcert are the experiment's fedid and certifacte that represents that
1157        ID, which are installed in the experiment state.  If the request
1158        includes a suggested local name that is used if possible.  If the local
1159        name is already taken by an experiment owned by this user that has
1160        failed, it is overwritten.  Otherwise new letters are added until a
1161        valid localname is found.  The generated local name is returned.
1162        """
1163
1164        if req.has_key('experimentID') and \
1165                req['experimentID'].has_key('localname'):
1166            overwrite = False
1167            eid = req['experimentID']['localname']
1168            # If there's an old failed experiment here with the same local name
1169            # and accessible by this user, we'll overwrite it, otherwise we'll
1170            # fall through and do the collision avoidance.
1171            old_expid = self.get_experiment_fedid(eid)
1172            if old_expid:
1173                users_experiment = True
1174                try:
1175                    self.check_experiment_access(fid, old_expid)
1176                except service_error, e:
1177                    if e.code == service_error.access: users_experiment = False
1178                    else: raise e
1179                if users_experiment:
1180                    self.state_lock.acquire()
1181                    status = self.state[eid].status
1182                    if status and status == 'failed':
1183                        # remove the old access attributes
1184                        self.clear_experiment_authorization(eid,
1185                                need_state_lock=False)
1186                        overwrite = True
1187                        del self.state[eid]
1188                        del self.state[old_expid]
1189                    self.state_lock.release()
1190                else:
1191                    self.log.info('Experiment %s exists, ' % eid + \
1192                            'but this user cannot access it')
1193            self.state_lock.acquire()
1194            while (self.state.has_key(eid) and not overwrite):
1195                eid += random.choice(string.ascii_letters)
1196            # Initial state
1197            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1198                    identity=expcert)
1199            self.state[expid] = self.state[eid]
1200            if self.state_filename: self.write_state()
1201            self.state_lock.release()
1202        else:
1203            eid = self.exp_stem
1204            for i in range(0,5):
1205                eid += random.choice(string.ascii_letters)
1206            self.state_lock.acquire()
1207            while (self.state.has_key(eid)):
1208                eid = self.exp_stem
1209                for i in range(0,5):
1210                    eid += random.choice(string.ascii_letters)
1211            # Initial state
1212            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1213                    identity=expcert)
1214            self.state[expid] = self.state[eid]
1215            if self.state_filename: self.write_state()
1216            self.state_lock.release()
1217
1218        # Let users touch the state.  Authorize this fid and the expid itself
1219        # to touch the experiment, as well as allowing th eoverrides.
1220        self.append_experiment_authorization(eid, 
1221                set([(fid, expid), (expid,expid)] + \
1222                        [ (o, expid) for o in self.overrides]))
1223
1224        return eid
1225
1226
1227    def allocate_ips_to_topo(self, top):
1228        """
1229        Add an ip4_address attribute to all the hosts in the topology, based on
1230        the shared substrates on which they sit.  An /etc/hosts file is also
1231        created and returned as a list of hostfiles entries.  We also return
1232        the allocator, because we may need to allocate IPs to portals
1233        (specifically DRAGON portals).
1234        """
1235        subs = sorted(top.substrates, 
1236                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1237                reverse=True)
1238        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1239        ifs = { }
1240        hosts = [ ]
1241
1242        for idx, s in enumerate(subs):
1243            net_size = len(s.interfaces)+2
1244
1245            a = ips.allocate(net_size)
1246            if a :
1247                base, num = a
1248                if num < net_size: 
1249                    raise service_error(service_error.internal,
1250                            "Allocator returned wrong number of IPs??")
1251            else:
1252                raise service_error(service_error.req, 
1253                        "Cannot allocate IP addresses")
1254            mask = ips.min_alloc
1255            while mask < net_size:
1256                mask *= 2
1257
1258            netmask = ((2**32-1) ^ (mask-1))
1259
1260            base += 1
1261            for i in s.interfaces:
1262                i.attribute.append(
1263                        topdl.Attribute('ip4_address', 
1264                            "%s" % ip_addr(base)))
1265                i.attribute.append(
1266                        topdl.Attribute('ip4_netmask', 
1267                            "%s" % ip_addr(int(netmask))))
1268
1269                hname = i.element.name
1270                if ifs.has_key(hname):
1271                    hosts.append("%s\t%s-%s %s-%d" % \
1272                            (ip_addr(base), hname, s.name, hname,
1273                                ifs[hname]))
1274                else:
1275                    ifs[hname] = 0
1276                    hosts.append("%s\t%s-%s %s-%d %s" % \
1277                            (ip_addr(base), hname, s.name, hname,
1278                                ifs[hname], hname))
1279
1280                ifs[hname] += 1
1281                base += 1
1282        return hosts, ips
1283
1284    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1285            tbparam, masters, tbmap, expid=None, expcert=None):
1286        for tb in testbeds:
1287            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1288                    expcert)
1289            allocated[tb] = 1
1290
1291    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1292            expcert=None):
1293        """
1294        Get access to testbed through fedd and set the parameters for that tb
1295        """
1296        def get_export_project(svcs):
1297            """
1298            Look through for the list of federated_service for this testbed
1299            objects for a project_export service, and extract the project
1300            parameter.
1301            """
1302
1303            pe = [s for s in svcs if s.name=='project_export']
1304            if len(pe) == 1:
1305                return pe[0].params.get('project', None)
1306            elif len(pe) == 0:
1307                return None
1308            else:
1309                raise service_error(service_error.req,
1310                        "More than one project export is not supported")
1311
1312        def add_services(svcs, type, slist, keys):
1313            """
1314            Add the given services to slist.  type is import or export.  Also
1315            add a mapping entry from the assigned id to the original service
1316            record.
1317            """
1318            for i, s in enumerate(svcs):
1319                idx = '%s%d' % (type, i)
1320                keys[idx] = s
1321                sr = {'id': idx, 'name': s.name, 'visibility': type }
1322                if s.params:
1323                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1324                            for k, v in s.params.items()]
1325                slist.append(sr)
1326
1327        uri = tbmap.get(testbed_base(tb), None)
1328        if not uri:
1329            raise service_error(service_error.server_config, 
1330                    "Unknown testbed: %s" % tb)
1331
1332        export_svcs = masters.get(tb,[])
1333        import_svcs = [ s for m in masters.values() \
1334                for s in m \
1335                    if tb in s.importers ]
1336
1337        export_project = get_export_project(export_svcs)
1338        # Compose the credential list so that IDs come before attributes
1339        creds = set()
1340        keys = set()
1341        certs = self.auth.get_creds_for_principal(fid)
1342        if expid:
1343            certs.update(self.auth.get_creds_for_principal(expid))
1344        for c in certs:
1345            keys.add(c.issuer_cert())
1346            creds.add(c.attribute_cert())
1347        creds = list(keys) + list(creds)
1348
1349        if expcert: cert, pw = expcert, None
1350        else: cert, pw = self.cert_file, self.cert_pw
1351
1352        # Request credentials
1353        req = {
1354                'abac_credential': creds,
1355            }
1356        # Make the service request from the services we're importing and
1357        # exporting.  Keep track of the export request ids so we can
1358        # collect the resulting info from the access response.
1359        e_keys = { }
1360        if import_svcs or export_svcs:
1361            slist = []
1362            add_services(import_svcs, 'import', slist, e_keys)
1363            add_services(export_svcs, 'export', slist, e_keys)
1364            req['service'] = slist
1365
1366        if self.local_access.has_key(uri):
1367            # Local access call
1368            req = { 'RequestAccessRequestBody' : req }
1369            r = self.local_access[uri].RequestAccess(req, 
1370                    fedid(file=self.cert_file))
1371            r = { 'RequestAccessResponseBody' : r }
1372        else:
1373            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1374
1375        if r.has_key('RequestAccessResponseBody'):
1376            # Through to here we have a valid response, not a fault.
1377            # Access denied is a fault, so something better or worse than
1378            # access denied has happened.
1379            r = r['RequestAccessResponseBody']
1380            self.log.debug("[get_access] Access granted")
1381        else:
1382            raise service_error(service_error.protocol,
1383                        "Bad proxy response")
1384        if 'proof' not in r:
1385            raise service_error(service_error.protocol,
1386                        "Bad access response (no access proof)")
1387       
1388        tbparam[tb] = { 
1389                "allocID" : r['allocID'],
1390                "uri": uri,
1391                "proof": [ r['proof'] ],
1392                }
1393
1394        # Collect the responses corresponding to the services this testbed
1395        # exports.  These will be the service requests that we will include in
1396        # the start segment requests (with appropriate visibility values) to
1397        # import and export the segments.
1398        for s in r.get('service', []):
1399            id = s.get('id', None)
1400            if id and id in e_keys:
1401                e_keys[id].reqs.append(s)
1402
1403        # Add attributes to parameter space.  We don't allow attributes to
1404        # overlay any parameters already installed.
1405        for a in r.get('fedAttr', []):
1406            try:
1407                if a['attribute'] and \
1408                        isinstance(a['attribute'], basestring)\
1409                        and not tbparam[tb].has_key(a['attribute'].lower()):
1410                    tbparam[tb][a['attribute'].lower()] = a['value']
1411            except KeyError:
1412                self.log.error("Bad attribute in response: %s" % a)
1413
1414
1415    def split_topology(self, top, topo, testbeds):
1416        """
1417        Create the sub-topologies that are needed for experiment instantiation.
1418        """
1419        for tb in testbeds:
1420            topo[tb] = top.clone()
1421            # copy in for loop allows deletions from the original
1422            for e in [ e for e in topo[tb].elements]:
1423                etb = e.get_attribute('testbed')
1424                # NB: elements without a testbed attribute won't appear in any
1425                # sub topologies. 
1426                if not etb or etb != tb:
1427                    for i in e.interface:
1428                        for s in i.subs:
1429                            try:
1430                                s.interfaces.remove(i)
1431                            except ValueError:
1432                                raise service_error(service_error.internal,
1433                                        "Can't remove interface??")
1434                    topo[tb].elements.remove(e)
1435            topo[tb].make_indices()
1436
1437    def confirm_software(self, top):
1438        """
1439        Make sure that the software to be loaded in the topo is all available
1440        before we begin making access requests, etc.  This is a subset of
1441        wrangle_software.
1442        """
1443        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1444        pkgs.update([x.location for e in top.elements for x in e.software])
1445
1446        for pkg in pkgs:
1447            loc = pkg
1448
1449            scheme, host, path = urlparse(loc)[0:3]
1450            dest = os.path.basename(path)
1451            if not scheme:
1452                if not loc.startswith('/'):
1453                    loc = "/%s" % loc
1454                loc = "file://%s" %loc
1455            # NB: if scheme was found, loc == pkg
1456            try:
1457                u = urlopen(loc)
1458                u.close()
1459            except Exception, e:
1460                raise service_error(service_error.req, 
1461                        "Cannot open %s: %s" % (loc, e))
1462        return True
1463
1464    def wrangle_software(self, expid, top, topo, tbparams):
1465        """
1466        Copy software out to the repository directory, allocate permissions and
1467        rewrite the segment topologies to look for the software in local
1468        places.
1469        """
1470
1471        # Copy the rpms and tarfiles to a distribution directory from
1472        # which the federants can retrieve them
1473        linkpath = "%s/software" %  expid
1474        softdir ="%s/%s" % ( self.repodir, linkpath)
1475        softmap = { }
1476
1477        # self.fedkit and self.gateway kit are lists of tuples of
1478        # (install_location, download_location) this extracts the download
1479        # locations.
1480        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1481        pkgs.update([x.location for e in top.elements for x in e.software])
1482        try:
1483            os.makedirs(softdir)
1484        except EnvironmentError, e:
1485            raise service_error(
1486                    "Cannot create software directory: %s" % e)
1487        # The actual copying.  Everything's converted into a url for copying.
1488        auth_attrs = set()
1489        for pkg in pkgs:
1490            loc = pkg
1491
1492            scheme, host, path = urlparse(loc)[0:3]
1493            dest = os.path.basename(path)
1494            if not scheme:
1495                if not loc.startswith('/'):
1496                    loc = "/%s" % loc
1497                loc = "file://%s" %loc
1498            # NB: if scheme was found, loc == pkg
1499            try:
1500                u = urlopen(loc)
1501            except Exception, e:
1502                raise service_error(service_error.req, 
1503                        "Cannot open %s: %s" % (loc, e))
1504            try:
1505                f = open("%s/%s" % (softdir, dest) , "w")
1506                self.log.debug("Writing %s/%s" % (softdir,dest) )
1507                data = u.read(4096)
1508                while data:
1509                    f.write(data)
1510                    data = u.read(4096)
1511                f.close()
1512                u.close()
1513            except Exception, e:
1514                raise service_error(service_error.internal,
1515                        "Could not copy %s: %s" % (loc, e))
1516            path = re.sub("/tmp", "", linkpath)
1517            # XXX
1518            softmap[pkg] = \
1519                    "%s/%s/%s" %\
1520                    ( self.repo_url, path, dest)
1521
1522            # Allow the individual segments to access the software by assigning
1523            # an attribute to each testbed allocation that encodes the data to
1524            # be released.  This expression collects the data for each run of
1525            # the loop.
1526            auth_attrs.update([
1527                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1528                        for tb in tbparams.keys()])
1529
1530        self.append_experiment_authorization(expid, auth_attrs)
1531
1532        # Convert the software locations in the segments into the local
1533        # copies on this host
1534        for soft in [ s for tb in topo.values() \
1535                for e in tb.elements \
1536                    if getattr(e, 'software', False) \
1537                        for s in e.software ]:
1538            if softmap.has_key(soft.location):
1539                soft.location = softmap[soft.location]
1540
1541
1542    def new_experiment(self, req, fid):
1543        """
1544        The external interface to empty initial experiment creation called from
1545        the dispatcher.
1546
1547        Creates a working directory, splits the incoming description using the
1548        splitter script and parses out the avrious subsections using the
1549        lcasses above.  Once each sub-experiment is created, use pooled threads
1550        to instantiate them and start it all up.
1551        """
1552        req = req.get('NewRequestBody', None)
1553        if not req:
1554            raise service_error(service_error.req,
1555                    "Bad request format (no NewRequestBody)")
1556
1557        if self.auth.import_credentials(data_list=req.get('credential', [])):
1558            self.auth.save()
1559
1560        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1561                with_proof=True)
1562
1563        if not access_ok:
1564            raise service_error(service_error.access, "New access denied",
1565                    proof=[proof])
1566
1567        try:
1568            tmpdir = tempfile.mkdtemp(prefix="split-")
1569        except EnvironmentError:
1570            raise service_error(service_error.internal, "Cannot create tmp dir")
1571
1572        try:
1573            access_user = self.accessdb[fid]
1574        except KeyError:
1575            raise service_error(service_error.internal,
1576                    "Access map and authorizer out of sync in " + \
1577                            "new_experiment for fedid %s"  % fid)
1578
1579        # Generate an ID for the experiment (slice) and a certificate that the
1580        # allocator can use to prove they own it.  We'll ship it back through
1581        # the encrypted connection.  If the requester supplied one, use it.
1582        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1583            expcert = req['experimentAccess']['X509']
1584            expid = fedid(certstr=expcert)
1585            self.state_lock.acquire()
1586            if expid in self.state:
1587                self.state_lock.release()
1588                raise service_error(service_error.req, 
1589                        'fedid %s identifies an existing experiment' % expid)
1590            self.state_lock.release()
1591        else:
1592            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1593
1594        #now we're done with the tmpdir, and it should be empty
1595        if self.cleanup:
1596            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1597            os.rmdir(tmpdir)
1598        else:
1599            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1600
1601        eid = self.create_experiment_state(fid, req, expid, expcert, 
1602                state='empty')
1603
1604        rv = {
1605                'experimentID': [
1606                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1607                ],
1608                'experimentStatus': 'empty',
1609                'experimentAccess': { 'X509' : expcert },
1610                'proof': proof.to_dict(),
1611            }
1612
1613        return rv
1614
1615    # create_experiment sub-functions
1616
1617    @staticmethod
1618    def get_experiment_key(req, field='experimentID'):
1619        """
1620        Parse the experiment identifiers out of the request (the request body
1621        tag has been removed).  Specifically this pulls either the fedid or the
1622        localname out of the experimentID field.  A fedid is preferred.  If
1623        neither is present or the request does not contain the fields,
1624        service_errors are raised.
1625        """
1626        # Get the experiment access
1627        exp = req.get(field, None)
1628        if exp:
1629            if exp.has_key('fedid'):
1630                key = exp['fedid']
1631            elif exp.has_key('localname'):
1632                key = exp['localname']
1633            else:
1634                raise service_error(service_error.req, "Unknown lookup type")
1635        else:
1636            raise service_error(service_error.req, "No request?")
1637
1638        return key
1639
1640    def get_experiment_ids_and_start(self, key, tmpdir):
1641        """
1642        Get the experiment name, id and access certificate from the state, and
1643        set the experiment state to 'starting'.  returns a triple (fedid,
1644        localname, access_cert_file). The access_cert_file is a copy of the
1645        contents of the access certificate, created in the tempdir with
1646        restricted permissions.  If things are confused, raise an exception.
1647        """
1648
1649        expid = eid = None
1650        self.state_lock.acquire()
1651        if key in self.state:
1652            exp = self.state[key]
1653            exp.status = "starting"
1654            expid = exp.fedid
1655            eid = exp.localname
1656            expcert = exp.identity
1657        self.state_lock.release()
1658
1659        # make a protected copy of the access certificate so the experiment
1660        # controller can act as the experiment principal.
1661        if expcert:
1662            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1663            if not expcert_file:
1664                raise service_error(service_error.internal, 
1665                        "Cannot create temp cert file?")
1666        else:
1667            expcert_file = None
1668
1669        return (eid, expid, expcert_file)
1670
1671    def get_topology(self, req, tmpdir):
1672        """
1673        Get the ns2 content and put it into a file for parsing.  Call the local
1674        or remote parser and return the topdl.Topology.  Errors result in
1675        exceptions.  req is the request and tmpdir is a work directory.
1676        """
1677
1678        # The tcl parser needs to read a file so put the content into that file
1679        descr=req.get('experimentdescription', None)
1680        if descr:
1681            if 'ns2description' in descr:
1682                file_content=descr['ns2description']
1683            elif 'topdldescription' in descr:
1684                return topdl.Topology(**descr['topdldescription'])
1685            else:
1686                raise service_error(service_error.req, 
1687                        'Unknown experiment description type')
1688        else:
1689            raise service_error(service_error.req, "No experiment description")
1690
1691
1692        if self.splitter_url:
1693            self.log.debug("Calling remote topdl translator at %s" % \
1694                    self.splitter_url)
1695            top = self.remote_ns2topdl(self.splitter_url, file_content)
1696        else:
1697            tclfile = os.path.join(tmpdir, "experiment.tcl")
1698            if file_content:
1699                try:
1700                    f = open(tclfile, 'w')
1701                    f.write(file_content)
1702                    f.close()
1703                except EnvironmentError:
1704                    raise service_error(service_error.internal,
1705                            "Cannot write temp experiment description")
1706            else:
1707                raise service_error(service_error.req, 
1708                        "Only ns2descriptions supported")
1709            pid = "dummy"
1710            gid = "dummy"
1711            eid = "dummy"
1712
1713            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1714                str(self.muxmax), '-m', 'dummy']
1715
1716            tclcmd.extend([pid, gid, eid, tclfile])
1717
1718            self.log.debug("running local splitter %s", " ".join(tclcmd))
1719            # This is just fantastic.  As a side effect the parser copies
1720            # tb_compat.tcl into the current directory, so that directory
1721            # must be writable by the fedd user.  Doing this in the
1722            # temporary subdir ensures this is the case.
1723            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1724                    cwd=tmpdir)
1725            split_data = tclparser.stdout
1726
1727            top = topdl.topology_from_xml(file=split_data, top="experiment")
1728            os.remove(tclfile)
1729
1730        return top
1731
1732    def get_testbed_services(self, req, testbeds):
1733        """
1734        Parse the services section of the request into into two dicts mapping
1735        testbed to lists of federated_service objects.  The first lists all
1736        exporters of services, and the second all exporters of services that
1737        need control portals in the experiment.
1738        """
1739        masters = { }
1740        pmasters = { }
1741        for s in req.get('service', []):
1742            # If this is a service request with the importall field
1743            # set, fill it out.
1744
1745            if s.get('importall', False):
1746                s['import'] = [ tb for tb in testbeds \
1747                        if tb not in s.get('export',[])]
1748                del s['importall']
1749
1750            # Add the service to masters
1751            for tb in s.get('export', []):
1752                if s.get('name', None):
1753
1754                    params = { }
1755                    for a in s.get('fedAttr', []):
1756                        params[a.get('attribute', '')] = a.get('value','')
1757
1758                    fser = federated_service(name=s['name'],
1759                            exporter=tb, importers=s.get('import',[]),
1760                            params=params)
1761                    if fser.name == 'hide_hosts' \
1762                            and 'hosts' not in fser.params:
1763                        fser.params['hosts'] = \
1764                                ",".join(tb_hosts.get(fser.exporter, []))
1765                    if tb in masters: masters[tb].append(fser)
1766                    else: masters[tb] = [fser]
1767
1768                    if fser.portal:
1769                        if tb not in pmasters: pmasters[tb] = [ fser ]
1770                        else: pmasters[tb].append(fser)
1771                else:
1772                    self.log.error('Testbed service does not have name " + \
1773                            "and importers')
1774        return masters, pmasters
1775
1776    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1777        """
1778        Create the ssh keys necessary for interconnecting the portal nodes and
1779        the global hosts file for letting each segment know about the IP
1780        addresses in play.  Save these into the repo.  Add attributes to the
1781        autorizer allowing access controllers to download them and return a set
1782        of attributes that inform the segments where to find this stuff.  May
1783        raise service_errors in if there are problems.
1784        """
1785        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1786        gw_secretkey_base = "fed.%s" % self.ssh_type
1787        keydir = os.path.join(tmpdir, 'keys')
1788        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1789        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1790
1791        try:
1792            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1793        except ValueError:
1794            raise service_error(service_error.server_config, 
1795                    "Bad key type (%s)" % self.ssh_type)
1796
1797        self.generate_seer_certs(keydir)
1798
1799        # Copy configuration files into the remote file store
1800        # The config urlpath
1801        configpath = "/%s/config" % expid
1802        # The config file system location
1803        configdir ="%s%s" % ( self.repodir, configpath)
1804        try:
1805            os.makedirs(configdir)
1806        except EnvironmentError, e:
1807            raise service_error(service_error.internal,
1808                    "Cannot create config directory: %s" % e)
1809        try:
1810            f = open("%s/hosts" % configdir, "w")
1811            print >> f, string.join(hosts, '\n')
1812            f.close()
1813        except EnvironmentError, e:
1814            raise service_error(service_error.internal, 
1815                    "Cannot write hosts file: %s" % e)
1816        try:
1817            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1818            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1819            copy_file(os.path.join(keydir, 'ca.pem'), 
1820                    os.path.join(configdir, 'ca.pem'))
1821            copy_file(os.path.join(keydir, 'node.pem'), 
1822                    os.path.join(configdir, 'node.pem'))
1823        except EnvironmentError, e:
1824            raise service_error(service_error.internal, 
1825                    "Cannot copy keyfiles: %s" % e)
1826
1827        # Allow the individual testbeds to access the configuration files,
1828        # again by setting an attribute for the relevant pathnames on each
1829        # allocation principal.  Yeah, that's a long list comprehension.
1830        self.append_experiment_authorization(expid, set([
1831            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1832                    for tb in tbparams.keys() \
1833                        for f in ("hosts", 'ca.pem', 'node.pem', 
1834                            gw_secretkey_base, gw_pubkey_base)]))
1835
1836        attrs = [ 
1837                {
1838                    'attribute': 'ssh_pubkey', 
1839                    'value': '%s/%s/config/%s' % \
1840                            (self.repo_url, expid, gw_pubkey_base)
1841                },
1842                {
1843                    'attribute': 'ssh_secretkey', 
1844                    'value': '%s/%s/config/%s' % \
1845                            (self.repo_url, expid, gw_secretkey_base)
1846                },
1847                {
1848                    'attribute': 'hosts', 
1849                    'value': '%s/%s/config/hosts' % \
1850                            (self.repo_url, expid)
1851                },
1852                {
1853                    'attribute': 'seer_ca_pem', 
1854                    'value': '%s/%s/config/%s' % \
1855                            (self.repo_url, expid, 'ca.pem')
1856                },
1857                {
1858                    'attribute': 'seer_node_pem', 
1859                    'value': '%s/%s/config/%s' % \
1860                            (self.repo_url, expid, 'node.pem')
1861                },
1862            ]
1863        return attrs
1864
1865
1866    def get_vtopo(self, req, fid):
1867        """
1868        Return the stored virtual topology for this experiment
1869        """
1870        rv = None
1871        state = None
1872
1873        req = req.get('VtopoRequestBody', None)
1874        if not req:
1875            raise service_error(service_error.req,
1876                    "Bad request format (no VtopoRequestBody)")
1877        exp = req.get('experiment', None)
1878        if exp:
1879            if exp.has_key('fedid'):
1880                key = exp['fedid']
1881                keytype = "fedid"
1882            elif exp.has_key('localname'):
1883                key = exp['localname']
1884                keytype = "localname"
1885            else:
1886                raise service_error(service_error.req, "Unknown lookup type")
1887        else:
1888            raise service_error(service_error.req, "No request?")
1889
1890        proof = self.check_experiment_access(fid, key)
1891
1892        self.state_lock.acquire()
1893        # XXX: this needs to be recalculated
1894        if key in self.state:
1895            if self.state[key].top is not None:
1896                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1897                rv = { 'experiment' : {keytype: key },
1898                        'vtopo': vtopo,
1899                        'proof': proof.to_dict(), 
1900                    }
1901            else:
1902                state = self.state[key].status
1903        self.state_lock.release()
1904
1905        if rv: return rv
1906        else: 
1907            if state:
1908                raise service_error(service_error.partial, 
1909                        "Not ready: %s" % state)
1910            else:
1911                raise service_error(service_error.req, "No such experiment")
1912
1913    def get_vis(self, req, fid):
1914        """
1915        Return the stored visualization for this experiment
1916        """
1917        rv = None
1918        state = None
1919
1920        req = req.get('VisRequestBody', None)
1921        if not req:
1922            raise service_error(service_error.req,
1923                    "Bad request format (no VisRequestBody)")
1924        exp = req.get('experiment', None)
1925        if exp:
1926            if exp.has_key('fedid'):
1927                key = exp['fedid']
1928                keytype = "fedid"
1929            elif exp.has_key('localname'):
1930                key = exp['localname']
1931                keytype = "localname"
1932            else:
1933                raise service_error(service_error.req, "Unknown lookup type")
1934        else:
1935            raise service_error(service_error.req, "No request?")
1936
1937        proof = self.check_experiment_access(fid, key)
1938
1939        self.state_lock.acquire()
1940        # Generate the visualization
1941        if key in self.state:
1942            if self.state[key].top is not None:
1943                try:
1944                    vis = self.genviz(
1945                            topdl.topology_to_vtopo(self.state[key].topo))
1946                except service_error, e:
1947                    self.state_lock.release()
1948                    raise e
1949                rv =  { 'experiment' : {keytype: key },
1950                        'vis': vis,
1951                        'proof': proof.to_dict(), 
1952                        }
1953            else:
1954                state = self.state[key].status
1955        self.state_lock.release()
1956
1957        if rv: return rv
1958        else:
1959            if state:
1960                raise service_error(service_error.partial, 
1961                        "Not ready: %s" % state)
1962            else:
1963                raise service_error(service_error.req, "No such experiment")
1964
1965   
1966    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1967            top):
1968        """
1969        Store the various data that have changed in the experiment state
1970        between when it was started and the beginning of resource allocation.
1971        This is basically the information about each local allocation.  This
1972        fills in the values of the placeholder allocation in the state.  It
1973        also collects the access proofs and returns them as dicts for a
1974        response message.
1975        """
1976        self.state_lock.acquire()
1977        # XXX:
1978        #self.state[eid]['vtopo'] = vtopo
1979        #self.state[eid]['vis'] = vis
1980        exp = self.state[eid]
1981        exp.topology = top
1982        # save federant information
1983        for k in allocated.keys():
1984            exp.add_allocation(
1985                    allocation_info(
1986                        tb=k, 
1987                        allocID=tbparams[k]['allocID'].get('fedid', None),
1988                        uri=tbparams[k]['uri'],
1989                        proof=tbparams[k]['proof'])
1990                    )
1991
1992        # Access proofs for the response message
1993        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
1994                    for p in tbparams[k]['proof']]
1995        if self.state_filename: 
1996            self.write_state()
1997        self.state_lock.release()
1998        return proofs
1999
2000    def clear_placeholder(self, eid, expid, tmpdir):
2001        """
2002        Clear the placeholder and remove any allocated temporary dir.
2003        """
2004
2005        self.state_lock.acquire()
2006        del self.state[eid]
2007        del self.state[expid]
2008        if self.state_filename: self.write_state()
2009        self.state_lock.release()
2010        if tmpdir and self.cleanup:
2011            self.remove_dirs(tmpdir)
2012
2013    # end of create_experiment sub-functions
2014
2015    def create_experiment(self, req, fid):
2016        """
2017        The external interface to experiment creation called from the
2018        dispatcher.
2019
2020        Creates a working directory, splits the incoming description using the
2021        splitter script and parses out the various subsections using the
2022        classes above.  Once each sub-experiment is created, use pooled threads
2023        to instantiate them and start it all up.
2024        """
2025
2026        req = req.get('CreateRequestBody', None)
2027        if req:
2028            key = self.get_experiment_key(req)
2029        else:
2030            raise service_error(service_error.req,
2031                    "Bad request format (no CreateRequestBody)")
2032
2033        # Import information from the requester
2034        if self.auth.import_credentials(data_list=req.get('credential', [])):
2035            self.auth.save()
2036
2037        # Make sure that the caller can talk to us
2038        proof = self.check_experiment_access(fid, key)
2039
2040        # Install the testbed map entries supplied with the request into a copy
2041        # of the testbed map.
2042        tbmap = dict(self.tbmap)
2043        for m in req.get('testbedmap', []):
2044            if 'testbed' in m and 'uri' in m:
2045                tbmap[m['testbed']] = m['uri']
2046
2047        # a place to work
2048        try:
2049            tmpdir = tempfile.mkdtemp(prefix="split-")
2050            os.mkdir(tmpdir+"/keys")
2051        except EnvironmentError:
2052            raise service_error(service_error.internal, "Cannot create tmp dir")
2053
2054        tbparams = { }
2055
2056        eid, expid, expcert_file = \
2057                self.get_experiment_ids_and_start(key, tmpdir)
2058
2059        # This catches exceptions to clear the placeholder if necessary
2060        try: 
2061            if not (eid and expid):
2062                raise service_error(service_error.internal, 
2063                        "Cannot find local experiment info!?")
2064
2065            top = self.get_topology(req, tmpdir)
2066            self.confirm_software(top)
2067            # Assign the IPs
2068            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2069            # Find the testbeds to look up
2070            tb_hosts = { }
2071            testbeds = [ ]
2072            for e in top.elements:
2073                if isinstance(e, topdl.Computer):
2074                    tb = e.get_attribute('testbed') or 'default'
2075                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2076                    else: 
2077                        tb_hosts[tb] = [ e.name ]
2078                        testbeds.append(tb)
2079
2080            masters, pmasters = self.get_testbed_services(req, testbeds)
2081            allocated = { }         # Testbeds we can access
2082            topo ={ }               # Sub topologies
2083            connInfo = { }          # Connection information
2084
2085            self.split_topology(top, topo, testbeds)
2086
2087            self.get_access_to_testbeds(testbeds, fid, allocated, 
2088                    tbparams, masters, tbmap, expid, expcert_file)
2089
2090            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2091
2092            part = experiment_partition(self.auth, self.store_url, tbmap,
2093                    self.muxmax, self.direct_transit)
2094            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2095                    connInfo, expid)
2096
2097            auth_attrs = set()
2098            # Now get access to the dynamic testbeds (those added above)
2099            for tb in [ t for t in topo if t not in allocated]:
2100                self.get_access(tb, tbparams, fid, masters, tbmap, 
2101                        expid, expcert_file)
2102                allocated[tb] = 1
2103                store_keys = topo[tb].get_attribute('store_keys')
2104                # Give the testbed access to keys it exports or imports
2105                if store_keys:
2106                    auth_attrs.update(set([
2107                        (tbparams[tb]['allocID']['fedid'], sk) \
2108                                for sk in store_keys.split(" ")]))
2109
2110            if auth_attrs:
2111                self.append_experiment_authorization(expid, auth_attrs)
2112
2113            # transit and disconnected testbeds may not have a connInfo entry.
2114            # Fill in the blanks.
2115            for t in allocated.keys():
2116                if not connInfo.has_key(t):
2117                    connInfo[t] = { }
2118
2119            self.wrangle_software(expid, top, topo, tbparams)
2120
2121            vtopo = topdl.topology_to_vtopo(top)
2122            vis = self.genviz(vtopo)
2123            proofs = self.save_federant_information(allocated, tbparams, 
2124                    eid, vtopo, vis, top)
2125        except service_error, e:
2126            # If something goes wrong in the parse (usually an access error)
2127            # clear the placeholder state.  From here on out the code delays
2128            # exceptions.  Failing at this point returns a fault to the remote
2129            # caller.
2130            self.clear_placeholder(eid, expid, tmpdir)
2131            raise e
2132
2133        # Start the background swapper and return the starting state.  From
2134        # here on out, the state will stick around a while.
2135
2136        # Create a logger that logs to the experiment's state object as well as
2137        # to the main log file.
2138        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2139        alloc_collector = self.list_log(self.state[eid].log)
2140        h = logging.StreamHandler(alloc_collector)
2141        # XXX: there should be a global one of these rather than repeating the
2142        # code.
2143        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2144                    '%d %b %y %H:%M:%S'))
2145        alloc_log.addHandler(h)
2146
2147        # Start a thread to do the resource allocation
2148        t  = Thread(target=self.allocate_resources,
2149                args=(allocated, masters, eid, expid, tbparams, 
2150                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2151                    connInfo, tbmap, expcert_file),
2152                name=eid)
2153        t.start()
2154
2155        rv = {
2156                'experimentID': [
2157                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2158                ],
2159                'experimentStatus': 'starting',
2160                'proof': [ proof.to_dict() ] + proofs,
2161            }
2162
2163        return rv
2164   
2165    def get_experiment_fedid(self, key):
2166        """
2167        find the fedid associated with the localname key in the state database.
2168        """
2169
2170        rv = None
2171        self.state_lock.acquire()
2172        if key in self.state:
2173            rv = self.state[key].fedid
2174        self.state_lock.release()
2175        return rv
2176
2177    def check_experiment_access(self, fid, key):
2178        """
2179        Confirm that the fid has access to the experiment.  Though a request
2180        may be made in terms of a local name, the access attribute is always
2181        the experiment's fedid.
2182        """
2183        if not isinstance(key, fedid):
2184            key = self.get_experiment_fedid(key)
2185
2186        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2187
2188        if access_ok:
2189            return proof
2190        else:
2191            raise service_error(service_error.access, "Access Denied",
2192                proof)
2193
2194
2195    def get_handler(self, path, fid):
2196        """
2197        Perhaps surprisingly named, this function handles HTTP GET requests to
2198        this server (SOAP requests are POSTs).
2199        """
2200        self.log.info("Get handler %s %s" % (path, fid))
2201        # XXX: log proofs?
2202        if self.auth.check_attribute(fid, path):
2203            return ("%s/%s" % (self.repodir, path), "application/binary")
2204        else:
2205            return (None, None)
2206
2207   
2208    def get_info(self, req, fid):
2209        """
2210        Return all the stored info about this experiment
2211        """
2212        rv = None
2213
2214        req = req.get('InfoRequestBody', None)
2215        if not req:
2216            raise service_error(service_error.req,
2217                    "Bad request format (no InfoRequestBody)")
2218        exp = req.get('experiment', None)
2219        legacy = req.get('legacy', False)
2220        if exp:
2221            if exp.has_key('fedid'):
2222                key = exp['fedid']
2223                keytype = "fedid"
2224            elif exp.has_key('localname'):
2225                key = exp['localname']
2226                keytype = "localname"
2227            else:
2228                raise service_error(service_error.req, "Unknown lookup type")
2229        else:
2230            raise service_error(service_error.req, "No request?")
2231
2232        proof = self.check_experiment_access(fid, key)
2233
2234        self.state_lock.acquire()
2235        if self.state.has_key(key):
2236            rv = self.state[key].get_info()
2237            # Copy the topo if we need to generate legacy representations
2238            if legacy:
2239                top = self.state[key].top
2240                if top is not None: top = top.clone()
2241        self.state_lock.release()
2242
2243        # If the legacy visualization and topology representations are
2244        # requested, calculate them and add them to teh return.
2245        if legacy and rv is not None:
2246            if top is not None:
2247                vtopo = topdl.topology_to_vtopo(top)
2248                if vtopo is not None:
2249                    rv['vtopo'] = vtopo
2250                    try:
2251                        vis = self.genviz(vtopo)
2252                    except service_error, e:
2253                        self.log.debug('Problem generating visualization: %s' \
2254                                % e)
2255                        vis = None
2256                    if vis is not None:
2257                        rv['vis'] = vis
2258        if rv:
2259            rv['proof'] = proof.to_dict()
2260            return rv
2261        else:
2262            raise service_error(service_error.req, "No such experiment")
2263
2264    def get_multi_info(self, req, fid):
2265        """
2266        Return all the stored info that this fedid can access
2267        """
2268        rv = { 'info': [ ], 'proof': [ ] }
2269
2270        self.state_lock.acquire()
2271        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2272            try:
2273                proof = self.check_experiment_access(fid, key)
2274            except service_error, e:
2275                if e.code == service_error.access:
2276                    continue
2277                else:
2278                    self.state_lock.release()
2279                    raise e
2280
2281            if self.state.has_key(key):
2282                e = self.state[key].get_info()
2283                e['proof'] = proof.to_dict()
2284                rv['info'].append(e)
2285                rv['proof'].append(proof.to_dict())
2286        self.state_lock.release()
2287        return rv
2288
2289    def check_termination_status(self, fed_exp, force):
2290        """
2291        Confirm that the experiment is sin a valid state to stop (or force it)
2292        return the state - invalid states for deletion and force settings cause
2293        exceptions.
2294        """
2295        self.state_lock.acquire()
2296        status = fed_exp.status
2297
2298        if status:
2299            if status in ('starting', 'terminating'):
2300                if not force:
2301                    self.state_lock.release()
2302                    raise service_error(service_error.partial, 
2303                            'Experiment still being created or destroyed')
2304                else:
2305                    self.log.warning('Experiment in %s state ' % status + \
2306                            'being terminated by force.')
2307            self.state_lock.release()
2308            return status
2309        else:
2310            # No status??? trouble
2311            self.state_lock.release()
2312            raise service_error(service_error.internal,
2313                    "Experiment has no status!?")
2314
2315
2316    def get_termination_info(self, fed_exp):
2317        ids = []
2318        term_params = { }
2319        self.state_lock.acquire()
2320        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2321        expcert = fed_exp.identity
2322        repo = "%s" % fed_exp.fedid
2323
2324        # Collect the allocation/segment ids into a dict keyed by the fedid
2325        # of the allocation that contains a tuple of uri, aid
2326        for i, fed in enumerate(fed_exp.get_all_allocations()):
2327            uri = fed.uri
2328            aid = fed.allocID
2329            term_params[aid] = (uri, aid)
2330        # Change the experiment state
2331        fed_exp.status = 'terminating'
2332        if self.state_filename: self.write_state()
2333        self.state_lock.release()
2334
2335        return ids, term_params, expcert, repo
2336
2337
2338    def deallocate_resources(self, term_params, expcert, status, force, 
2339            dealloc_log):
2340        tmpdir = None
2341        # This try block makes sure the tempdir is cleared
2342        try:
2343            # If no expcert, try the deallocation as the experiment
2344            # controller instance.
2345            if expcert and self.auth_type != 'legacy': 
2346                try:
2347                    tmpdir = tempfile.mkdtemp(prefix="term-")
2348                except EnvironmentError:
2349                    raise service_error(service_error.internal, 
2350                            "Cannot create tmp dir")
2351                cert_file = self.make_temp_certfile(expcert, tmpdir)
2352                pw = None
2353            else: 
2354                cert_file = self.cert_file
2355                pw = self.cert_pwd
2356
2357            # Stop everyone.  NB, wait_for_all waits until a thread starts
2358            # and then completes, so we can't wait if nothing starts.  So,
2359            # no tbparams, no start.
2360            if len(term_params) > 0:
2361                tp = thread_pool(self.nthreads)
2362                for k, (uri, aid) in term_params.items():
2363                    # Create and start a thread to stop the segment
2364                    tp.wait_for_slot()
2365                    t  = pooled_thread(\
2366                            target=self.terminate_segment(log=dealloc_log,
2367                                testbed=uri,
2368                                cert_file=cert_file, 
2369                                cert_pwd=pw,
2370                                trusted_certs=self.trusted_certs,
2371                                caller=self.call_TerminateSegment),
2372                            args=(uri, aid), name=k,
2373                            pdata=tp, trace_file=self.trace_file)
2374                    t.start()
2375                # Wait for completions
2376                tp.wait_for_all_done()
2377
2378            # release the allocations (failed experiments have done this
2379            # already, and starting experiments may be in odd states, so we
2380            # ignore errors releasing those allocations
2381            try: 
2382                for k, (uri, aid)  in term_params.items():
2383                    self.release_access(None, aid, uri=uri,
2384                            cert_file=cert_file, cert_pwd=pw)
2385            except service_error, e:
2386                if status != 'failed' and not force:
2387                    raise e
2388
2389        # Clean up the tmpdir no matter what
2390        finally:
2391            if tmpdir: self.remove_dirs(tmpdir)
2392
2393    def terminate_experiment(self, req, fid):
2394        """
2395        Swap this experiment out on the federants and delete the shared
2396        information
2397        """
2398        tbparams = { }
2399        req = req.get('TerminateRequestBody', None)
2400        if not req:
2401            raise service_error(service_error.req,
2402                    "Bad request format (no TerminateRequestBody)")
2403
2404        key = self.get_experiment_key(req, 'experiment')
2405        proof = self.check_experiment_access(fid, key)
2406        exp = req.get('experiment', False)
2407        force = req.get('force', False)
2408
2409        dealloc_list = [ ]
2410
2411
2412        # Create a logger that logs to the dealloc_list as well as to the main
2413        # log file.
2414        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2415        h = logging.StreamHandler(self.list_log(dealloc_list))
2416        # XXX: there should be a global one of these rather than repeating the
2417        # code.
2418        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2419                    '%d %b %y %H:%M:%S'))
2420        dealloc_log.addHandler(h)
2421
2422        self.state_lock.acquire()
2423        fed_exp = self.state.get(key, None)
2424        self.state_lock.release()
2425        repo = None
2426
2427        if fed_exp:
2428            status = self.check_termination_status(fed_exp, force)
2429            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2430            self.deallocate_resources(term_params, expcert, status, force, 
2431                    dealloc_log)
2432
2433            # Remove the terminated experiment
2434            self.state_lock.acquire()
2435            for id in ids:
2436                self.clear_experiment_authorization(id, need_state_lock=False)
2437                if id in self.state: del self.state[id]
2438
2439            if self.state_filename: self.write_state()
2440            self.state_lock.release()
2441
2442            # Delete any synch points associated with this experiment.  All
2443            # synch points begin with the fedid of the experiment.
2444            fedid_keys = set(["fedid:%s" % f for f in ids \
2445                    if isinstance(f, fedid)])
2446            for k in self.synch_store.all_keys():
2447                try:
2448                    if len(k) > 45 and k[0:46] in fedid_keys:
2449                        self.synch_store.del_value(k)
2450                except synch_store.BadDeletionError:
2451                    pass
2452            self.write_store()
2453
2454            # Remove software and other cached stuff from the filesystem.
2455            if repo:
2456                self.remove_dirs("%s/%s" % (self.repodir, repo))
2457       
2458            return { 
2459                    'experiment': exp , 
2460                    'deallocationLog': string.join(dealloc_list, ''),
2461                    'proof': [proof.to_dict()],
2462                    }
2463        else:
2464            raise service_error(service_error.req, "No saved state")
2465
2466
2467    def GetValue(self, req, fid):
2468        """
2469        Get a value from the synchronized store
2470        """
2471        req = req.get('GetValueRequestBody', None)
2472        if not req:
2473            raise service_error(service_error.req,
2474                    "Bad request format (no GetValueRequestBody)")
2475       
2476        name = req.get('name', None)
2477        wait = req.get('wait', False)
2478        rv = { 'name': name }
2479
2480        if not name:
2481            raise service_error(service_error.req, "No name?")
2482
2483        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2484
2485        if access_ok:
2486            self.log.debug("[GetValue] asking for %s " % name)
2487            try:
2488                v = self.synch_store.get_value(name, wait)
2489            except synch_store.RevokedKeyError:
2490                # No more synch on this key
2491                raise service_error(service_error.federant, 
2492                        "Synch key %s revoked" % name)
2493            if v is not None:
2494                rv['value'] = v
2495            rv['proof'] = proof.to_dict()
2496            self.log.debug("[GetValue] got %s from %s" % (v, name))
2497            return rv
2498        else:
2499            raise service_error(service_error.access, "Access Denied",
2500                    proof=proof)
2501       
2502
2503    def SetValue(self, req, fid):
2504        """
2505        Set a value in the synchronized store
2506        """
2507        req = req.get('SetValueRequestBody', None)
2508        if not req:
2509            raise service_error(service_error.req,
2510                    "Bad request format (no SetValueRequestBody)")
2511       
2512        name = req.get('name', None)
2513        v = req.get('value', '')
2514
2515        if not name:
2516            raise service_error(service_error.req, "No name?")
2517
2518        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2519
2520        if access_ok:
2521            try:
2522                self.synch_store.set_value(name, v)
2523                self.write_store()
2524                self.log.debug("[SetValue] set %s to %s" % (name, v))
2525            except synch_store.CollisionError:
2526                # Translate into a service_error
2527                raise service_error(service_error.req,
2528                        "Value already set: %s" %name)
2529            except synch_store.RevokedKeyError:
2530                # No more synch on this key
2531                raise service_error(service_error.federant, 
2532                        "Synch key %s revoked" % name)
2533                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2534        else:
2535            raise service_error(service_error.access, "Access Denied",
2536                    proof=proof)
Note: See TracBrowser for help on using the repository browser.