source: fedd/federation/experiment_control.py @ 558d829

compt_changes
Last change on this file since 558d829 was 4ffa6f8, checked in by Ted Faber <faber@…>, 12 years ago

Add support for nat_portal parameter. Remove old half-assed active
endpoints

  • Property mode set to 100644
File size: 94.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15import shutil
16import zipfile
17
18import os.path
19
20import traceback
21# For parsing visualization output and splitter output
22import xml.parsers.expat
23
24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
26from string import join
27
28from urlparse import urlparse
29from urllib2 import urlopen, URLError
30
31from util import *
32from deter import fedid, generate_fedid
33from remote_service import xmlrpc_handler, soap_handler, service_caller
34from service_error import service_error
35from synch_store import synch_store
36from experiment_partition import experiment_partition
37from experiment_control_legacy import experiment_control_legacy
38from authorizer import abac_authorizer
39from thread_pool import thread_pool, pooled_thread
40from experiment_info import experiment_info, allocation_info, federated_service
41from operation_status import operation_status
42
43from deter import topdl
44from deter import ip_allocator
45from deter import ip_addr
46import list_log
47
48
49class nullHandler(logging.Handler):
50    def emit(self, record): pass
51
52fl = logging.getLogger("fedd.experiment_control")
53fl.addHandler(nullHandler())
54
55class experiment_control_local(experiment_control_legacy):
56    """
57    Control of experiments that this system can directly access.
58
59    Includes experiment creation, termination and information dissemination.
60    Thred safe.
61    """
62
63    class ssh_cmd_timeout(RuntimeError): pass
64   
65    call_RequestAccess = service_caller('RequestAccess')
66    call_ReleaseAccess = service_caller('ReleaseAccess')
67    call_StartSegment = service_caller('StartSegment')
68    call_TerminateSegment = service_caller('TerminateSegment')
69    call_InfoSegment = service_caller('InfoSegment')
70    call_OperationSegment = service_caller('OperationSegment')
71    call_Ns2Topdl = service_caller('Ns2Topdl')
72
73    def __init__(self, config=None, auth=None):
74        """
75        Intialize the various attributes, most from the config object
76        """
77
78        def parse_tarfile_list(tf):
79            """
80            Parse a tarfile list from the configuration.  This is a set of
81            paths and tarfiles separated by spaces.
82            """
83            rv = [ ]
84            if tf is not None:
85                tl = tf.split()
86                while len(tl) > 1:
87                    p, t = tl[0:2]
88                    del tl[0:2]
89                    rv.append((p, t))
90            return rv
91
92        self.list_log = list_log.list_log
93
94        self.cert_file = config.get("experiment_control", "cert_file")
95        if self.cert_file:
96            self.cert_pwd = config.get("experiment_control", "cert_pwd")
97        else:
98            self.cert_file = config.get("globals", "cert_file")
99            self.cert_pwd = config.get("globals", "cert_pwd")
100
101        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
102                or config.get("globals", "trusted_certs")
103
104        self.repodir = config.get("experiment_control", "repodir")
105        self.repo_url = config.get("experiment_control", "repo_url", 
106                "https://users.isi.deterlab.net:23235");
107
108        self.exp_stem = "fed-stem"
109        self.log = logging.getLogger("fedd.experiment_control")
110        set_log_level(config, "experiment_control", self.log)
111        self.muxmax = 2
112        self.nthreads = 10
113        self.randomize_experiments = False
114
115        self.splitter = None
116        self.ssh_keygen = "/usr/bin/ssh-keygen"
117        self.ssh_identity_file = None
118
119
120        self.debug = config.getboolean("experiment_control", "create_debug")
121        self.cleanup = not config.getboolean("experiment_control", 
122                "leave_tmpfiles")
123        self.state_filename = config.get("experiment_control", 
124                "experiment_state")
125        self.store_filename = config.get("experiment_control", 
126                "synch_store")
127        self.store_url = config.get("experiment_control", "store_url")
128        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
129        self.grouper_url = config.get("experiment_control", "grouper_url")
130        self.fedkit = parse_tarfile_list(\
131                config.get("experiment_control", "fedkit"))
132        self.gatewaykit = parse_tarfile_list(\
133                config.get("experiment_control", "gatewaykit"))
134
135        dt = config.get("experiment_control", "direct_transit")
136        self.auth_type = config.get('experiment_control', 'auth_type') \
137                or 'legacy'
138        self.auth_dir = config.get('experiment_control', 'auth_dir')
139        # XXX: document this!
140        self.info_cache_limit = \
141                config.getint('experiment_control', 'info_cache', 600)
142        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
143        else: self.direct_transit = [ ]
144        # NB for internal master/slave ops, not experiment setup
145        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
146
147        self.overrides = set([])
148        ovr = config.get('experiment_control', 'overrides')
149        if ovr:
150            for o in ovr.split(","):
151                o = o.strip()
152                if o.startswith('fedid:'): o = o[len('fedid:'):]
153                self.overrides.add(fedid(hexstr=o))
154
155        self.state = { }
156        self.state_lock = Lock()
157        self.tclsh = "/usr/local/bin/otclsh"
158        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
159                config.get("experiment_control", "tcl_splitter",
160                        "/usr/testbed/lib/ns2ir/parse.tcl")
161        mapdb_file = config.get("experiment_control", "mapdb")
162        self.trace_file = sys.stderr
163
164        self.def_expstart = \
165                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
166                "/tmp/federate";
167        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
168                "FEDDIR/hosts";
169        self.def_gwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
171                "/tmp/bridge.log";
172        self.def_mgwstart = \
173                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
174                "/tmp/bridge.log";
175        self.def_gwimage = "FBSD61-TUNNEL2";
176        self.def_gwtype = "pc";
177        self.local_access = { }
178
179        if self.auth_type == 'legacy':
180            if auth:
181                self.auth = auth
182            else:
183                self.log.error( "[access]: No authorizer initialized, " +\
184                        "creating local one.")
185                auth = authorizer()
186            self.get_access = self.legacy_get_access
187        elif self.auth_type == 'abac':
188            self.auth = abac_authorizer(load=self.auth_dir, 
189                    update=os.path.join(self.auth_dir,'update'))
190        else:
191            raise service_error(service_error.internal, 
192                    "Unknown auth_type: %s" % self.auth_type)
193
194        if mapdb_file:
195            self.read_mapdb(mapdb_file)
196        else:
197            self.log.warn("[experiment_control] No testbed map, using defaults")
198            self.tbmap = { 
199                    'deter':'https://users.isi.deterlab.net:23235',
200                    'emulab':'https://users.isi.deterlab.net:23236',
201                    'ucb':'https://users.isi.deterlab.net:23237',
202                    }
203
204        # Grab saved state.  OK to do this w/o locking because it's read only
205        # and only one thread should be in existence that can see self.state at
206        # this point.
207        if self.state_filename:
208            self.read_state()
209
210        if self.store_filename:
211            self.read_store()
212        else:
213            self.log.warning("No saved synch store")
214            self.synch_store = synch_store
215
216        # Dispatch tables
217        self.soap_services = {\
218                'New': soap_handler('New', self.new_experiment),
219                'Create': soap_handler('Create', self.create_experiment),
220                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
221                'Vis': soap_handler('Vis', self.get_vis),
222                'Info': soap_handler('Info', self.get_info),
223                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
224                'Operation': soap_handler('Operation', self.do_operation),
225                'Terminate': soap_handler('Terminate', 
226                    self.terminate_experiment),
227                'GetValue': soap_handler('GetValue', self.GetValue),
228                'SetValue': soap_handler('SetValue', self.SetValue),
229        }
230
231        self.xmlrpc_services = {\
232                'New': xmlrpc_handler('New', self.new_experiment),
233                'Create': xmlrpc_handler('Create', self.create_experiment),
234                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
235                'Vis': xmlrpc_handler('Vis', self.get_vis),
236                'Info': xmlrpc_handler('Info', self.get_info),
237                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
238                'Terminate': xmlrpc_handler('Terminate',
239                    self.terminate_experiment),
240                'Operation': xmlrpc_handler('Operation', self.do_operation),
241                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
242                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
243        }
244
245    # Call while holding self.state_lock
246    def write_state(self):
247        """
248        Write a new copy of experiment state after copying the existing state
249        to a backup.
250
251        State format is a simple pickling of the state dictionary.
252        """
253        if os.access(self.state_filename, os.W_OK):
254            copy_file(self.state_filename, \
255                    "%s.bak" % self.state_filename)
256        try:
257            f = open(self.state_filename, 'w')
258            pickle.dump(self.state, f)
259        except EnvironmentError, e:
260            self.log.error("Can't write file %s: %s" % \
261                    (self.state_filename, e))
262        except pickle.PicklingError, e:
263            self.log.error("Pickling problem: %s" % e)
264        except TypeError, e:
265            self.log.error("Pickling problem (TypeError): %s" % e)
266
267    @staticmethod
268    def get_alloc_ids(exp):
269        """
270        Used by read_store and read state.  This used to be worse.
271        """
272
273        return [ a.allocID for a in exp.get_all_allocations() ]
274
275
276    # Call while holding self.state_lock
277    def read_state(self):
278        """
279        Read a new copy of experiment state.  Old state is overwritten.
280
281        State format is a simple pickling of the state dictionary.
282        """
283       
284        try:
285            f = open(self.state_filename, "r")
286            self.state = pickle.load(f)
287            self.log.debug("[read_state]: Read state from %s" % \
288                    self.state_filename)
289        except EnvironmentError, e:
290            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
291                    % (self.state_filename, e))
292        except pickle.UnpicklingError, e:
293            self.log.warning(("[read_state]: No saved state: " + \
294                    "Unpickling failed: %s") % e)
295       
296        for s in self.state.values():
297            try:
298
299                eid = s.fedid
300                if eid : 
301                    if self.auth_type == 'legacy':
302                        # XXX: legacy
303                        # Give the owner rights to the experiment
304                        #self.auth.set_attribute(s['owner'], eid)
305                        # And holders of the eid as well
306                        self.auth.set_attribute(eid, eid)
307                        # allow overrides to control experiments as well
308                        for o in self.overrides:
309                            self.auth.set_attribute(o, eid)
310                        # Set permissions to allow reading of the software
311                        # repo, if any, as well.
312                        for a in self.get_alloc_ids(s):
313                            self.auth.set_attribute(a, 'repo/%s' % eid)
314                else:
315                    raise KeyError("No experiment id")
316            except KeyError, e:
317                self.log.warning("[read_state]: State ownership or identity " +\
318                        "misformatted in %s: %s" % (self.state_filename, e))
319
320    def read_mapdb(self, file):
321        """
322        Read a simple colon separated list of mappings for the
323        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
324        also adds testbeds to active if they include , active after
325        their name.
326        """
327
328        self.tbmap = { }
329        lineno =0
330        try:
331            f = open(file, "r")
332            for line in f:
333                lineno += 1
334                line = line.strip()
335                if line.startswith('#') or len(line) == 0:
336                    continue
337                try:
338                    label, url = line.split(':', 1)
339                    self.tbmap[label] = url
340                except ValueError, e:
341                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
342                            "map db: %s %s" % (lineno, line, e))
343        except EnvironmentError, e:
344            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
345                    "open %s: %s" % (file, e))
346        else:
347            f.close()
348
349    def read_store(self):
350        try:
351            self.synch_store = synch_store()
352            self.synch_store.load(self.store_filename)
353            self.log.debug("[read_store]: Read store from %s" % \
354                    self.store_filename)
355        except EnvironmentError, e:
356            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
357                    % (self.state_filename, e))
358            self.synch_store = synch_store()
359
360        # Set the initial permissions on data in the store.  XXX: This ad hoc
361        # authorization attribute initialization is getting out of hand.
362        # XXX: legacy
363        if self.auth_type == 'legacy':
364            for k in self.synch_store.all_keys():
365                try:
366                    if k.startswith('fedid:'):
367                        fid = fedid(hexstr=k[6:46])
368                        if self.state.has_key(fid):
369                            for a in self.get_alloc_ids(self.state[fid]):
370                                self.auth.set_attribute(a, k)
371                except ValueError, e:
372                    self.log.warn("Cannot deduce permissions for %s" % k)
373
374
375    def write_store(self):
376        """
377        Write a new copy of synch_store after writing current state
378        to a backup.  We use the internal synch_store pickle method to avoid
379        incinsistent data.
380
381        State format is a simple pickling of the store.
382        """
383        if os.access(self.store_filename, os.W_OK):
384            copy_file(self.store_filename, \
385                    "%s.bak" % self.store_filename)
386        try:
387            self.synch_store.save(self.store_filename)
388        except EnvironmentError, e:
389            self.log.error("Can't write file %s: %s" % \
390                    (self.store_filename, e))
391        except TypeError, e:
392            self.log.error("Pickling problem (TypeError): %s" % e)
393
394    # XXX this may belong somewhere else
395
396    def get_grouper_updates(self, fid):
397        if self.grouper_url is None: return
398        d = tempfile.mkdtemp()
399        try:
400            fstr = "%s" % fid
401            # XXX locking
402            zipname = os.path.join(d, 'grouper.zip')
403            dest = os.path.join(self.auth_dir, 'update')
404            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
405            f = open(zipname, 'w')
406            f.write(resp.read())
407            f.close()
408            zf = zipfile.ZipFile(zipname, 'r')
409            zf.extractall(dest)
410            zf.close()
411        except URLError, e:
412            self.log.error("Cannot connect to grouper: %s" % e)
413            pass
414        finally:
415            shutil.rmtree(d)
416
417
418    def remove_dirs(self, dir):
419        """
420        Remove the directory tree and all files rooted at dir.  Log any errors,
421        but continue.
422        """
423        self.log.debug("[removedirs]: removing %s" % dir)
424        try:
425            for path, dirs, files in os.walk(dir, topdown=False):
426                for f in files:
427                    os.remove(os.path.join(path, f))
428                for d in dirs:
429                    os.rmdir(os.path.join(path, d))
430            os.rmdir(dir)
431        except EnvironmentError, e:
432            self.log.error("Error deleting directory tree in %s" % e);
433
434    @staticmethod
435    def make_temp_certfile(expcert, tmpdir):
436        """
437        make a protected copy of the access certificate so the experiment
438        controller can act as the experiment principal.  mkstemp is the most
439        secure way to do that. The directory should be created by
440        mkdtemp.  Return the filename.
441        """
442        if expcert and tmpdir:
443            try:
444                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
445                f = os.fdopen(certf, 'w')
446                print >> f, expcert
447                f.close()
448            except EnvironmentError, e:
449                raise service_error(service_error.internal, 
450                        "Cannot create temp cert file?")
451            return certfn
452        else:
453            return None
454
455       
456    def generate_ssh_keys(self, dest, type="rsa" ):
457        """
458        Generate a set of keys for the gateways to use to talk.
459
460        Keys are of type type and are stored in the required dest file.
461        """
462        valid_types = ("rsa", "dsa")
463        t = type.lower();
464        if t not in valid_types: raise ValueError
465        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
466
467        try:
468            trace = open("/dev/null", "w")
469        except EnvironmentError:
470            raise service_error(service_error.internal,
471                    "Cannot open /dev/null??");
472
473        # May raise CalledProcessError
474        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
475        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
476        if rv != 0:
477            raise service_error(service_error.internal, 
478                    "Cannot generate nonce ssh keys.  %s return code %d" \
479                            % (self.ssh_keygen, rv))
480
481    def generate_seer_certs(self, destdir):
482        '''
483        Create a SEER ca cert and a node cert in destdir/ca.pem and
484        destdir/node.pem respectively.  These will be distributed throughout
485        the federated experiment.  This routine reports errors via
486        service_errors.
487        '''
488        openssl = '/usr/bin/openssl'
489        # All the filenames and parameters we need for openssl calls below
490        ca_key =os.path.join(destdir, 'ca.key') 
491        ca_pem = os.path.join(destdir, 'ca.pem')
492        node_key =os.path.join(destdir, 'node.key') 
493        node_pem = os.path.join(destdir, 'node.pem')
494        node_req = os.path.join(destdir, 'node.req')
495        node_signed = os.path.join(destdir, 'node.signed')
496        days = '%s' % (365 * 10)
497        serial = '%s' % random.randint(0, 1<<16)
498
499        try:
500            # Sequence of calls to create a CA key, create a ca cert, create a
501            # node key, node signing request, and finally a signed node
502            # certificate.
503            sequence = (
504                    (openssl, 'genrsa', '-out', ca_key, '1024'),
505                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
506                        ca_pem, '-days', days, '-subj', 
507                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
508                    (openssl, 'genrsa', '-out', node_key, '1024'),
509                    (openssl, 'req', '-new', '-key', node_key, '-out', 
510                        node_req, '-days', days, '-subj', 
511                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
512                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
513                        '-set_serial', serial, '-req', '-in', node_req, 
514                        '-out', node_signed, '-days', days),
515                )
516            # Do all that stuff; bail if there's an error, and push all the
517            # output to dev/null.
518            for cmd in sequence:
519                trace = open("/dev/null", "w")
520                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
521                if rv != 0:
522                    raise service_error(service_error.internal, 
523                            "Cannot generate SEER certs.  %s return code %d" \
524                                    % (' '.join(cmd), rv))
525            # Concatinate the node key and signed certificate into node.pem
526            f = open(node_pem, 'w')
527            for comp in (node_signed, node_key):
528                g = open(comp, 'r')
529                f.write(g.read())
530                g.close()
531            f.close()
532
533            # Throw out intermediaries.
534            for fn in (ca_key, node_key, node_req, node_signed):
535                os.unlink(fn)
536
537        except EnvironmentError, e:
538            # Any difficulties with the file system wind up here
539            raise service_error(service_error.internal,
540                    "File error on  %s while creating SEER certs: %s" % \
541                            (e.filename, e.strerror))
542
543
544
545    def gentopo(self, str):
546        """
547        Generate the topology data structure from the splitter's XML
548        representation of it.
549
550        The topology XML looks like:
551            <experiment>
552                <nodes>
553                    <node><vname></vname><ips>ip1:ip2</ips></node>
554                </nodes>
555                <lans>
556                    <lan>
557                        <vname></vname><vnode></vnode><ip></ip>
558                        <bandwidth></bandwidth><member>node:port</member>
559                    </lan>
560                </lans>
561        """
562        class topo_parse:
563            """
564            Parse the topology XML and create the dats structure.
565            """
566            def __init__(self):
567                # Typing of the subelements for data conversion
568                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
569                self.int_subelements = ( 'bandwidth',)
570                self.float_subelements = ( 'delay',)
571                # The final data structure
572                self.nodes = [ ]
573                self.lans =  [ ]
574                self.topo = { \
575                        'node': self.nodes,\
576                        'lan' : self.lans,\
577                    }
578                self.element = { }  # Current element being created
579                self.chars = ""     # Last text seen
580
581            def end_element(self, name):
582                # After each sub element the contents is added to the current
583                # element or to the appropriate list.
584                if name == 'node':
585                    self.nodes.append(self.element)
586                    self.element = { }
587                elif name == 'lan':
588                    self.lans.append(self.element)
589                    self.element = { }
590                elif name in self.str_subelements:
591                    self.element[name] = self.chars
592                    self.chars = ""
593                elif name in self.int_subelements:
594                    self.element[name] = int(self.chars)
595                    self.chars = ""
596                elif name in self.float_subelements:
597                    self.element[name] = float(self.chars)
598                    self.chars = ""
599
600            def found_chars(self, data):
601                self.chars += data.rstrip()
602
603
604        tp = topo_parse();
605        parser = xml.parsers.expat.ParserCreate()
606        parser.EndElementHandler = tp.end_element
607        parser.CharacterDataHandler = tp.found_chars
608
609        parser.Parse(str)
610
611        return tp.topo
612       
613
614    def genviz(self, topo):
615        """
616        Generate the visualization the virtual topology
617        """
618
619        neato = "/usr/local/bin/neato"
620        # These are used to parse neato output and to create the visualization
621        # file.
622        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
623        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
624                "%s</type></node>"
625
626        try:
627            # Node names
628            nodes = [ n['vname'] for n in topo['node'] ]
629            topo_lans = topo['lan']
630        except KeyError, e:
631            raise service_error(service_error.internal, "Bad topology: %s" %e)
632
633        lans = { }
634        links = { }
635
636        # Walk through the virtual topology, organizing the connections into
637        # 2-node connections (links) and more-than-2-node connections (lans).
638        # When a lan is created, it's added to the list of nodes (there's a
639        # node in the visualization for the lan).
640        for l in topo_lans:
641            if links.has_key(l['vname']):
642                if len(links[l['vname']]) < 2:
643                    links[l['vname']].append(l['vnode'])
644                else:
645                    nodes.append(l['vname'])
646                    lans[l['vname']] = links[l['vname']]
647                    del links[l['vname']]
648                    lans[l['vname']].append(l['vnode'])
649            elif lans.has_key(l['vname']):
650                lans[l['vname']].append(l['vnode'])
651            else:
652                links[l['vname']] = [ l['vnode'] ]
653
654
655        # Open up a temporary file for dot to turn into a visualization
656        try:
657            df, dotname = tempfile.mkstemp()
658            dotfile = os.fdopen(df, 'w')
659        except EnvironmentError:
660            raise service_error(service_error.internal,
661                    "Failed to open file in genviz")
662
663        try:
664            dnull = open('/dev/null', 'w')
665        except EnvironmentError:
666            service_error(service_error.internal,
667                    "Failed to open /dev/null in genviz")
668
669        # Generate a dot/neato input file from the links, nodes and lans
670        try:
671            print >>dotfile, "graph G {"
672            for n in nodes:
673                print >>dotfile, '\t"%s"' % n
674            for l in links.keys():
675                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
676            for l in lans.keys():
677                for n in lans[l]:
678                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
679            print >>dotfile, "}"
680            dotfile.close()
681        except TypeError:
682            raise service_error(service_error.internal,
683                    "Single endpoint link in vtopo")
684        except EnvironmentError:
685            raise service_error(service_error.internal, "Cannot write dot file")
686
687        # Use dot to create a visualization
688        try:
689            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
690                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
691                stderr=dnull, close_fds=True)
692        except EnvironmentError:
693            raise service_error(service_error.internal, 
694                    "Cannot generate visualization: is graphviz available?")
695        dnull.close()
696
697        # Translate dot to vis format
698        vis_nodes = [ ]
699        vis = { 'node': vis_nodes }
700        for line in dot.stdout:
701            m = vis_re.match(line)
702            if m:
703                vn = m.group(1)
704                vis_node = {'name': vn, \
705                        'x': float(m.group(2)),\
706                        'y' : float(m.group(3)),\
707                    }
708                if vn in links.keys() or vn in lans.keys():
709                    vis_node['type'] = 'lan'
710                else:
711                    vis_node['type'] = 'node'
712                vis_nodes.append(vis_node)
713        rv = dot.wait()
714
715        os.remove(dotname)
716        # XXX: graphviz seems to use low return codes for warnings, like
717        # "couldn't find font"
718        if rv < 2 : return vis
719        else: return None
720
721
722    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
723            cert_pwd=None):
724        """
725        Release access to testbed through fedd
726        """
727
728        if not uri and tbmap:
729            uri = tbmap.get(tb, None)
730        if not uri:
731            raise service_error(service_error.server_config, 
732                    "Unknown testbed: %s" % tb)
733
734        if self.local_access.has_key(uri):
735            resp = self.local_access[uri].ReleaseAccess(\
736                    { 'ReleaseAccessRequestBody' : 
737                        {'allocID': {'fedid': aid}},}, 
738                    fedid(file=cert_file))
739            resp = { 'ReleaseAccessResponseBody': resp } 
740        else:
741            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
742                    cert_file, cert_pwd, self.trusted_certs)
743
744        # better error coding
745
746    def remote_ns2topdl(self, uri, desc):
747
748        req = {
749                'description' : { 'ns2description': desc },
750            }
751
752        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
753                self.trusted_certs)
754
755        if r.has_key('Ns2TopdlResponseBody'):
756            r = r['Ns2TopdlResponseBody']
757            ed = r.get('experimentdescription', None)
758            if ed.has_key('topdldescription'):
759                return topdl.Topology(**ed['topdldescription'])
760            else:
761                raise service_error(service_error.protocol, 
762                        "Bad splitter response (no output)")
763        else:
764            raise service_error(service_error.protocol, "Bad splitter response")
765
766    class start_segment:
767        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
768                cert_pwd=None, trusted_certs=None, caller=None,
769                log_collector=None):
770            self.log = log
771            self.debug = debug
772            self.cert_file = cert_file
773            self.cert_pwd = cert_pwd
774            self.trusted_certs = None
775            self.caller = caller
776            self.testbed = testbed
777            self.log_collector = log_collector
778            self.response = None
779            self.node = { }
780            self.subs = { }
781            self.tb = { }
782            self.proof = None
783
784        def make_map(self, resp):
785            if 'segmentdescription' not in resp  or \
786                    'topdldescription' not in resp['segmentdescription']:
787                self.log.warn('No topology returned from startsegment')
788                return 
789
790            top = topdl.Topology(
791                    **resp['segmentdescription']['topdldescription'])
792
793            for e in top.elements:
794                if isinstance(e, topdl.Computer):
795                    self.node[e.name] = e
796                elif isinstance(e, topdl.Testbed):
797                    self.tb[e.uri] = e
798            for s in top.substrates:
799                self.subs[s.name] = s
800
801        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
802            req = {
803                    'allocID': { 'fedid' : aid }, 
804                    'segmentdescription': { 
805                        'topdldescription': topo.to_dict(),
806                    },
807                }
808
809            if connInfo:
810                req['connection'] = connInfo
811
812            import_svcs = [ s for m in masters.values() \
813                    for s in m if self.testbed in s.importers]
814
815            if import_svcs or self.testbed in masters:
816                req['service'] = []
817
818            for s in import_svcs:
819                for r in s.reqs:
820                    sr = copy.deepcopy(r)
821                    sr['visibility'] = 'import';
822                    req['service'].append(sr)
823
824            for s in masters.get(self.testbed, []):
825                for r in s.reqs:
826                    sr = copy.deepcopy(r)
827                    sr['visibility'] = 'export';
828                    req['service'].append(sr)
829
830            if attrs:
831                req['fedAttr'] = attrs
832
833            try:
834                self.log.debug("Calling StartSegment at %s " % uri)
835                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
836                        self.trusted_certs)
837                if r.has_key('StartSegmentResponseBody'):
838                    lval = r['StartSegmentResponseBody'].get('allocationLog',
839                            None)
840                    if lval and self.log_collector:
841                        for line in  lval.splitlines(True):
842                            self.log_collector.write(line)
843                    self.make_map(r['StartSegmentResponseBody'])
844                    if 'proof' in r: self.proof = r['proof']
845                    self.response = r
846                else:
847                    raise service_error(service_error.internal, 
848                            "Bad response!?: %s" %r)
849                return True
850            except service_error, e:
851                self.log.error("Start segment failed on %s: %s" % \
852                        (self.testbed, e))
853                return False
854
855
856
857    class terminate_segment:
858        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
859                cert_pwd=None, trusted_certs=None, caller=None):
860            self.log = log
861            self.debug = debug
862            self.cert_file = cert_file
863            self.cert_pwd = cert_pwd
864            self.trusted_certs = None
865            self.caller = caller
866            self.testbed = testbed
867
868        def __call__(self, uri, aid ):
869            req = {
870                    'allocID': {'fedid': aid }, 
871                }
872            self.log.info("Calling terminate segment")
873            try:
874                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
875                        self.trusted_certs)
876                self.log.info("Terminate segment succeeded")
877                return True
878            except service_error, e:
879                self.log.error("Terminate segment failed on %s: %s" % \
880                        (self.testbed, e))
881                return False
882
883    class info_segment(start_segment):
884        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
885                cert_pwd=None, trusted_certs=None, caller=None,
886                log_collector=None):
887            experiment_control_local.start_segment.__init__(self, debug, 
888                    log, testbed, cert_file, cert_pwd, trusted_certs, 
889                    caller, log_collector)
890
891        def __call__(self, uri, aid):
892            req = { 'allocID': { 'fedid' : aid } }
893
894            try:
895                self.log.debug("Calling InfoSegment at %s " % uri)
896                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
897                        self.trusted_certs)
898                if r.has_key('InfoSegmentResponseBody'):
899                    self.make_map(r['InfoSegmentResponseBody'])
900                    if 'proof' in r: self.proof = r['proof']
901                    self.response = r
902                else:
903                    raise service_error(service_error.internal, 
904                            "Bad response!?: %s" %r)
905                return True
906            except service_error, e:
907                self.log.error("Info segment failed on %s: %s" % \
908                        (self.testbed, e))
909                return False
910
911    class operation_segment:
912        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
913                cert_pwd=None, trusted_certs=None, caller=None,
914                log_collector=None):
915            self.log = log
916            self.debug = debug
917            self.cert_file = cert_file
918            self.cert_pwd = cert_pwd
919            self.trusted_certs = None
920            self.caller = caller
921            self.testbed = testbed
922            self.status = None
923
924        def __call__(self, uri, aid, op, targets, params):
925            req = { 
926                    'allocID': { 'fedid' : aid },
927                    'operation': op,
928                    'target': targets,
929                    }
930            if params: req['parameter'] = params
931
932
933            try:
934                self.log.debug("Calling OperationSegment at %s " % uri)
935                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
936                        self.trusted_certs)
937                if 'OperationSegmentResponseBody' in r:
938                    r = r['OperationSegmentResponseBody']
939                    if 'status' in r:
940                        self.status = r['status']
941                else:
942                    raise service_error(service_error.internal, 
943                            "Bad response!?: %s" %r)
944                return True
945            except service_error, e:
946                self.log.error("Operation segment failed on %s: %s" % \
947                        (self.testbed, e))
948                return False
949
950    def annotate_topology(self, top, data):
951        # These routines do various parts of the annotation
952        def add_new_names(nl, l):
953            """ add any names in nl to the list in l """
954            for n in nl:
955                if n not in l: l.append(n)
956       
957        def merge_services(ne, e):
958            for ns in ne.service:
959                # NB: the else is on the for
960                for s in e.service:
961                    if ns.name == s.name:
962                        s.importer = ns.importer
963                        s.param = ns.param
964                        s.description = ns.description
965                        s.status = ns.status
966                        break
967                else:
968                    e.service.append(ns)
969       
970        def merge_oses(ne, e):
971            """
972            Merge the operating system entries of ne into e
973            """
974            for nos in ne.os:
975                # NB: the else is on the for
976                for os in e.os:
977                    if nos.name == os.name:
978                        os.version = nos.version
979                        os.version = nos.distribution
980                        os.version = nos.distributionversion
981                        for a in nos.attribute:
982                            if os.get_attribute(a.attribute):
983                                os.remove_attribute(a.attribute)
984                            os.set_attribute(a.attribute, a.value)
985                        break
986                else:
987                    # If both nodes have one OS, this is a replacement
988                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
989                    else: e.os.append(nos)
990
991        # Annotate the topology with embedding info
992        for e in top.elements:
993            if isinstance(e, topdl.Computer):
994                for s in data:
995                    ne = s.node.get(e.name, None)
996                    if ne is not None:
997                        add_new_names(ne.localname, e.localname)
998                        e.status = ne.status
999                        merge_services(ne, e)
1000                        add_new_names(ne.operation, e.operation)
1001                        if ne.os: merge_oses(ne, e)
1002                        break
1003            elif isinstance(e,topdl.Testbed):
1004                for s in data:
1005                    ne = s.tb.get(e.uri, None)
1006                    if ne is not None:
1007                        add_new_names(ne.localname, e.localname)
1008                        add_new_names(ne.operation, e.operation)
1009                        merge_services(ne, e)
1010                        for a in ne.attribute:
1011                            e.set_attribute(a.attribute, a.value)
1012        # Annotate substrates
1013        for s in top.substrates:
1014            for d in data:
1015                ss = d.subs.get(s.name, None)
1016                if ss is not None:
1017                    if ss.capacity is not None:
1018                        s.capacity = ss.capacity
1019                    if s.latency is not None:
1020                        s.latency = ss.latency
1021
1022
1023
1024    def allocate_resources(self, allocated, masters, eid, expid, 
1025            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1026            attrs=None, connInfo={}, tbmap=None, expcert=None):
1027
1028        started = { }           # Testbeds where a sub-experiment started
1029                                # successfully
1030
1031        # XXX
1032        fail_soft = False
1033
1034        if tbmap is None: tbmap = { }
1035
1036        log = alloc_log or self.log
1037
1038        tp = thread_pool(self.nthreads)
1039        threads = [ ]
1040        starters = [ ]
1041
1042        if expcert:
1043            cert = expcert
1044            pw = None
1045        else:
1046            cert = self.cert_file
1047            pw = self.cert_pwd
1048
1049        for tb in allocated.keys():
1050            # Create and start a thread to start the segment, and save it
1051            # to get the return value later
1052            tb_attrs = copy.copy(attrs)
1053            tp.wait_for_slot()
1054            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1055            base, suffix = split_testbed(tb)
1056            if suffix:
1057                tb_attrs.append({'attribute': 'experiment_name', 
1058                    'value': "%s-%s" % (eid, suffix)})
1059            else:
1060                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1061            if not uri:
1062                raise service_error(service_error.internal, 
1063                        "Unknown testbed %s !?" % tb)
1064
1065            aid = tbparams[tb].allocID
1066            if not aid:
1067                raise service_error(service_error.internal, 
1068                        "No alloc id for testbed %s !?" % tb)
1069
1070            s = self.start_segment(log=log, debug=self.debug,
1071                    testbed=tb, cert_file=cert,
1072                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1073                    caller=self.call_StartSegment,
1074                    log_collector=log_collector)
1075            starters.append(s)
1076            t  = pooled_thread(\
1077                    target=s, name=tb,
1078                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1079                    pdata=tp, trace_file=self.trace_file)
1080            threads.append(t)
1081            t.start()
1082
1083        # Wait until all finish (keep pinging the log, though)
1084        mins = 0
1085        revoked = False
1086        while not tp.wait_for_all_done(60.0):
1087            mins += 1
1088            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1089                    % mins)
1090            if not revoked and \
1091                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1092                # a testbed has failed.  Revoke this experiment's
1093                # synchronizarion values so that sub experiments will not
1094                # deadlock waiting for synchronization that will never happen
1095                self.log.info("A subexperiment has failed to swap in, " + \
1096                        "revoking synch keys")
1097                var_key = "fedid:%s" % expid
1098                for k in self.synch_store.all_keys():
1099                    if len(k) > 45 and k[0:46] == var_key:
1100                        self.synch_store.revoke_key(k)
1101                revoked = True
1102
1103        failed = [ t.getName() for t in threads if not t.rv ]
1104        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1105
1106        # If one failed clean up, unless fail_soft is set
1107        if failed:
1108            if not fail_soft:
1109                tp.clear()
1110                for tb in succeeded:
1111                    # Create and start a thread to stop the segment
1112                    tp.wait_for_slot()
1113                    uri = tbparams[tb].uri
1114                    t  = pooled_thread(\
1115                            target=self.terminate_segment(log=log,
1116                                testbed=tb,
1117                                cert_file=cert, 
1118                                cert_pwd=pw,
1119                                trusted_certs=self.trusted_certs,
1120                                caller=self.call_TerminateSegment),
1121                            args=(uri, tbparams[tb].allocID),
1122                            name=tb,
1123                            pdata=tp, trace_file=self.trace_file)
1124                    t.start()
1125                # Wait until all finish (if any are being stopped)
1126                if succeeded:
1127                    tp.wait_for_all_done()
1128
1129                # release the allocations
1130                for tb in tbparams.keys():
1131                    try:
1132                        self.release_access(tb, tbparams[tb].allocID, 
1133                                tbmap=tbmap, uri=tbparams[tb].uri,
1134                                cert_file=cert, cert_pwd=pw)
1135                    except service_error, e:
1136                        self.log.warn("Error releasing access: %s" % e.desc)
1137                # Remove the placeholder
1138                self.state_lock.acquire()
1139                self.state[eid].status = 'failed'
1140                self.state[eid].updated()
1141                if self.state_filename: self.write_state()
1142                self.state_lock.release()
1143                # Remove the repo dir
1144                self.remove_dirs("%s/%s" %(self.repodir, expid))
1145                # Walk up tmpdir, deleting as we go
1146                if self.cleanup:
1147                    self.remove_dirs(tmpdir)
1148                else:
1149                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1150
1151
1152                log.error("Swap in failed on %s" % ",".join(failed))
1153                return
1154        else:
1155            # Walk through the successes and gather the proofs
1156            proofs = { }
1157            for s in starters:
1158                if s.proof:
1159                    proofs[s.testbed] = s.proof
1160            self.annotate_topology(top, starters)
1161            log.info("[start_segment]: Experiment %s active" % eid)
1162
1163
1164        # Walk up tmpdir, deleting as we go
1165        if self.cleanup:
1166            self.remove_dirs(tmpdir)
1167        else:
1168            log.debug("[start_experiment]: not removing %s" % tmpdir)
1169
1170        # Insert the experiment into our state and update the disk copy.
1171        self.state_lock.acquire()
1172        self.state[expid].status = 'active'
1173        self.state[eid] = self.state[expid]
1174        self.state[eid].top = top
1175        self.state[eid].updated()
1176        # Append startup proofs
1177        for f in self.state[eid].get_all_allocations():
1178            if f.tb in proofs:
1179                f.proof.append(proofs[f.tb])
1180
1181        if self.state_filename: self.write_state()
1182        self.state_lock.release()
1183        return
1184
1185
1186    def add_kit(self, e, kit):
1187        """
1188        Add a Software object created from the list of (install, location)
1189        tuples passed as kit  to the software attribute of an object e.  We
1190        do this enough to break out the code, but it's kind of a hack to
1191        avoid changing the old tuple rep.
1192        """
1193
1194        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1195
1196        if isinstance(e.software, list): e.software.extend(s)
1197        else: e.software = s
1198
1199    def append_experiment_authorization(self, expid, attrs, 
1200            need_state_lock=True):
1201        """
1202        Append the authorization information to system state
1203        """
1204
1205        for p, a in attrs:
1206            self.auth.set_attribute(p, a)
1207        self.auth.save()
1208
1209        if need_state_lock: self.state_lock.acquire()
1210        # XXX: really a no op?
1211        #self.state[expid]['auth'].update(attrs)
1212        if self.state_filename: self.write_state()
1213        if need_state_lock: self.state_lock.release()
1214
1215    def clear_experiment_authorization(self, expid, need_state_lock=True):
1216        """
1217        Attrs is a set of attribute principal pairs that need to be removed
1218        from the authenticator.  Remove them and save the authenticator.
1219        """
1220
1221        if need_state_lock: self.state_lock.acquire()
1222        # XXX: should be a no-op
1223        #if expid in self.state and 'auth' in self.state[expid]:
1224            #for p, a in self.state[expid]['auth']:
1225                #self.auth.unset_attribute(p, a)
1226            #self.state[expid]['auth'] = set()
1227        if self.state_filename: self.write_state()
1228        if need_state_lock: self.state_lock.release()
1229        self.auth.save()
1230
1231
1232    def create_experiment_state(self, fid, req, expid, expcert,
1233            state='starting'):
1234        """
1235        Create the initial entry in the experiment's state.  The expid and
1236        expcert are the experiment's fedid and certifacte that represents that
1237        ID, which are installed in the experiment state.  If the request
1238        includes a suggested local name that is used if possible.  If the local
1239        name is already taken by an experiment owned by this user that has
1240        failed, it is overwritten.  Otherwise new letters are added until a
1241        valid localname is found.  The generated local name is returned.
1242        """
1243
1244        if req.has_key('experimentID') and \
1245                req['experimentID'].has_key('localname'):
1246            overwrite = False
1247            eid = req['experimentID']['localname']
1248            # If there's an old failed experiment here with the same local name
1249            # and accessible by this user, we'll overwrite it, otherwise we'll
1250            # fall through and do the collision avoidance.
1251            old_expid = self.get_experiment_fedid(eid)
1252            if old_expid:
1253                users_experiment = True
1254                try:
1255                    self.check_experiment_access(fid, old_expid)
1256                except service_error, e:
1257                    if e.code == service_error.access: users_experiment = False
1258                    else: raise e
1259                if users_experiment:
1260                    self.state_lock.acquire()
1261                    status = self.state[eid].status
1262                    if status and status == 'failed':
1263                        # remove the old access attributes
1264                        self.clear_experiment_authorization(eid,
1265                                need_state_lock=False)
1266                        overwrite = True
1267                        del self.state[eid]
1268                        del self.state[old_expid]
1269                    self.state_lock.release()
1270                else:
1271                    self.log.info('Experiment %s exists, ' % eid + \
1272                            'but this user cannot access it')
1273            self.state_lock.acquire()
1274            while (self.state.has_key(eid) and not overwrite):
1275                eid += random.choice(string.ascii_letters)
1276            # Initial state
1277            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1278                    identity=expcert)
1279            self.state[expid] = self.state[eid]
1280            if self.state_filename: self.write_state()
1281            self.state_lock.release()
1282        else:
1283            eid = self.exp_stem
1284            for i in range(0,5):
1285                eid += random.choice(string.ascii_letters)
1286            self.state_lock.acquire()
1287            while (self.state.has_key(eid)):
1288                eid = self.exp_stem
1289                for i in range(0,5):
1290                    eid += random.choice(string.ascii_letters)
1291            # Initial state
1292            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1293                    identity=expcert)
1294            self.state[expid] = self.state[eid]
1295            if self.state_filename: self.write_state()
1296            self.state_lock.release()
1297
1298        # Let users touch the state.  Authorize this fid and the expid itself
1299        # to touch the experiment, as well as allowing th eoverrides.
1300        self.append_experiment_authorization(eid, 
1301                set([(fid, expid), (expid,expid)] + \
1302                        [ (o, expid) for o in self.overrides]))
1303
1304        return eid
1305
1306
1307    def allocate_ips_to_topo(self, top):
1308        """
1309        Add an ip4_address attribute to all the hosts in the topology, based on
1310        the shared substrates on which they sit.  An /etc/hosts file is also
1311        created and returned as a list of hostfiles entries.  We also return
1312        the allocator, because we may need to allocate IPs to portals
1313        (specifically DRAGON portals).
1314        """
1315        subs = sorted(top.substrates, 
1316                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1317                reverse=True)
1318        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1319        ifs = { }
1320        hosts = [ ]
1321
1322        for idx, s in enumerate(subs):
1323            net_size = len(s.interfaces)+2
1324
1325            a = ips.allocate(net_size)
1326            if a :
1327                base, num = a
1328                if num < net_size: 
1329                    raise service_error(service_error.internal,
1330                            "Allocator returned wrong number of IPs??")
1331            else:
1332                raise service_error(service_error.req, 
1333                        "Cannot allocate IP addresses")
1334            mask = ips.min_alloc
1335            while mask < net_size:
1336                mask *= 2
1337
1338            netmask = ((2**32-1) ^ (mask-1))
1339
1340            base += 1
1341            for i in s.interfaces:
1342                i.attribute.append(
1343                        topdl.Attribute('ip4_address', 
1344                            "%s" % ip_addr(base)))
1345                i.attribute.append(
1346                        topdl.Attribute('ip4_netmask', 
1347                            "%s" % ip_addr(int(netmask))))
1348
1349                hname = i.element.name
1350                if ifs.has_key(hname):
1351                    hosts.append("%s\t%s-%s %s-%d" % \
1352                            (ip_addr(base), hname, s.name, hname,
1353                                ifs[hname]))
1354                else:
1355                    ifs[hname] = 0
1356                    hosts.append("%s\t%s-%s %s-%d %s" % \
1357                            (ip_addr(base), hname, s.name, hname,
1358                                ifs[hname], hname))
1359
1360                ifs[hname] += 1
1361                base += 1
1362        return hosts, ips
1363
1364    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1365            tbparam, masters, tbmap, expid=None, expcert=None):
1366        for tb in testbeds:
1367            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1368                    expcert)
1369            allocated[tb] = 1
1370
1371    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1372            expcert=None):
1373        """
1374        Get access to testbed through fedd and set the parameters for that tb
1375        """
1376        def get_export_project(svcs):
1377            """
1378            Look through for the list of federated_service for this testbed
1379            objects for a project_export service, and extract the project
1380            parameter.
1381            """
1382
1383            pe = [s for s in svcs if s.name=='project_export']
1384            if len(pe) == 1:
1385                return pe[0].params.get('project', None)
1386            elif len(pe) == 0:
1387                return None
1388            else:
1389                raise service_error(service_error.req,
1390                        "More than one project export is not supported")
1391
1392        def add_services(svcs, type, slist, keys):
1393            """
1394            Add the given services to slist.  type is import or export.  Also
1395            add a mapping entry from the assigned id to the original service
1396            record.
1397            """
1398            for i, s in enumerate(svcs):
1399                idx = '%s%d' % (type, i)
1400                keys[idx] = s
1401                sr = {'id': idx, 'name': s.name, 'visibility': type }
1402                if s.params:
1403                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1404                            for k, v in s.params.items()]
1405                slist.append(sr)
1406
1407        uri = tbmap.get(testbed_base(tb), None)
1408        if not uri:
1409            raise service_error(service_error.server_config, 
1410                    "Unknown testbed: %s" % tb)
1411
1412        export_svcs = masters.get(tb,[])
1413        import_svcs = [ s for m in masters.values() \
1414                for s in m \
1415                    if tb in s.importers ]
1416
1417        export_project = get_export_project(export_svcs)
1418        # Compose the credential list so that IDs come before attributes
1419        creds = set()
1420        keys = set()
1421        certs = self.auth.get_creds_for_principal(fid)
1422        # Append credenials about this experiment controller - e.g. that it is
1423        # trusted.
1424        certs.update(self.auth.get_creds_for_principal(
1425            fedid(file=self.cert_file)))
1426        if expid:
1427            certs.update(self.auth.get_creds_for_principal(expid))
1428        for c in certs:
1429            keys.add(c.issuer_cert())
1430            creds.add(c.attribute_cert())
1431        creds = list(keys) + list(creds)
1432
1433        if expcert: cert, pw = expcert, None
1434        else: cert, pw = self.cert_file, self.cert_pw
1435
1436        # Request credentials
1437        req = {
1438                'abac_credential': creds,
1439            }
1440        # Make the service request from the services we're importing and
1441        # exporting.  Keep track of the export request ids so we can
1442        # collect the resulting info from the access response.
1443        e_keys = { }
1444        if import_svcs or export_svcs:
1445            slist = []
1446            add_services(import_svcs, 'import', slist, e_keys)
1447            add_services(export_svcs, 'export', slist, e_keys)
1448            req['service'] = slist
1449
1450        if self.local_access.has_key(uri):
1451            # Local access call
1452            req = { 'RequestAccessRequestBody' : req }
1453            r = self.local_access[uri].RequestAccess(req, 
1454                    fedid(file=self.cert_file))
1455            r = { 'RequestAccessResponseBody' : r }
1456        else:
1457            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1458
1459        if r.has_key('RequestAccessResponseBody'):
1460            # Through to here we have a valid response, not a fault.
1461            # Access denied is a fault, so something better or worse than
1462            # access denied has happened.
1463            r = r['RequestAccessResponseBody']
1464            self.log.debug("[get_access] Access granted")
1465        else:
1466            raise service_error(service_error.protocol,
1467                        "Bad proxy response")
1468        if 'proof' not in r:
1469            raise service_error(service_error.protocol,
1470                        "Bad access response (no access proof)")
1471
1472        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1473                tb=tb, uri=uri, proof=[r['proof']], 
1474                services=masters.get(tb, None))
1475
1476        # Collect the responses corresponding to the services this testbed
1477        # exports.  These will be the service requests that we will include in
1478        # the start segment requests (with appropriate visibility values) to
1479        # import and export the segments.
1480        for s in r.get('service', []):
1481            id = s.get('id', None)
1482            # Note that this attaches the response to the object in the masters
1483            # data structure.  (The e_keys index disappears when this fcn
1484            # returns)
1485            if id and id in e_keys:
1486                e_keys[id].reqs.append(s)
1487
1488        # Add attributes to parameter space.  We don't allow attributes to
1489        # overlay any parameters already installed.
1490        for a in r.get('fedAttr', []):
1491            try:
1492                if a['attribute']:
1493                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1494            except KeyError:
1495                self.log.error("Bad attribute in response: %s" % a)
1496
1497
1498    def split_topology(self, top, topo, testbeds):
1499        """
1500        Create the sub-topologies that are needed for experiment instantiation.
1501        """
1502        for tb in testbeds:
1503            topo[tb] = top.clone()
1504            # copy in for loop allows deletions from the original
1505            for e in [ e for e in topo[tb].elements]:
1506                etb = e.get_attribute('testbed')
1507                # NB: elements without a testbed attribute won't appear in any
1508                # sub topologies. 
1509                if not etb or etb != tb:
1510                    for i in e.interface:
1511                        for s in i.subs:
1512                            try:
1513                                s.interfaces.remove(i)
1514                            except ValueError:
1515                                raise service_error(service_error.internal,
1516                                        "Can't remove interface??")
1517                    topo[tb].elements.remove(e)
1518            topo[tb].make_indices()
1519
1520    def confirm_software(self, top):
1521        """
1522        Make sure that the software to be loaded in the topo is all available
1523        before we begin making access requests, etc.  This is a subset of
1524        wrangle_software.
1525        """
1526        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1527        pkgs.update([x.location for e in top.elements for x in e.software])
1528
1529        for pkg in pkgs:
1530            loc = pkg
1531
1532            scheme, host, path = urlparse(loc)[0:3]
1533            dest = os.path.basename(path)
1534            if not scheme:
1535                if not loc.startswith('/'):
1536                    loc = "/%s" % loc
1537                loc = "file://%s" %loc
1538            # NB: if scheme was found, loc == pkg
1539            try:
1540                u = urlopen(loc)
1541                u.close()
1542            except Exception, e:
1543                raise service_error(service_error.req, 
1544                        "Cannot open %s: %s" % (loc, e))
1545        return True
1546
1547    def wrangle_software(self, expid, top, topo, tbparams):
1548        """
1549        Copy software out to the repository directory, allocate permissions and
1550        rewrite the segment topologies to look for the software in local
1551        places.
1552        """
1553
1554        # Copy the rpms and tarfiles to a distribution directory from
1555        # which the federants can retrieve them
1556        linkpath = "%s/software" %  expid
1557        softdir ="%s/%s" % ( self.repodir, linkpath)
1558        softmap = { }
1559
1560        # self.fedkit and self.gateway kit are lists of tuples of
1561        # (install_location, download_location) this extracts the download
1562        # locations.
1563        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1564        pkgs.update([x.location for e in top.elements for x in e.software])
1565        try:
1566            os.makedirs(softdir)
1567        except EnvironmentError, e:
1568            raise service_error(
1569                    "Cannot create software directory: %s" % e)
1570        # The actual copying.  Everything's converted into a url for copying.
1571        auth_attrs = set()
1572        for pkg in pkgs:
1573            loc = pkg
1574
1575            scheme, host, path = urlparse(loc)[0:3]
1576            dest = os.path.basename(path)
1577            if not scheme:
1578                if not loc.startswith('/'):
1579                    loc = "/%s" % loc
1580                loc = "file://%s" %loc
1581            # NB: if scheme was found, loc == pkg
1582            try:
1583                u = urlopen(loc)
1584            except Exception, e:
1585                raise service_error(service_error.req, 
1586                        "Cannot open %s: %s" % (loc, e))
1587            try:
1588                f = open("%s/%s" % (softdir, dest) , "w")
1589                self.log.debug("Writing %s/%s" % (softdir,dest) )
1590                data = u.read(4096)
1591                while data:
1592                    f.write(data)
1593                    data = u.read(4096)
1594                f.close()
1595                u.close()
1596            except Exception, e:
1597                raise service_error(service_error.internal,
1598                        "Could not copy %s: %s" % (loc, e))
1599            path = re.sub("/tmp", "", linkpath)
1600            # XXX
1601            softmap[pkg] = \
1602                    "%s/%s/%s" %\
1603                    ( self.repo_url, path, dest)
1604
1605            # Allow the individual segments to access the software by assigning
1606            # an attribute to each testbed allocation that encodes the data to
1607            # be released.  This expression collects the data for each run of
1608            # the loop.
1609            auth_attrs.update([
1610                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1611                        for tb in tbparams.keys()])
1612
1613        self.append_experiment_authorization(expid, auth_attrs)
1614
1615        # Convert the software locations in the segments into the local
1616        # copies on this host
1617        for soft in [ s for tb in topo.values() \
1618                for e in tb.elements \
1619                    if getattr(e, 'software', False) \
1620                        for s in e.software ]:
1621            if softmap.has_key(soft.location):
1622                soft.location = softmap[soft.location]
1623
1624
1625    def new_experiment(self, req, fid):
1626        """
1627        The external interface to empty initial experiment creation called from
1628        the dispatcher.
1629
1630        Creates a working directory, splits the incoming description using the
1631        splitter script and parses out the avrious subsections using the
1632        lcasses above.  Once each sub-experiment is created, use pooled threads
1633        to instantiate them and start it all up.
1634        """
1635        self.log.info("New experiment call started for %s" % fid)
1636        req = req.get('NewRequestBody', None)
1637        if not req:
1638            raise service_error(service_error.req,
1639                    "Bad request format (no NewRequestBody)")
1640
1641        # import may partially succeed so always save credentials and warn
1642        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1643            self.log.debug("Failed to import delegation credentials(!)")
1644        self.get_grouper_updates(fid)
1645        self.auth.update()
1646        self.auth.save()
1647
1648        try:
1649            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1650                    with_proof=True)
1651        except service_error, e:
1652            self.log.info("New experiment call for %s: access denied" % fid)
1653            raise e
1654
1655
1656        if not access_ok:
1657            self.log.info("New experiment call for %s: Access denied" % fid)
1658            raise service_error(service_error.access, "New access denied",
1659                    proof=[proof])
1660
1661        try:
1662            tmpdir = tempfile.mkdtemp(prefix="split-")
1663        except EnvironmentError:
1664            raise service_error(service_error.internal, "Cannot create tmp dir")
1665
1666        # Generate an ID for the experiment (slice) and a certificate that the
1667        # allocator can use to prove they own it.  We'll ship it back through
1668        # the encrypted connection.  If the requester supplied one, use it.
1669        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1670            expcert = req['experimentAccess']['X509']
1671            expid = fedid(certstr=expcert)
1672            self.state_lock.acquire()
1673            if expid in self.state:
1674                self.state_lock.release()
1675                raise service_error(service_error.req, 
1676                        'fedid %s identifies an existing experiment' % expid)
1677            self.state_lock.release()
1678        else:
1679            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1680
1681        #now we're done with the tmpdir, and it should be empty
1682        if self.cleanup:
1683            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1684            os.rmdir(tmpdir)
1685        else:
1686            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1687
1688        eid = self.create_experiment_state(fid, req, expid, expcert, 
1689                state='empty')
1690
1691        rv = {
1692                'experimentID': [
1693                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1694                ],
1695                'experimentStatus': 'empty',
1696                'experimentAccess': { 'X509' : expcert },
1697                'proof': proof.to_dict(),
1698            }
1699
1700        self.log.info("New experiment call succeeded for %s" % fid)
1701        return rv
1702
1703    # create_experiment sub-functions
1704
1705    @staticmethod
1706    def get_experiment_key(req, field='experimentID'):
1707        """
1708        Parse the experiment identifiers out of the request (the request body
1709        tag has been removed).  Specifically this pulls either the fedid or the
1710        localname out of the experimentID field.  A fedid is preferred.  If
1711        neither is present or the request does not contain the fields,
1712        service_errors are raised.
1713        """
1714        # Get the experiment access
1715        exp = req.get(field, None)
1716        if exp:
1717            if exp.has_key('fedid'):
1718                key = exp['fedid']
1719            elif exp.has_key('localname'):
1720                key = exp['localname']
1721            else:
1722                raise service_error(service_error.req, "Unknown lookup type")
1723        else:
1724            raise service_error(service_error.req, "No request?")
1725
1726        return key
1727
1728    def get_experiment_ids_and_start(self, key, tmpdir):
1729        """
1730        Get the experiment name, id and access certificate from the state, and
1731        set the experiment state to 'starting'.  returns a triple (fedid,
1732        localname, access_cert_file). The access_cert_file is a copy of the
1733        contents of the access certificate, created in the tempdir with
1734        restricted permissions.  If things are confused, raise an exception.
1735        """
1736
1737        expid = eid = None
1738        self.state_lock.acquire()
1739        if key in self.state:
1740            exp = self.state[key]
1741            exp.status = "starting"
1742            exp.updated()
1743            expid = exp.fedid
1744            eid = exp.localname
1745            expcert = exp.identity
1746        self.state_lock.release()
1747
1748        # make a protected copy of the access certificate so the experiment
1749        # controller can act as the experiment principal.
1750        if expcert:
1751            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1752            if not expcert_file:
1753                raise service_error(service_error.internal, 
1754                        "Cannot create temp cert file?")
1755        else:
1756            expcert_file = None
1757
1758        return (eid, expid, expcert_file)
1759
1760    def get_topology(self, req, tmpdir):
1761        """
1762        Get the ns2 content and put it into a file for parsing.  Call the local
1763        or remote parser and return the topdl.Topology.  Errors result in
1764        exceptions.  req is the request and tmpdir is a work directory.
1765        """
1766
1767        # The tcl parser needs to read a file so put the content into that file
1768        descr=req.get('experimentdescription', None)
1769        if descr:
1770            if 'ns2description' in descr:
1771                file_content=descr['ns2description']
1772            elif 'topdldescription' in descr:
1773                return topdl.Topology(**descr['topdldescription'])
1774            else:
1775                raise service_error(service_error.req, 
1776                        'Unknown experiment description type')
1777        else:
1778            raise service_error(service_error.req, "No experiment description")
1779
1780
1781        if self.splitter_url:
1782            self.log.debug("Calling remote topdl translator at %s" % \
1783                    self.splitter_url)
1784            top = self.remote_ns2topdl(self.splitter_url, file_content)
1785        else:
1786            tclfile = os.path.join(tmpdir, "experiment.tcl")
1787            if file_content:
1788                try:
1789                    f = open(tclfile, 'w')
1790                    f.write(file_content)
1791                    f.close()
1792                except EnvironmentError:
1793                    raise service_error(service_error.internal,
1794                            "Cannot write temp experiment description")
1795            else:
1796                raise service_error(service_error.req, 
1797                        "Only ns2descriptions supported")
1798            pid = "dummy"
1799            gid = "dummy"
1800            eid = "dummy"
1801
1802            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1803                str(self.muxmax), '-m', 'dummy']
1804
1805            tclcmd.extend([pid, gid, eid, tclfile])
1806
1807            self.log.debug("running local splitter %s", " ".join(tclcmd))
1808            # This is just fantastic.  As a side effect the parser copies
1809            # tb_compat.tcl into the current directory, so that directory
1810            # must be writable by the fedd user.  Doing this in the
1811            # temporary subdir ensures this is the case.
1812            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1813                    cwd=tmpdir)
1814            split_data = tclparser.stdout
1815
1816            top = topdl.topology_from_xml(file=split_data, top="experiment")
1817            os.remove(tclfile)
1818
1819        return top
1820
1821    def get_testbed_services(self, req, testbeds):
1822        """
1823        Parse the services section of the request into two dicts mapping
1824        testbed to lists of federated_service objects.  The first dict maps all
1825        exporters of services to those service objects, the second maps
1826        testbeds to service objects only for services requiring portals.
1827        """
1828        # We construct both dicts here because deriving the second is more
1829        # comples than it looks - both the keys and lists can differ, and it's
1830        # much easier to generate both in one pass.
1831        masters = { }
1832        pmasters = { }
1833        for s in req.get('service', []):
1834            # If this is a service request with the importall field
1835            # set, fill it out.
1836
1837            if s.get('importall', False):
1838                s['import'] = [ tb for tb in testbeds \
1839                        if tb not in s.get('export',[])]
1840                del s['importall']
1841
1842            # Add the service to masters
1843            for tb in s.get('export', []):
1844                if s.get('name', None):
1845
1846                    params = { }
1847                    for a in s.get('fedAttr', []):
1848                        params[a.get('attribute', '')] = a.get('value','')
1849
1850                    fser = federated_service(name=s['name'],
1851                            exporter=tb, importers=s.get('import',[]),
1852                            params=params)
1853                    if fser.name == 'hide_hosts' \
1854                            and 'hosts' not in fser.params:
1855                        fser.params['hosts'] = \
1856                                ",".join(tb_hosts.get(fser.exporter, []))
1857                    if tb in masters: masters[tb].append(fser)
1858                    else: masters[tb] = [fser]
1859
1860                    if fser.portal:
1861                        if tb in pmasters: pmasters[tb].append(fser)
1862                        else: pmasters[tb] = [fser]
1863                else:
1864                    self.log.error('Testbed service does not have name " + \
1865                            "and importers')
1866        return masters, pmasters
1867
1868    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1869        """
1870        Create the ssh keys necessary for interconnecting the portal nodes and
1871        the global hosts file for letting each segment know about the IP
1872        addresses in play.  Save these into the repo.  Add attributes to the
1873        autorizer allowing access controllers to download them and return a set
1874        of attributes that inform the segments where to find this stuff.  May
1875        raise service_errors in if there are problems.
1876        """
1877        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1878        gw_secretkey_base = "fed.%s" % self.ssh_type
1879        keydir = os.path.join(tmpdir, 'keys')
1880        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1881        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1882
1883        try:
1884            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1885        except ValueError:
1886            raise service_error(service_error.server_config, 
1887                    "Bad key type (%s)" % self.ssh_type)
1888
1889        self.generate_seer_certs(keydir)
1890
1891        # Copy configuration files into the remote file store
1892        # The config urlpath
1893        configpath = "/%s/config" % expid
1894        # The config file system location
1895        configdir ="%s%s" % ( self.repodir, configpath)
1896        try:
1897            os.makedirs(configdir)
1898        except EnvironmentError, e:
1899            raise service_error(service_error.internal,
1900                    "Cannot create config directory: %s" % e)
1901        try:
1902            f = open("%s/hosts" % configdir, "w")
1903            print >> f, string.join(hosts, '\n')
1904            f.close()
1905        except EnvironmentError, e:
1906            raise service_error(service_error.internal, 
1907                    "Cannot write hosts file: %s" % e)
1908        try:
1909            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1910            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1911            copy_file(os.path.join(keydir, 'ca.pem'), 
1912                    os.path.join(configdir, 'ca.pem'))
1913            copy_file(os.path.join(keydir, 'node.pem'), 
1914                    os.path.join(configdir, 'node.pem'))
1915        except EnvironmentError, e:
1916            raise service_error(service_error.internal, 
1917                    "Cannot copy keyfiles: %s" % e)
1918
1919        # Allow the individual testbeds to access the configuration files,
1920        # again by setting an attribute for the relevant pathnames on each
1921        # allocation principal.  Yeah, that's a long list comprehension.
1922        self.append_experiment_authorization(expid, set([
1923            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1924                    for tb in tbparams.keys() \
1925                        for f in ("hosts", 'ca.pem', 'node.pem', 
1926                            gw_secretkey_base, gw_pubkey_base)]))
1927
1928        attrs = [ 
1929                {
1930                    'attribute': 'ssh_pubkey', 
1931                    'value': '%s/%s/config/%s' % \
1932                            (self.repo_url, expid, gw_pubkey_base)
1933                },
1934                {
1935                    'attribute': 'ssh_secretkey', 
1936                    'value': '%s/%s/config/%s' % \
1937                            (self.repo_url, expid, gw_secretkey_base)
1938                },
1939                {
1940                    'attribute': 'hosts', 
1941                    'value': '%s/%s/config/hosts' % \
1942                            (self.repo_url, expid)
1943                },
1944                {
1945                    'attribute': 'seer_ca_pem', 
1946                    'value': '%s/%s/config/%s' % \
1947                            (self.repo_url, expid, 'ca.pem')
1948                },
1949                {
1950                    'attribute': 'seer_node_pem', 
1951                    'value': '%s/%s/config/%s' % \
1952                            (self.repo_url, expid, 'node.pem')
1953                },
1954            ]
1955        return attrs
1956
1957
1958    def get_vtopo(self, req, fid):
1959        """
1960        Return the stored virtual topology for this experiment
1961        """
1962        rv = None
1963        state = None
1964        self.log.info("vtopo call started for %s" %  fid)
1965
1966        req = req.get('VtopoRequestBody', None)
1967        if not req:
1968            raise service_error(service_error.req,
1969                    "Bad request format (no VtopoRequestBody)")
1970        exp = req.get('experiment', None)
1971        if exp:
1972            if exp.has_key('fedid'):
1973                key = exp['fedid']
1974                keytype = "fedid"
1975            elif exp.has_key('localname'):
1976                key = exp['localname']
1977                keytype = "localname"
1978            else:
1979                raise service_error(service_error.req, "Unknown lookup type")
1980        else:
1981            raise service_error(service_error.req, "No request?")
1982
1983        try:
1984            proof = self.check_experiment_access(fid, key)
1985        except service_error, e:
1986            self.log.info("vtopo call failed for %s: access denied" %  fid)
1987            raise e
1988
1989        self.state_lock.acquire()
1990        # XXX: this needs to be recalculated
1991        if key in self.state:
1992            if self.state[key].top is not None:
1993                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1994                rv = { 'experiment' : {keytype: key },
1995                        'vtopo': vtopo,
1996                        'proof': proof.to_dict(), 
1997                    }
1998            else:
1999                state = self.state[key].status
2000        self.state_lock.release()
2001
2002        if rv: 
2003            self.log.info("vtopo call completed for %s %s " % \
2004                (key, fid))
2005            return rv
2006        else: 
2007            if state:
2008                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2009                    (key, fid))
2010                raise service_error(service_error.partial, 
2011                        "Not ready: %s" % state)
2012            else:
2013                self.log.info("vtopo call completed for %s %s (No experiment)"\
2014                        % (key, fid))
2015                raise service_error(service_error.req, "No such experiment")
2016
2017    def get_vis(self, req, fid):
2018        """
2019        Return the stored visualization for this experiment
2020        """
2021        rv = None
2022        state = None
2023
2024        self.log.info("vis call started for %s" %  fid)
2025        req = req.get('VisRequestBody', None)
2026        if not req:
2027            raise service_error(service_error.req,
2028                    "Bad request format (no VisRequestBody)")
2029        exp = req.get('experiment', None)
2030        if exp:
2031            if exp.has_key('fedid'):
2032                key = exp['fedid']
2033                keytype = "fedid"
2034            elif exp.has_key('localname'):
2035                key = exp['localname']
2036                keytype = "localname"
2037            else:
2038                raise service_error(service_error.req, "Unknown lookup type")
2039        else:
2040            raise service_error(service_error.req, "No request?")
2041
2042        try:
2043            proof = self.check_experiment_access(fid, key)
2044        except service_error, e:
2045            self.log.info("vis call failed for %s: access denied" %  fid)
2046            raise e
2047
2048        self.state_lock.acquire()
2049        # Generate the visualization
2050        if key in self.state:
2051            if self.state[key].top is not None:
2052                try:
2053                    vis = self.genviz(
2054                            topdl.topology_to_vtopo(self.state[key].top))
2055                except service_error, e:
2056                    self.state_lock.release()
2057                    raise e
2058                rv =  { 'experiment' : {keytype: key },
2059                        'vis': vis,
2060                        'proof': proof.to_dict(), 
2061                        }
2062            else:
2063                state = self.state[key].status
2064        self.state_lock.release()
2065
2066        if rv: 
2067            self.log.info("vis call completed for %s %s " % \
2068                (key, fid))
2069            return rv
2070        else:
2071            if state:
2072                self.log.info("vis call completed for %s %s (not ready)" % \
2073                    (key, fid))
2074                raise service_error(service_error.partial, 
2075                        "Not ready: %s" % state)
2076            else:
2077                self.log.info("vis call completed for %s %s (no experiment)" % \
2078                    (key, fid))
2079                raise service_error(service_error.req, "No such experiment")
2080
2081   
2082    def save_federant_information(self, allocated, tbparams, eid, top):
2083        """
2084        Store the various data that have changed in the experiment state
2085        between when it was started and the beginning of resource allocation.
2086        This is basically the information about each local allocation.  This
2087        fills in the values of the placeholder allocation in the state.  It
2088        also collects the access proofs and returns them as dicts for a
2089        response message.
2090        """
2091        self.state_lock.acquire()
2092        exp = self.state[eid]
2093        exp.top = top.clone()
2094        # save federant information
2095        for k in allocated.keys():
2096            exp.add_allocation(tbparams[k])
2097            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2098                type="testbed", localname=[k], 
2099                service=[ s.to_topdl() for s in tbparams[k].services]))
2100
2101        # Access proofs for the response message
2102        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2103                    for p in tbparams[k].proof]
2104        exp.updated()
2105        if self.state_filename: 
2106            self.write_state()
2107        self.state_lock.release()
2108        return proofs
2109
2110    def clear_placeholder(self, eid, expid, tmpdir):
2111        """
2112        Clear the placeholder and remove any allocated temporary dir.
2113        """
2114
2115        self.state_lock.acquire()
2116        del self.state[eid]
2117        del self.state[expid]
2118        if self.state_filename: self.write_state()
2119        self.state_lock.release()
2120        if tmpdir and self.cleanup:
2121            self.remove_dirs(tmpdir)
2122
2123    # end of create_experiment sub-functions
2124
2125    def create_experiment(self, req, fid):
2126        """
2127        The external interface to experiment creation called from the
2128        dispatcher.
2129
2130        Creates a working directory, splits the incoming description using the
2131        splitter script and parses out the various subsections using the
2132        classes above.  Once each sub-experiment is created, use pooled threads
2133        to instantiate them and start it all up.
2134        """
2135
2136        self.log.info("Create experiment call started for %s" % fid)
2137        req = req.get('CreateRequestBody', None)
2138        if req:
2139            key = self.get_experiment_key(req)
2140        else:
2141            raise service_error(service_error.req,
2142                    "Bad request format (no CreateRequestBody)")
2143
2144        # Import information from the requester
2145        # import may partially succeed so always save credentials and warn
2146        if not self.auth.import_credentials(data_list=req.get('credential',[])):
2147            self.log.debug("Failed to import delegation credentials(!)")
2148        self.get_grouper_updates(fid)
2149        self.auth.update()
2150        self.auth.save()
2151
2152        try:
2153            # Make sure that the caller can talk to us
2154            proof = self.check_experiment_access(fid, key)
2155        except service_error, e:
2156            self.log.info("Create experiment call failed for %s: access denied"\
2157                    % fid)
2158            raise e
2159
2160
2161        # Install the testbed map entries supplied with the request into a copy
2162        # of the testbed map.
2163        tbmap = dict(self.tbmap)
2164        for m in req.get('testbedmap', []):
2165            if 'testbed' in m and 'uri' in m:
2166                tbmap[m['testbed']] = m['uri']
2167
2168        # a place to work
2169        try:
2170            tmpdir = tempfile.mkdtemp(prefix="split-")
2171            os.mkdir(tmpdir+"/keys")
2172        except EnvironmentError:
2173            raise service_error(service_error.internal, "Cannot create tmp dir")
2174
2175        tbparams = { }
2176
2177        eid, expid, expcert_file = \
2178                self.get_experiment_ids_and_start(key, tmpdir)
2179
2180        # This catches exceptions to clear the placeholder if necessary
2181        try: 
2182            if not (eid and expid):
2183                raise service_error(service_error.internal, 
2184                        "Cannot find local experiment info!?")
2185
2186            top = self.get_topology(req, tmpdir)
2187            self.confirm_software(top)
2188            # Assign the IPs
2189            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2190            # Find the testbeds to look up
2191            tb_hosts = { }
2192            testbeds = [ ]
2193            for e in top.elements:
2194                if isinstance(e, topdl.Computer):
2195                    tb = e.get_attribute('testbed') or 'default'
2196                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2197                    else: 
2198                        tb_hosts[tb] = [ e.name ]
2199                        testbeds.append(tb)
2200
2201            masters, pmasters = self.get_testbed_services(req, testbeds)
2202            allocated = { }         # Testbeds we can access
2203            topo ={ }               # Sub topologies
2204            connInfo = { }          # Connection information
2205
2206            self.get_access_to_testbeds(testbeds, fid, allocated, 
2207                    tbparams, masters, tbmap, expid, expcert_file)
2208
2209            # tbactive is the set of testbeds that have NATs in front of their
2210            # portals. They need to initiate connections.
2211            tbactive = set([k for k, v in tbparams.items() \
2212                    if v.get_attribute('nat_portals')])
2213
2214            self.split_topology(top, topo, testbeds)
2215
2216            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2217
2218            part = experiment_partition(self.auth, self.store_url, tbmap,
2219                    self.muxmax, self.direct_transit, tbactive)
2220            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2221                    connInfo, expid)
2222
2223            auth_attrs = set()
2224            # Now get access to the dynamic testbeds (those added above)
2225            for tb in [ t for t in topo if t not in allocated]:
2226                self.get_access(tb, tbparams, fid, masters, tbmap, 
2227                        expid, expcert_file)
2228                allocated[tb] = 1
2229                store_keys = topo[tb].get_attribute('store_keys')
2230                # Give the testbed access to keys it exports or imports
2231                if store_keys:
2232                    auth_attrs.update(set([
2233                        (tbparams[tb].allocID, sk) \
2234                                for sk in store_keys.split(" ")]))
2235
2236            if auth_attrs:
2237                self.append_experiment_authorization(expid, auth_attrs)
2238
2239            # transit and disconnected testbeds may not have a connInfo entry.
2240            # Fill in the blanks.
2241            for t in allocated.keys():
2242                if not connInfo.has_key(t):
2243                    connInfo[t] = { }
2244
2245            self.wrangle_software(expid, top, topo, tbparams)
2246
2247            proofs = self.save_federant_information(allocated, tbparams, 
2248                    eid, top)
2249        except service_error, e:
2250            # If something goes wrong in the parse (usually an access error)
2251            # clear the placeholder state.  From here on out the code delays
2252            # exceptions.  Failing at this point returns a fault to the remote
2253            # caller.
2254
2255            self.log.info("Create experiment call failed for %s %s: %s" % 
2256                    (eid, fid, e))
2257            self.clear_placeholder(eid, expid, tmpdir)
2258            raise e
2259
2260        # Start the background swapper and return the starting state.  From
2261        # here on out, the state will stick around a while.
2262
2263        # Create a logger that logs to the experiment's state object as well as
2264        # to the main log file.
2265        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2266        alloc_collector = self.list_log(self.state[eid].log)
2267        h = logging.StreamHandler(alloc_collector)
2268        # XXX: there should be a global one of these rather than repeating the
2269        # code.
2270        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2271                    '%d %b %y %H:%M:%S'))
2272        alloc_log.addHandler(h)
2273
2274        # Start a thread to do the resource allocation
2275        t  = Thread(target=self.allocate_resources,
2276                args=(allocated, masters, eid, expid, tbparams, 
2277                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2278                    connInfo, tbmap, expcert_file),
2279                name=eid)
2280        t.start()
2281
2282        rv = {
2283                'experimentID': [
2284                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2285                ],
2286                'experimentStatus': 'starting',
2287                'proof': [ proof.to_dict() ] + proofs,
2288            }
2289        self.log.info("Create experiment call succeeded for %s %s" % \
2290                (eid, fid))
2291
2292        return rv
2293   
2294    def get_experiment_fedid(self, key):
2295        """
2296        find the fedid associated with the localname key in the state database.
2297        """
2298
2299        rv = None
2300        self.state_lock.acquire()
2301        if key in self.state:
2302            rv = self.state[key].fedid
2303        self.state_lock.release()
2304        return rv
2305
2306    def check_experiment_access(self, fid, key):
2307        """
2308        Confirm that the fid has access to the experiment.  Though a request
2309        may be made in terms of a local name, the access attribute is always
2310        the experiment's fedid.
2311        """
2312        if not isinstance(key, fedid):
2313            key = self.get_experiment_fedid(key)
2314
2315        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2316
2317        if access_ok:
2318            return proof
2319        else:
2320            raise service_error(service_error.access, "Access Denied",
2321                proof)
2322
2323
2324    def get_handler(self, path, fid):
2325        """
2326        Perhaps surprisingly named, this function handles HTTP GET requests to
2327        this server (SOAP requests are POSTs).
2328        """
2329        self.log.info("Get handler %s %s" % (path, fid))
2330        if len("%s" % fid) == 0:
2331            return (None, None)
2332        # XXX: log proofs?
2333        if self.auth.check_attribute(fid, path):
2334            return ("%s/%s" % (self.repodir, path), "application/binary")
2335        else:
2336            return (None, None)
2337
2338    def update_info(self, key, force=False):
2339        top = None
2340        self.state_lock.acquire()
2341        if key in self.state:
2342            if force or self.state[key].older_than(self.info_cache_limit):
2343                top = self.state[key].top
2344                if top is not None: top = top.clone()
2345                d1, info_params, cert, d2 = \
2346                        self.get_segment_info(self.state[key], need_lock=False)
2347        self.state_lock.release()
2348
2349        if top is None: return
2350
2351        try:
2352            tmpdir = tempfile.mkdtemp(prefix="info-")
2353        except EnvironmentError:
2354            raise service_error(service_error.internal, 
2355                    "Cannot create tmp dir")
2356        cert_file = self.make_temp_certfile(cert, tmpdir)
2357
2358        data = []
2359        try:
2360            for k, (uri, aid) in info_params.items():
2361                info=self.info_segment(log=self.log, testbed=uri,
2362                            cert_file=cert_file, cert_pwd=None,
2363                            trusted_certs=self.trusted_certs,
2364                            caller=self.call_InfoSegment)
2365                info(uri, aid)
2366                data.append(info)
2367        # Clean up the tmpdir no matter what
2368        finally:
2369            if tmpdir: self.remove_dirs(tmpdir)
2370
2371        self.annotate_topology(top, data)
2372        self.state_lock.acquire()
2373        if key in self.state:
2374            self.state[key].top = top
2375            self.state[key].updated()
2376            if self.state_filename: self.write_state()
2377        self.state_lock.release()
2378
2379   
2380    def get_info(self, req, fid):
2381        """
2382        Return all the stored info about this experiment
2383        """
2384        rv = None
2385
2386        self.log.info("Info call started for %s" %  fid)
2387        req = req.get('InfoRequestBody', None)
2388        if not req:
2389            raise service_error(service_error.req,
2390                    "Bad request format (no InfoRequestBody)")
2391        exp = req.get('experiment', None)
2392        legacy = req.get('legacy', False)
2393        fresh = req.get('fresh', False)
2394        if exp:
2395            if exp.has_key('fedid'):
2396                key = exp['fedid']
2397                keytype = "fedid"
2398            elif exp.has_key('localname'):
2399                key = exp['localname']
2400                keytype = "localname"
2401            else:
2402                raise service_error(service_error.req, "Unknown lookup type")
2403        else:
2404            raise service_error(service_error.req, "No request?")
2405
2406        try:
2407            proof = self.check_experiment_access(fid, key)
2408        except service_error, e:
2409            self.log.info("Info call failed for %s: access denied" %  fid)
2410
2411
2412        self.update_info(key, fresh)
2413
2414        self.state_lock.acquire()
2415        if self.state.has_key(key):
2416            rv = self.state[key].get_info()
2417            # Copy the topo if we need legacy annotations
2418            if legacy:
2419                top = self.state[key].top
2420                if top is not None: top = top.clone()
2421        self.state_lock.release()
2422        self.log.info("Gathered Info for %s %s" % (key, fid))
2423
2424        # If the legacy visualization and topology representations are
2425        # requested, calculate them and add them to the return.
2426        if legacy and rv is not None:
2427            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2428            if top is not None:
2429                vtopo = topdl.topology_to_vtopo(top)
2430                if vtopo is not None:
2431                    rv['vtopo'] = vtopo
2432                    try:
2433                        vis = self.genviz(vtopo)
2434                    except service_error, e:
2435                        self.log.debug('Problem generating visualization: %s' \
2436                                % e)
2437                        vis = None
2438                    if vis is not None:
2439                        rv['vis'] = vis
2440        if rv:
2441            self.log.info("Info succeded for %s %s" % (key, fid))
2442            rv['proof'] = proof.to_dict()
2443            return rv
2444        else: 
2445            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2446            raise service_error(service_error.req, "No such experiment")
2447
2448    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2449            results):
2450        """
2451        Call OperateSegment on multiple testbeds and gather the results.
2452        op_params contains the parameters needed to contact that testbed, cert
2453        is a certificate containing the fedid to use, op is the operation,
2454        testbeds is a dict mapping testbed name to targets in that testbed,
2455        params are the parameters to include a,d results is a growing list of
2456        the results of the calls.
2457        """
2458        try:
2459            tmpdir = tempfile.mkdtemp(prefix="info-")
2460        except EnvironmentError:
2461            raise service_error(service_error.internal, 
2462                    "Cannot create tmp dir")
2463        cert_file = self.make_temp_certfile(cert, tmpdir)
2464
2465        try:
2466            for tb, targets in testbeds.items():
2467                if tb in op_params:
2468                    uri, aid = op_params[tb]
2469                    operate=self.operation_segment(log=self.log, testbed=uri,
2470                                cert_file=cert_file, cert_pwd=None,
2471                                trusted_certs=self.trusted_certs,
2472                                caller=self.call_OperationSegment)
2473                    if operate(uri, aid, op, targets, params):
2474                        if operate.status is not None:
2475                            results.extend(operate.status)
2476                            continue
2477                # Something went wrong in a weird way.  Add statuses
2478                # that reflect that to results
2479                for t in targets:
2480                    results.append(operation_status(t, 
2481                        operation_status.federant,
2482                        'Unexpected error on %s' % tb))
2483        # Clean up the tmpdir no matter what
2484        finally:
2485            if tmpdir: self.remove_dirs(tmpdir)
2486
2487    def do_operation(self, req, fid):
2488        """
2489        Find the testbeds holding each target and ask them to carry out the
2490        operation.  Return the statuses.
2491        """
2492        # Map an element to the testbed containing it
2493        def element_to_tb(e):
2494            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2495            elif isinstance(e, topdl.Testbed): return e.name
2496            else: return None
2497        # If d is an operation_status object, make it a dict
2498        def make_dict(d):
2499            if isinstance(d, dict): return d
2500            elif isinstance(d, operation_status): return d.to_dict()
2501            else: return { }
2502
2503        def element_name(e):
2504            if isinstance(e, topdl.Computer): return e.name
2505            elif isinstance(e, topdl.Testbed): 
2506                if e.localname: return e.localname[0]
2507                else: return None
2508            else: return None
2509
2510        self.log.info("Operation call started for %s" %  fid)
2511        req = req.get('OperationRequestBody', None)
2512        if not req:
2513            raise service_error(service_error.req,
2514                    "Bad request format (no OperationRequestBody)")
2515        exp = req.get('experiment', None)
2516        op = req.get('operation', None)
2517        targets = set(req.get('target', []))
2518        params = req.get('parameter', None)
2519
2520        if exp:
2521            if 'fedid' in exp:
2522                key = exp['fedid']
2523                keytype = "fedid"
2524            elif 'localname' in exp:
2525                key = exp['localname']
2526                keytype = "localname"
2527            else:
2528                raise service_error(service_error.req, "Unknown lookup type")
2529        else:
2530            raise service_error(service_error.req, "No request?")
2531
2532        if op is None or not targets:
2533            raise service_error(service_error.req, "No request?")
2534
2535        try:
2536            proof = self.check_experiment_access(fid, key)
2537        except service_error, e:
2538            self.log.info("Operation call failed for %s: access denied" %  fid)
2539            raise e
2540
2541        self.state_lock.acquire()
2542        if key in self.state:
2543            d1, op_params, cert, d2 = \
2544                    self.get_segment_info(self.state[key], need_lock=False,
2545                            key='tb')
2546            top = self.state[key].top
2547            if top is not None:
2548                top = top.clone()
2549        self.state_lock.release()
2550
2551        if top is None:
2552            self.log.info("Operation call failed for %s: not active" %  fid)
2553            raise service_error(service_error.partial, "No topology yet", 
2554                    proof=proof)
2555
2556        testbeds = { }
2557        results = []
2558        for e in top.elements:
2559            ename = element_name(e)
2560            if ename in targets:
2561                tb = element_to_tb(e)
2562                targets.remove(ename)
2563                if tb is not None:
2564                    if tb in testbeds: testbeds[tb].append(ename)
2565                    else: testbeds[tb] = [ ename ]
2566                else:
2567                    results.append(operation_status(e.name, 
2568                        code=operation_status.no_target, 
2569                        description='Cannot map target to testbed'))
2570
2571        for t in targets:
2572            results.append(operation_status(t, operation_status.no_target))
2573
2574        self.operate_on_segments(op_params, cert, op, testbeds, params,
2575                results)
2576
2577        self.log.info("Operation call succeeded for %s" %  fid)
2578        return { 
2579                'experiment': exp, 
2580                'status': [make_dict(r) for r in results],
2581                'proof': proof.to_dict()
2582                }
2583
2584
2585    def get_multi_info(self, req, fid):
2586        """
2587        Return all the stored info that this fedid can access
2588        """
2589        rv = { 'info': [ ], 'proof': [ ] }
2590
2591        self.log.info("Multi Info call started for %s" %  fid)
2592        self.get_grouper_updates(fid)
2593        self.auth.update()
2594        self.auth.save()
2595        self.state_lock.acquire()
2596        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2597            try:
2598                proof = self.check_experiment_access(fid, key)
2599            except service_error, e:
2600                if e.code == service_error.access:
2601                    continue
2602                else:
2603                    self.log.info("Multi Info call failed for %s: %s" %  \
2604                            (e,fid))
2605                    self.state_lock.release()
2606                    raise e
2607
2608            if self.state.has_key(key):
2609                e = self.state[key].get_info()
2610                e['proof'] = proof.to_dict()
2611                rv['info'].append(e)
2612                rv['proof'].append(proof.to_dict())
2613        self.state_lock.release()
2614        self.log.info("Multi Info call succeeded for %s" %  fid)
2615        return rv
2616
2617    def check_termination_status(self, fed_exp, force):
2618        """
2619        Confirm that the experiment is sin a valid state to stop (or force it)
2620        return the state - invalid states for deletion and force settings cause
2621        exceptions.
2622        """
2623        self.state_lock.acquire()
2624        status = fed_exp.status
2625
2626        if status:
2627            if status in ('starting', 'terminating'):
2628                if not force:
2629                    self.state_lock.release()
2630                    raise service_error(service_error.partial, 
2631                            'Experiment still being created or destroyed')
2632                else:
2633                    self.log.warning('Experiment in %s state ' % status + \
2634                            'being terminated by force.')
2635            self.state_lock.release()
2636            return status
2637        else:
2638            # No status??? trouble
2639            self.state_lock.release()
2640            raise service_error(service_error.internal,
2641                    "Experiment has no status!?")
2642
2643    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2644        ids = []
2645        term_params = { }
2646        if need_lock: self.state_lock.acquire()
2647        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2648        expcert = fed_exp.identity
2649        repo = "%s" % fed_exp.fedid
2650
2651        # Collect the allocation/segment ids into a dict keyed by the fedid
2652        # of the allocation that contains a tuple of uri, aid
2653        for i, fed in enumerate(fed_exp.get_all_allocations()):
2654            uri = fed.uri
2655            aid = fed.allocID
2656            if key == 'aid': term_params[aid] = (uri, aid)
2657            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2658
2659        if need_lock: self.state_lock.release()
2660        return ids, term_params, expcert, repo
2661
2662
2663    def get_termination_info(self, fed_exp):
2664        self.state_lock.acquire()
2665        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2666        # Change the experiment state
2667        fed_exp.status = 'terminating'
2668        fed_exp.updated()
2669        if self.state_filename: self.write_state()
2670        self.state_lock.release()
2671
2672        return ids, term_params, expcert, repo
2673
2674
2675    def deallocate_resources(self, term_params, expcert, status, force, 
2676            dealloc_log):
2677        tmpdir = None
2678        # This try block makes sure the tempdir is cleared
2679        try:
2680            # If no expcert, try the deallocation as the experiment
2681            # controller instance.
2682            if expcert and self.auth_type != 'legacy': 
2683                try:
2684                    tmpdir = tempfile.mkdtemp(prefix="term-")
2685                except EnvironmentError:
2686                    raise service_error(service_error.internal, 
2687                            "Cannot create tmp dir")
2688                cert_file = self.make_temp_certfile(expcert, tmpdir)
2689                pw = None
2690            else: 
2691                cert_file = self.cert_file
2692                pw = self.cert_pwd
2693
2694            # Stop everyone.  NB, wait_for_all waits until a thread starts
2695            # and then completes, so we can't wait if nothing starts.  So,
2696            # no tbparams, no start.
2697            if len(term_params) > 0:
2698                tp = thread_pool(self.nthreads)
2699                for k, (uri, aid) in term_params.items():
2700                    # Create and start a thread to stop the segment
2701                    tp.wait_for_slot()
2702                    t  = pooled_thread(\
2703                            target=self.terminate_segment(log=dealloc_log,
2704                                testbed=uri,
2705                                cert_file=cert_file, 
2706                                cert_pwd=pw,
2707                                trusted_certs=self.trusted_certs,
2708                                caller=self.call_TerminateSegment),
2709                            args=(uri, aid), name=k,
2710                            pdata=tp, trace_file=self.trace_file)
2711                    t.start()
2712                # Wait for completions
2713                tp.wait_for_all_done()
2714
2715            # release the allocations (failed experiments have done this
2716            # already, and starting experiments may be in odd states, so we
2717            # ignore errors releasing those allocations
2718            try: 
2719                for k, (uri, aid)  in term_params.items():
2720                    self.release_access(None, aid, uri=uri,
2721                            cert_file=cert_file, cert_pwd=pw)
2722            except service_error, e:
2723                if status != 'failed' and not force:
2724                    raise e
2725
2726        # Clean up the tmpdir no matter what
2727        finally:
2728            if tmpdir: self.remove_dirs(tmpdir)
2729
2730    def terminate_experiment(self, req, fid):
2731        """
2732        Swap this experiment out on the federants and delete the shared
2733        information
2734        """
2735        self.log.info("Terminate experiment call started for %s" % fid)
2736        tbparams = { }
2737        req = req.get('TerminateRequestBody', None)
2738        if not req:
2739            raise service_error(service_error.req,
2740                    "Bad request format (no TerminateRequestBody)")
2741
2742        key = self.get_experiment_key(req, 'experiment')
2743        try:
2744            proof = self.check_experiment_access(fid, key)
2745        except service_error, e:
2746            self.log.info(
2747                    "Terminate experiment call failed for %s: access denied" \
2748                            % fid)
2749            raise e
2750        exp = req.get('experiment', False)
2751        force = req.get('force', False)
2752
2753        dealloc_list = [ ]
2754
2755
2756        # Create a logger that logs to the dealloc_list as well as to the main
2757        # log file.
2758        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2759        dealloc_log.info("Terminating %s " %key)
2760        h = logging.StreamHandler(self.list_log(dealloc_list))
2761        # XXX: there should be a global one of these rather than repeating the
2762        # code.
2763        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2764                    '%d %b %y %H:%M:%S'))
2765        dealloc_log.addHandler(h)
2766
2767        self.state_lock.acquire()
2768        fed_exp = self.state.get(key, None)
2769        self.state_lock.release()
2770        repo = None
2771
2772        if fed_exp:
2773            status = self.check_termination_status(fed_exp, force)
2774            # get_termination_info updates the experiment state
2775            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2776            self.deallocate_resources(term_params, expcert, status, force, 
2777                    dealloc_log)
2778
2779            # Remove the terminated experiment
2780            self.state_lock.acquire()
2781            for id in ids:
2782                self.clear_experiment_authorization(id, need_state_lock=False)
2783                if id in self.state: del self.state[id]
2784
2785            if self.state_filename: self.write_state()
2786            self.state_lock.release()
2787
2788            # Delete any synch points associated with this experiment.  All
2789            # synch points begin with the fedid of the experiment.
2790            fedid_keys = set(["fedid:%s" % f for f in ids \
2791                    if isinstance(f, fedid)])
2792            for k in self.synch_store.all_keys():
2793                try:
2794                    if len(k) > 45 and k[0:46] in fedid_keys:
2795                        self.synch_store.del_value(k)
2796                except synch_store.BadDeletionError:
2797                    pass
2798            self.write_store()
2799
2800            # Remove software and other cached stuff from the filesystem.
2801            if repo:
2802                self.remove_dirs("%s/%s" % (self.repodir, repo))
2803       
2804            self.log.info("Terminate experiment succeeded for %s %s" % \
2805                    (key, fid))
2806            return { 
2807                    'experiment': exp , 
2808                    'deallocationLog': string.join(dealloc_list, ''),
2809                    'proof': [proof.to_dict()],
2810                    }
2811        else:
2812            self.log.info("Terminate experiment failed for %s %s: no state" % \
2813                    (key, fid))
2814            raise service_error(service_error.req, "No saved state")
2815
2816
2817    def GetValue(self, req, fid):
2818        """
2819        Get a value from the synchronized store
2820        """
2821        req = req.get('GetValueRequestBody', None)
2822        if not req:
2823            raise service_error(service_error.req,
2824                    "Bad request format (no GetValueRequestBody)")
2825       
2826        name = req.get('name', None)
2827        wait = req.get('wait', False)
2828        rv = { 'name': name }
2829
2830        if not name:
2831            raise service_error(service_error.req, "No name?")
2832
2833        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2834
2835        if access_ok:
2836            self.log.debug("[GetValue] asking for %s " % name)
2837            try:
2838                v = self.synch_store.get_value(name, wait)
2839            except synch_store.RevokedKeyError:
2840                # No more synch on this key
2841                raise service_error(service_error.federant, 
2842                        "Synch key %s revoked" % name)
2843            if v is not None:
2844                rv['value'] = v
2845            rv['proof'] = proof.to_dict()
2846            self.log.debug("[GetValue] got %s from %s" % (v, name))
2847            return rv
2848        else:
2849            raise service_error(service_error.access, "Access Denied",
2850                    proof=proof)
2851       
2852
2853    def SetValue(self, req, fid):
2854        """
2855        Set a value in the synchronized store
2856        """
2857        req = req.get('SetValueRequestBody', None)
2858        if not req:
2859            raise service_error(service_error.req,
2860                    "Bad request format (no SetValueRequestBody)")
2861       
2862        name = req.get('name', None)
2863        v = req.get('value', '')
2864
2865        if not name:
2866            raise service_error(service_error.req, "No name?")
2867
2868        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2869
2870        if access_ok:
2871            try:
2872                self.synch_store.set_value(name, v)
2873                self.write_store()
2874                self.log.debug("[SetValue] set %s to %s" % (name, v))
2875            except synch_store.CollisionError:
2876                # Translate into a service_error
2877                raise service_error(service_error.req,
2878                        "Value already set: %s" %name)
2879            except synch_store.RevokedKeyError:
2880                # No more synch on this key
2881                raise service_error(service_error.federant, 
2882                        "Synch key %s revoked" % name)
2883                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2884        else:
2885            raise service_error(service_error.access, "Access Denied",
2886                    proof=proof)
Note: See TracBrowser for help on using the repository browser.