source: fedd/federation/experiment_control.py @ 25bf6cc

compt_changes
Last change on this file since 25bf6cc was 25bf6cc, checked in by Ted Faber <faber@…>, 12 years ago

Grouper updates

  • Property mode set to 100644
File size: 94.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15import shutil
16import zipfile
17
18import os.path
19
20import traceback
21# For parsing visualization output and splitter output
22import xml.parsers.expat
23
24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
26from string import join
27
28from urlparse import urlparse
29from urllib2 import urlopen
30
31from util import *
32from deter import fedid, generate_fedid
33from remote_service import xmlrpc_handler, soap_handler, service_caller
34from service_error import service_error
35from synch_store import synch_store
36from experiment_partition import experiment_partition
37from experiment_control_legacy import experiment_control_legacy
38from authorizer import abac_authorizer
39from thread_pool import thread_pool, pooled_thread
40from experiment_info import experiment_info, allocation_info, federated_service
41from operation_status import operation_status
42
43from deter import topdl
44from deter import ip_allocator
45from deter import ip_addr
46import list_log
47
48
49class nullHandler(logging.Handler):
50    def emit(self, record): pass
51
52fl = logging.getLogger("fedd.experiment_control")
53fl.addHandler(nullHandler())
54
55class experiment_control_local(experiment_control_legacy):
56    """
57    Control of experiments that this system can directly access.
58
59    Includes experiment creation, termination and information dissemination.
60    Thred safe.
61    """
62
63    class ssh_cmd_timeout(RuntimeError): pass
64   
65    call_RequestAccess = service_caller('RequestAccess')
66    call_ReleaseAccess = service_caller('ReleaseAccess')
67    call_StartSegment = service_caller('StartSegment')
68    call_TerminateSegment = service_caller('TerminateSegment')
69    call_InfoSegment = service_caller('InfoSegment')
70    call_OperationSegment = service_caller('OperationSegment')
71    call_Ns2Topdl = service_caller('Ns2Topdl')
72
73    def __init__(self, config=None, auth=None):
74        """
75        Intialize the various attributes, most from the config object
76        """
77
78        def parse_tarfile_list(tf):
79            """
80            Parse a tarfile list from the configuration.  This is a set of
81            paths and tarfiles separated by spaces.
82            """
83            rv = [ ]
84            if tf is not None:
85                tl = tf.split()
86                while len(tl) > 1:
87                    p, t = tl[0:2]
88                    del tl[0:2]
89                    rv.append((p, t))
90            return rv
91
92        self.list_log = list_log.list_log
93
94        self.cert_file = config.get("experiment_control", "cert_file")
95        if self.cert_file:
96            self.cert_pwd = config.get("experiment_control", "cert_pwd")
97        else:
98            self.cert_file = config.get("globals", "cert_file")
99            self.cert_pwd = config.get("globals", "cert_pwd")
100
101        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
102                or config.get("globals", "trusted_certs")
103
104        self.repodir = config.get("experiment_control", "repodir")
105        self.repo_url = config.get("experiment_control", "repo_url", 
106                "https://users.isi.deterlab.net:23235");
107
108        self.exp_stem = "fed-stem"
109        self.log = logging.getLogger("fedd.experiment_control")
110        set_log_level(config, "experiment_control", self.log)
111        self.muxmax = 2
112        self.nthreads = 10
113        self.randomize_experiments = False
114
115        self.splitter = None
116        self.ssh_keygen = "/usr/bin/ssh-keygen"
117        self.ssh_identity_file = None
118
119
120        self.debug = config.getboolean("experiment_control", "create_debug")
121        self.cleanup = not config.getboolean("experiment_control", 
122                "leave_tmpfiles")
123        self.state_filename = config.get("experiment_control", 
124                "experiment_state")
125        self.store_filename = config.get("experiment_control", 
126                "synch_store")
127        self.store_url = config.get("experiment_control", "store_url")
128        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
129        self.grouper_url = config.get("experiment_control", "grouper_url")
130        self.fedkit = parse_tarfile_list(\
131                config.get("experiment_control", "fedkit"))
132        self.gatewaykit = parse_tarfile_list(\
133                config.get("experiment_control", "gatewaykit"))
134
135        dt = config.get("experiment_control", "direct_transit")
136        self.auth_type = config.get('experiment_control', 'auth_type') \
137                or 'legacy'
138        self.auth_dir = config.get('experiment_control', 'auth_dir')
139        # XXX: document this!
140        self.info_cache_limit = \
141                config.getint('experiment_control', 'info_cache', 600)
142        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
143        else: self.direct_transit = [ ]
144        # NB for internal master/slave ops, not experiment setup
145        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
146
147        self.overrides = set([])
148        ovr = config.get('experiment_control', 'overrides')
149        if ovr:
150            for o in ovr.split(","):
151                o = o.strip()
152                if o.startswith('fedid:'): o = o[len('fedid:'):]
153                self.overrides.add(fedid(hexstr=o))
154
155        self.state = { }
156        self.state_lock = Lock()
157        self.tclsh = "/usr/local/bin/otclsh"
158        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
159                config.get("experiment_control", "tcl_splitter",
160                        "/usr/testbed/lib/ns2ir/parse.tcl")
161        mapdb_file = config.get("experiment_control", "mapdb")
162        self.trace_file = sys.stderr
163
164        self.def_expstart = \
165                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
166                "/tmp/federate";
167        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
168                "FEDDIR/hosts";
169        self.def_gwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
171                "/tmp/bridge.log";
172        self.def_mgwstart = \
173                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
174                "/tmp/bridge.log";
175        self.def_gwimage = "FBSD61-TUNNEL2";
176        self.def_gwtype = "pc";
177        self.local_access = { }
178
179        if self.auth_type == 'legacy':
180            if auth:
181                self.auth = auth
182            else:
183                self.log.error( "[access]: No authorizer initialized, " +\
184                        "creating local one.")
185                auth = authorizer()
186            self.get_access = self.legacy_get_access
187        elif self.auth_type == 'abac':
188            self.auth = abac_authorizer(load=self.auth_dir, 
189                    update=os.path.join(self.auth_dir,'update'))
190        else:
191            raise service_error(service_error.internal, 
192                    "Unknown auth_type: %s" % self.auth_type)
193
194        if mapdb_file:
195            self.read_mapdb(mapdb_file)
196        else:
197            self.log.warn("[experiment_control] No testbed map, using defaults")
198            self.tbmap = { 
199                    'deter':'https://users.isi.deterlab.net:23235',
200                    'emulab':'https://users.isi.deterlab.net:23236',
201                    'ucb':'https://users.isi.deterlab.net:23237',
202                    }
203
204        # Grab saved state.  OK to do this w/o locking because it's read only
205        # and only one thread should be in existence that can see self.state at
206        # this point.
207        if self.state_filename:
208            self.read_state()
209
210        if self.store_filename:
211            self.read_store()
212        else:
213            self.log.warning("No saved synch store")
214            self.synch_store = synch_store
215
216        # Dispatch tables
217        self.soap_services = {\
218                'New': soap_handler('New', self.new_experiment),
219                'Create': soap_handler('Create', self.create_experiment),
220                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
221                'Vis': soap_handler('Vis', self.get_vis),
222                'Info': soap_handler('Info', self.get_info),
223                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
224                'Operation': soap_handler('Operation', self.do_operation),
225                'Terminate': soap_handler('Terminate', 
226                    self.terminate_experiment),
227                'GetValue': soap_handler('GetValue', self.GetValue),
228                'SetValue': soap_handler('SetValue', self.SetValue),
229        }
230
231        self.xmlrpc_services = {\
232                'New': xmlrpc_handler('New', self.new_experiment),
233                'Create': xmlrpc_handler('Create', self.create_experiment),
234                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
235                'Vis': xmlrpc_handler('Vis', self.get_vis),
236                'Info': xmlrpc_handler('Info', self.get_info),
237                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
238                'Terminate': xmlrpc_handler('Terminate',
239                    self.terminate_experiment),
240                'Operation': xmlrpc_handler('Operation', self.do_operation),
241                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
242                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
243        }
244
245    # Call while holding self.state_lock
246    def write_state(self):
247        """
248        Write a new copy of experiment state after copying the existing state
249        to a backup.
250
251        State format is a simple pickling of the state dictionary.
252        """
253        if os.access(self.state_filename, os.W_OK):
254            copy_file(self.state_filename, \
255                    "%s.bak" % self.state_filename)
256        try:
257            f = open(self.state_filename, 'w')
258            pickle.dump(self.state, f)
259        except EnvironmentError, e:
260            self.log.error("Can't write file %s: %s" % \
261                    (self.state_filename, e))
262        except pickle.PicklingError, e:
263            self.log.error("Pickling problem: %s" % e)
264        except TypeError, e:
265            self.log.error("Pickling problem (TypeError): %s" % e)
266
267    @staticmethod
268    def get_alloc_ids(exp):
269        """
270        Used by read_store and read state.  This used to be worse.
271        """
272
273        return [ a.allocID for a in exp.get_all_allocations() ]
274
275
276    # Call while holding self.state_lock
277    def read_state(self):
278        """
279        Read a new copy of experiment state.  Old state is overwritten.
280
281        State format is a simple pickling of the state dictionary.
282        """
283       
284        try:
285            f = open(self.state_filename, "r")
286            self.state = pickle.load(f)
287            self.log.debug("[read_state]: Read state from %s" % \
288                    self.state_filename)
289        except EnvironmentError, e:
290            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
291                    % (self.state_filename, e))
292        except pickle.UnpicklingError, e:
293            self.log.warning(("[read_state]: No saved state: " + \
294                    "Unpickling failed: %s") % e)
295       
296        for s in self.state.values():
297            try:
298
299                eid = s.fedid
300                if eid : 
301                    if self.auth_type == 'legacy':
302                        # XXX: legacy
303                        # Give the owner rights to the experiment
304                        #self.auth.set_attribute(s['owner'], eid)
305                        # And holders of the eid as well
306                        self.auth.set_attribute(eid, eid)
307                        # allow overrides to control experiments as well
308                        for o in self.overrides:
309                            self.auth.set_attribute(o, eid)
310                        # Set permissions to allow reading of the software
311                        # repo, if any, as well.
312                        for a in self.get_alloc_ids(s):
313                            self.auth.set_attribute(a, 'repo/%s' % eid)
314                else:
315                    raise KeyError("No experiment id")
316            except KeyError, e:
317                self.log.warning("[read_state]: State ownership or identity " +\
318                        "misformatted in %s: %s" % (self.state_filename, e))
319
320    def read_mapdb(self, file):
321        """
322        Read a simple colon separated list of mappings for the
323        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
324        also adds testbeds to active if they include , active after
325        their name.
326        """
327
328        self.tbmap = { }
329        self.tbactive = set()
330        lineno =0
331        try:
332            f = open(file, "r")
333            for line in f:
334                lineno += 1
335                line = line.strip()
336                if line.startswith('#') or len(line) == 0:
337                    continue
338                try:
339                    label, url = line.split(':', 1)
340                    if ',' in label:
341                        label, act = label.split(',', 1)
342                        active = (act.strip() == 'active')
343                    else:
344                        active = False
345                    self.tbmap[label] = url
346                    if active: self.tbactive.add(label)
347                except ValueError, e:
348                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
349                            "map db: %s %s" % (lineno, line, e))
350        except EnvironmentError, e:
351            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
352                    "open %s: %s" % (file, e))
353        else:
354            f.close()
355
356    def read_store(self):
357        try:
358            self.synch_store = synch_store()
359            self.synch_store.load(self.store_filename)
360            self.log.debug("[read_store]: Read store from %s" % \
361                    self.store_filename)
362        except EnvironmentError, e:
363            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
364                    % (self.state_filename, e))
365            self.synch_store = synch_store()
366
367        # Set the initial permissions on data in the store.  XXX: This ad hoc
368        # authorization attribute initialization is getting out of hand.
369        # XXX: legacy
370        if self.auth_type == 'legacy':
371            for k in self.synch_store.all_keys():
372                try:
373                    if k.startswith('fedid:'):
374                        fid = fedid(hexstr=k[6:46])
375                        if self.state.has_key(fid):
376                            for a in self.get_alloc_ids(self.state[fid]):
377                                self.auth.set_attribute(a, k)
378                except ValueError, e:
379                    self.log.warn("Cannot deduce permissions for %s" % k)
380
381
382    def write_store(self):
383        """
384        Write a new copy of synch_store after writing current state
385        to a backup.  We use the internal synch_store pickle method to avoid
386        incinsistent data.
387
388        State format is a simple pickling of the store.
389        """
390        if os.access(self.store_filename, os.W_OK):
391            copy_file(self.store_filename, \
392                    "%s.bak" % self.store_filename)
393        try:
394            self.synch_store.save(self.store_filename)
395        except EnvironmentError, e:
396            self.log.error("Can't write file %s: %s" % \
397                    (self.store_filename, e))
398        except TypeError, e:
399            self.log.error("Pickling problem (TypeError): %s" % e)
400
401    # XXX this may belong somewhere else
402
403    def get_grouper_updates(self, fid):
404        if self.grouper_url is None: return
405        d = tempfile.mkdtemp()
406        try:
407            fstr = "%s" % fid
408            # XXX locking
409            zipname = os.path.join(d, 'grouper.zip')
410            dest = os.path.join(self.auth_dir, 'update')
411            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
412            f = open(zipname, 'w')
413            f.write(resp.read())
414            f.close()
415            zf = zipfile.ZipFile(zipname, 'r')
416            zf.extractall(dest)
417            zf.close()
418        finally:
419            shutil.rmtree(d)
420
421
422    def remove_dirs(self, dir):
423        """
424        Remove the directory tree and all files rooted at dir.  Log any errors,
425        but continue.
426        """
427        self.log.debug("[removedirs]: removing %s" % dir)
428        try:
429            for path, dirs, files in os.walk(dir, topdown=False):
430                for f in files:
431                    os.remove(os.path.join(path, f))
432                for d in dirs:
433                    os.rmdir(os.path.join(path, d))
434            os.rmdir(dir)
435        except EnvironmentError, e:
436            self.log.error("Error deleting directory tree in %s" % e);
437
438    @staticmethod
439    def make_temp_certfile(expcert, tmpdir):
440        """
441        make a protected copy of the access certificate so the experiment
442        controller can act as the experiment principal.  mkstemp is the most
443        secure way to do that. The directory should be created by
444        mkdtemp.  Return the filename.
445        """
446        if expcert and tmpdir:
447            try:
448                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
449                f = os.fdopen(certf, 'w')
450                print >> f, expcert
451                f.close()
452            except EnvironmentError, e:
453                raise service_error(service_error.internal, 
454                        "Cannot create temp cert file?")
455            return certfn
456        else:
457            return None
458
459       
460    def generate_ssh_keys(self, dest, type="rsa" ):
461        """
462        Generate a set of keys for the gateways to use to talk.
463
464        Keys are of type type and are stored in the required dest file.
465        """
466        valid_types = ("rsa", "dsa")
467        t = type.lower();
468        if t not in valid_types: raise ValueError
469        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
470
471        try:
472            trace = open("/dev/null", "w")
473        except EnvironmentError:
474            raise service_error(service_error.internal,
475                    "Cannot open /dev/null??");
476
477        # May raise CalledProcessError
478        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
479        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
480        if rv != 0:
481            raise service_error(service_error.internal, 
482                    "Cannot generate nonce ssh keys.  %s return code %d" \
483                            % (self.ssh_keygen, rv))
484
485    def generate_seer_certs(self, destdir):
486        '''
487        Create a SEER ca cert and a node cert in destdir/ca.pem and
488        destdir/node.pem respectively.  These will be distributed throughout
489        the federated experiment.  This routine reports errors via
490        service_errors.
491        '''
492        openssl = '/usr/bin/openssl'
493        # All the filenames and parameters we need for openssl calls below
494        ca_key =os.path.join(destdir, 'ca.key') 
495        ca_pem = os.path.join(destdir, 'ca.pem')
496        node_key =os.path.join(destdir, 'node.key') 
497        node_pem = os.path.join(destdir, 'node.pem')
498        node_req = os.path.join(destdir, 'node.req')
499        node_signed = os.path.join(destdir, 'node.signed')
500        days = '%s' % (365 * 10)
501        serial = '%s' % random.randint(0, 1<<16)
502
503        try:
504            # Sequence of calls to create a CA key, create a ca cert, create a
505            # node key, node signing request, and finally a signed node
506            # certificate.
507            sequence = (
508                    (openssl, 'genrsa', '-out', ca_key, '1024'),
509                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
510                        ca_pem, '-days', days, '-subj', 
511                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
512                    (openssl, 'genrsa', '-out', node_key, '1024'),
513                    (openssl, 'req', '-new', '-key', node_key, '-out', 
514                        node_req, '-days', days, '-subj', 
515                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
516                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
517                        '-set_serial', serial, '-req', '-in', node_req, 
518                        '-out', node_signed, '-days', days),
519                )
520            # Do all that stuff; bail if there's an error, and push all the
521            # output to dev/null.
522            for cmd in sequence:
523                trace = open("/dev/null", "w")
524                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
525                if rv != 0:
526                    raise service_error(service_error.internal, 
527                            "Cannot generate SEER certs.  %s return code %d" \
528                                    % (' '.join(cmd), rv))
529            # Concatinate the node key and signed certificate into node.pem
530            f = open(node_pem, 'w')
531            for comp in (node_signed, node_key):
532                g = open(comp, 'r')
533                f.write(g.read())
534                g.close()
535            f.close()
536
537            # Throw out intermediaries.
538            for fn in (ca_key, node_key, node_req, node_signed):
539                os.unlink(fn)
540
541        except EnvironmentError, e:
542            # Any difficulties with the file system wind up here
543            raise service_error(service_error.internal,
544                    "File error on  %s while creating SEER certs: %s" % \
545                            (e.filename, e.strerror))
546
547
548
549    def gentopo(self, str):
550        """
551        Generate the topology data structure from the splitter's XML
552        representation of it.
553
554        The topology XML looks like:
555            <experiment>
556                <nodes>
557                    <node><vname></vname><ips>ip1:ip2</ips></node>
558                </nodes>
559                <lans>
560                    <lan>
561                        <vname></vname><vnode></vnode><ip></ip>
562                        <bandwidth></bandwidth><member>node:port</member>
563                    </lan>
564                </lans>
565        """
566        class topo_parse:
567            """
568            Parse the topology XML and create the dats structure.
569            """
570            def __init__(self):
571                # Typing of the subelements for data conversion
572                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
573                self.int_subelements = ( 'bandwidth',)
574                self.float_subelements = ( 'delay',)
575                # The final data structure
576                self.nodes = [ ]
577                self.lans =  [ ]
578                self.topo = { \
579                        'node': self.nodes,\
580                        'lan' : self.lans,\
581                    }
582                self.element = { }  # Current element being created
583                self.chars = ""     # Last text seen
584
585            def end_element(self, name):
586                # After each sub element the contents is added to the current
587                # element or to the appropriate list.
588                if name == 'node':
589                    self.nodes.append(self.element)
590                    self.element = { }
591                elif name == 'lan':
592                    self.lans.append(self.element)
593                    self.element = { }
594                elif name in self.str_subelements:
595                    self.element[name] = self.chars
596                    self.chars = ""
597                elif name in self.int_subelements:
598                    self.element[name] = int(self.chars)
599                    self.chars = ""
600                elif name in self.float_subelements:
601                    self.element[name] = float(self.chars)
602                    self.chars = ""
603
604            def found_chars(self, data):
605                self.chars += data.rstrip()
606
607
608        tp = topo_parse();
609        parser = xml.parsers.expat.ParserCreate()
610        parser.EndElementHandler = tp.end_element
611        parser.CharacterDataHandler = tp.found_chars
612
613        parser.Parse(str)
614
615        return tp.topo
616       
617
618    def genviz(self, topo):
619        """
620        Generate the visualization the virtual topology
621        """
622
623        neato = "/usr/local/bin/neato"
624        # These are used to parse neato output and to create the visualization
625        # file.
626        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
627        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
628                "%s</type></node>"
629
630        try:
631            # Node names
632            nodes = [ n['vname'] for n in topo['node'] ]
633            topo_lans = topo['lan']
634        except KeyError, e:
635            raise service_error(service_error.internal, "Bad topology: %s" %e)
636
637        lans = { }
638        links = { }
639
640        # Walk through the virtual topology, organizing the connections into
641        # 2-node connections (links) and more-than-2-node connections (lans).
642        # When a lan is created, it's added to the list of nodes (there's a
643        # node in the visualization for the lan).
644        for l in topo_lans:
645            if links.has_key(l['vname']):
646                if len(links[l['vname']]) < 2:
647                    links[l['vname']].append(l['vnode'])
648                else:
649                    nodes.append(l['vname'])
650                    lans[l['vname']] = links[l['vname']]
651                    del links[l['vname']]
652                    lans[l['vname']].append(l['vnode'])
653            elif lans.has_key(l['vname']):
654                lans[l['vname']].append(l['vnode'])
655            else:
656                links[l['vname']] = [ l['vnode'] ]
657
658
659        # Open up a temporary file for dot to turn into a visualization
660        try:
661            df, dotname = tempfile.mkstemp()
662            dotfile = os.fdopen(df, 'w')
663        except EnvironmentError:
664            raise service_error(service_error.internal,
665                    "Failed to open file in genviz")
666
667        try:
668            dnull = open('/dev/null', 'w')
669        except EnvironmentError:
670            service_error(service_error.internal,
671                    "Failed to open /dev/null in genviz")
672
673        # Generate a dot/neato input file from the links, nodes and lans
674        try:
675            print >>dotfile, "graph G {"
676            for n in nodes:
677                print >>dotfile, '\t"%s"' % n
678            for l in links.keys():
679                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
680            for l in lans.keys():
681                for n in lans[l]:
682                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
683            print >>dotfile, "}"
684            dotfile.close()
685        except TypeError:
686            raise service_error(service_error.internal,
687                    "Single endpoint link in vtopo")
688        except EnvironmentError:
689            raise service_error(service_error.internal, "Cannot write dot file")
690
691        # Use dot to create a visualization
692        try:
693            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
694                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
695                stderr=dnull, close_fds=True)
696        except EnvironmentError:
697            raise service_error(service_error.internal, 
698                    "Cannot generate visualization: is graphviz available?")
699        dnull.close()
700
701        # Translate dot to vis format
702        vis_nodes = [ ]
703        vis = { 'node': vis_nodes }
704        for line in dot.stdout:
705            m = vis_re.match(line)
706            if m:
707                vn = m.group(1)
708                vis_node = {'name': vn, \
709                        'x': float(m.group(2)),\
710                        'y' : float(m.group(3)),\
711                    }
712                if vn in links.keys() or vn in lans.keys():
713                    vis_node['type'] = 'lan'
714                else:
715                    vis_node['type'] = 'node'
716                vis_nodes.append(vis_node)
717        rv = dot.wait()
718
719        os.remove(dotname)
720        # XXX: graphviz seems to use low return codes for warnings, like
721        # "couldn't find font"
722        if rv < 2 : return vis
723        else: return None
724
725
726    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
727            cert_pwd=None):
728        """
729        Release access to testbed through fedd
730        """
731
732        if not uri and tbmap:
733            uri = tbmap.get(tb, None)
734        if not uri:
735            raise service_error(service_error.server_config, 
736                    "Unknown testbed: %s" % tb)
737
738        if self.local_access.has_key(uri):
739            resp = self.local_access[uri].ReleaseAccess(\
740                    { 'ReleaseAccessRequestBody' : 
741                        {'allocID': {'fedid': aid}},}, 
742                    fedid(file=cert_file))
743            resp = { 'ReleaseAccessResponseBody': resp } 
744        else:
745            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
746                    cert_file, cert_pwd, self.trusted_certs)
747
748        # better error coding
749
750    def remote_ns2topdl(self, uri, desc):
751
752        req = {
753                'description' : { 'ns2description': desc },
754            }
755
756        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
757                self.trusted_certs)
758
759        if r.has_key('Ns2TopdlResponseBody'):
760            r = r['Ns2TopdlResponseBody']
761            ed = r.get('experimentdescription', None)
762            if ed.has_key('topdldescription'):
763                return topdl.Topology(**ed['topdldescription'])
764            else:
765                raise service_error(service_error.protocol, 
766                        "Bad splitter response (no output)")
767        else:
768            raise service_error(service_error.protocol, "Bad splitter response")
769
770    class start_segment:
771        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
772                cert_pwd=None, trusted_certs=None, caller=None,
773                log_collector=None):
774            self.log = log
775            self.debug = debug
776            self.cert_file = cert_file
777            self.cert_pwd = cert_pwd
778            self.trusted_certs = None
779            self.caller = caller
780            self.testbed = testbed
781            self.log_collector = log_collector
782            self.response = None
783            self.node = { }
784            self.subs = { }
785            self.tb = { }
786            self.proof = None
787
788        def make_map(self, resp):
789            if 'segmentdescription' not in resp  or \
790                    'topdldescription' not in resp['segmentdescription']:
791                self.log.warn('No topology returned from startsegment')
792                return 
793
794            top = topdl.Topology(
795                    **resp['segmentdescription']['topdldescription'])
796
797            for e in top.elements:
798                if isinstance(e, topdl.Computer):
799                    self.node[e.name] = e
800                elif isinstance(e, topdl.Testbed):
801                    self.tb[e.uri] = e
802            for s in top.substrates:
803                self.subs[s.name] = s
804
805        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
806            req = {
807                    'allocID': { 'fedid' : aid }, 
808                    'segmentdescription': { 
809                        'topdldescription': topo.to_dict(),
810                    },
811                }
812
813            if connInfo:
814                req['connection'] = connInfo
815
816            import_svcs = [ s for m in masters.values() \
817                    for s in m if self.testbed in s.importers]
818
819            if import_svcs or self.testbed in masters:
820                req['service'] = []
821
822            for s in import_svcs:
823                for r in s.reqs:
824                    sr = copy.deepcopy(r)
825                    sr['visibility'] = 'import';
826                    req['service'].append(sr)
827
828            for s in masters.get(self.testbed, []):
829                for r in s.reqs:
830                    sr = copy.deepcopy(r)
831                    sr['visibility'] = 'export';
832                    req['service'].append(sr)
833
834            if attrs:
835                req['fedAttr'] = attrs
836
837            try:
838                self.log.debug("Calling StartSegment at %s " % uri)
839                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
840                        self.trusted_certs)
841                if r.has_key('StartSegmentResponseBody'):
842                    lval = r['StartSegmentResponseBody'].get('allocationLog',
843                            None)
844                    if lval and self.log_collector:
845                        for line in  lval.splitlines(True):
846                            self.log_collector.write(line)
847                    self.make_map(r['StartSegmentResponseBody'])
848                    if 'proof' in r: self.proof = r['proof']
849                    self.response = r
850                else:
851                    raise service_error(service_error.internal, 
852                            "Bad response!?: %s" %r)
853                return True
854            except service_error, e:
855                self.log.error("Start segment failed on %s: %s" % \
856                        (self.testbed, e))
857                return False
858
859
860
861    class terminate_segment:
862        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
863                cert_pwd=None, trusted_certs=None, caller=None):
864            self.log = log
865            self.debug = debug
866            self.cert_file = cert_file
867            self.cert_pwd = cert_pwd
868            self.trusted_certs = None
869            self.caller = caller
870            self.testbed = testbed
871
872        def __call__(self, uri, aid ):
873            req = {
874                    'allocID': {'fedid': aid }, 
875                }
876            self.log.info("Calling terminate segment")
877            try:
878                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
879                        self.trusted_certs)
880                self.log.info("Terminate segment succeeded")
881                return True
882            except service_error, e:
883                self.log.error("Terminate segment failed on %s: %s" % \
884                        (self.testbed, e))
885                return False
886
887    class info_segment(start_segment):
888        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
889                cert_pwd=None, trusted_certs=None, caller=None,
890                log_collector=None):
891            experiment_control_local.start_segment.__init__(self, debug, 
892                    log, testbed, cert_file, cert_pwd, trusted_certs, 
893                    caller, log_collector)
894
895        def __call__(self, uri, aid):
896            req = { 'allocID': { 'fedid' : aid } }
897
898            try:
899                self.log.debug("Calling InfoSegment at %s " % uri)
900                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
901                        self.trusted_certs)
902                if r.has_key('InfoSegmentResponseBody'):
903                    self.make_map(r['InfoSegmentResponseBody'])
904                    if 'proof' in r: self.proof = r['proof']
905                    self.response = r
906                else:
907                    raise service_error(service_error.internal, 
908                            "Bad response!?: %s" %r)
909                return True
910            except service_error, e:
911                self.log.error("Info segment failed on %s: %s" % \
912                        (self.testbed, e))
913                return False
914
915    class operation_segment:
916        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
917                cert_pwd=None, trusted_certs=None, caller=None,
918                log_collector=None):
919            self.log = log
920            self.debug = debug
921            self.cert_file = cert_file
922            self.cert_pwd = cert_pwd
923            self.trusted_certs = None
924            self.caller = caller
925            self.testbed = testbed
926            self.status = None
927
928        def __call__(self, uri, aid, op, targets, params):
929            req = { 
930                    'allocID': { 'fedid' : aid },
931                    'operation': op,
932                    'target': targets,
933                    }
934            if params: req['parameter'] = params
935
936
937            try:
938                self.log.debug("Calling OperationSegment at %s " % uri)
939                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
940                        self.trusted_certs)
941                if 'OperationSegmentResponseBody' in r:
942                    r = r['OperationSegmentResponseBody']
943                    if 'status' in r:
944                        self.status = r['status']
945                else:
946                    raise service_error(service_error.internal, 
947                            "Bad response!?: %s" %r)
948                return True
949            except service_error, e:
950                self.log.error("Operation segment failed on %s: %s" % \
951                        (self.testbed, e))
952                return False
953
954    def annotate_topology(self, top, data):
955        # These routines do various parts of the annotation
956        def add_new_names(nl, l):
957            """ add any names in nl to the list in l """
958            for n in nl:
959                if n not in l: l.append(n)
960       
961        def merge_services(ne, e):
962            for ns in ne.service:
963                # NB: the else is on the for
964                for s in e.service:
965                    if ns.name == s.name:
966                        s.importer = ns.importer
967                        s.param = ns.param
968                        s.description = ns.description
969                        s.status = ns.status
970                        break
971                else:
972                    e.service.append(ns)
973       
974        def merge_oses(ne, e):
975            """
976            Merge the operating system entries of ne into e
977            """
978            for nos in ne.os:
979                # NB: the else is on the for
980                for os in e.os:
981                    if nos.name == os.name:
982                        os.version = nos.version
983                        os.version = nos.distribution
984                        os.version = nos.distributionversion
985                        for a in nos.attribute:
986                            if os.get_attribute(a.attribute):
987                                os.remove_attribute(a.attribute)
988                            os.set_attribute(a.attribute, a.value)
989                        break
990                else:
991                    # If both nodes have one OS, this is a replacement
992                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
993                    else: e.os.append(nos)
994
995        # Annotate the topology with embedding info
996        for e in top.elements:
997            if isinstance(e, topdl.Computer):
998                for s in data:
999                    ne = s.node.get(e.name, None)
1000                    if ne is not None:
1001                        add_new_names(ne.localname, e.localname)
1002                        e.status = ne.status
1003                        merge_services(ne, e)
1004                        add_new_names(ne.operation, e.operation)
1005                        if ne.os: merge_oses(ne, e)
1006                        break
1007            elif isinstance(e,topdl.Testbed):
1008                for s in data:
1009                    ne = s.tb.get(e.uri, None)
1010                    if ne is not None:
1011                        add_new_names(ne.localname, e.localname)
1012                        add_new_names(ne.operation, e.operation)
1013                        merge_services(ne, e)
1014                        for a in ne.attribute:
1015                            e.set_attribute(a.attribute, a.value)
1016        # Annotate substrates
1017        for s in top.substrates:
1018            for d in data:
1019                ss = d.subs.get(s.name, None)
1020                if ss is not None:
1021                    if ss.capacity is not None:
1022                        s.capacity = ss.capacity
1023                    if s.latency is not None:
1024                        s.latency = ss.latency
1025
1026
1027
1028    def allocate_resources(self, allocated, masters, eid, expid, 
1029            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1030            attrs=None, connInfo={}, tbmap=None, expcert=None):
1031
1032        started = { }           # Testbeds where a sub-experiment started
1033                                # successfully
1034
1035        # XXX
1036        fail_soft = False
1037
1038        if tbmap is None: tbmap = { }
1039
1040        log = alloc_log or self.log
1041
1042        tp = thread_pool(self.nthreads)
1043        threads = [ ]
1044        starters = [ ]
1045
1046        if expcert:
1047            cert = expcert
1048            pw = None
1049        else:
1050            cert = self.cert_file
1051            pw = self.cert_pwd
1052
1053        for tb in allocated.keys():
1054            # Create and start a thread to start the segment, and save it
1055            # to get the return value later
1056            tb_attrs = copy.copy(attrs)
1057            tp.wait_for_slot()
1058            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1059            base, suffix = split_testbed(tb)
1060            if suffix:
1061                tb_attrs.append({'attribute': 'experiment_name', 
1062                    'value': "%s-%s" % (eid, suffix)})
1063            else:
1064                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1065            if not uri:
1066                raise service_error(service_error.internal, 
1067                        "Unknown testbed %s !?" % tb)
1068
1069            aid = tbparams[tb].allocID
1070            if not aid:
1071                raise service_error(service_error.internal, 
1072                        "No alloc id for testbed %s !?" % tb)
1073
1074            s = self.start_segment(log=log, debug=self.debug,
1075                    testbed=tb, cert_file=cert,
1076                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1077                    caller=self.call_StartSegment,
1078                    log_collector=log_collector)
1079            starters.append(s)
1080            t  = pooled_thread(\
1081                    target=s, name=tb,
1082                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1083                    pdata=tp, trace_file=self.trace_file)
1084            threads.append(t)
1085            t.start()
1086
1087        # Wait until all finish (keep pinging the log, though)
1088        mins = 0
1089        revoked = False
1090        while not tp.wait_for_all_done(60.0):
1091            mins += 1
1092            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1093                    % mins)
1094            if not revoked and \
1095                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1096                # a testbed has failed.  Revoke this experiment's
1097                # synchronizarion values so that sub experiments will not
1098                # deadlock waiting for synchronization that will never happen
1099                self.log.info("A subexperiment has failed to swap in, " + \
1100                        "revoking synch keys")
1101                var_key = "fedid:%s" % expid
1102                for k in self.synch_store.all_keys():
1103                    if len(k) > 45 and k[0:46] == var_key:
1104                        self.synch_store.revoke_key(k)
1105                revoked = True
1106
1107        failed = [ t.getName() for t in threads if not t.rv ]
1108        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1109
1110        # If one failed clean up, unless fail_soft is set
1111        if failed:
1112            if not fail_soft:
1113                tp.clear()
1114                for tb in succeeded:
1115                    # Create and start a thread to stop the segment
1116                    tp.wait_for_slot()
1117                    uri = tbparams[tb].uri
1118                    t  = pooled_thread(\
1119                            target=self.terminate_segment(log=log,
1120                                testbed=tb,
1121                                cert_file=cert, 
1122                                cert_pwd=pw,
1123                                trusted_certs=self.trusted_certs,
1124                                caller=self.call_TerminateSegment),
1125                            args=(uri, tbparams[tb].allocID),
1126                            name=tb,
1127                            pdata=tp, trace_file=self.trace_file)
1128                    t.start()
1129                # Wait until all finish (if any are being stopped)
1130                if succeeded:
1131                    tp.wait_for_all_done()
1132
1133                # release the allocations
1134                for tb in tbparams.keys():
1135                    try:
1136                        self.release_access(tb, tbparams[tb].allocID, 
1137                                tbmap=tbmap, uri=tbparams[tb].uri,
1138                                cert_file=cert, cert_pwd=pw)
1139                    except service_error, e:
1140                        self.log.warn("Error releasing access: %s" % e.desc)
1141                # Remove the placeholder
1142                self.state_lock.acquire()
1143                self.state[eid].status = 'failed'
1144                self.state[eid].updated()
1145                if self.state_filename: self.write_state()
1146                self.state_lock.release()
1147                # Remove the repo dir
1148                self.remove_dirs("%s/%s" %(self.repodir, expid))
1149                # Walk up tmpdir, deleting as we go
1150                if self.cleanup:
1151                    self.remove_dirs(tmpdir)
1152                else:
1153                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1154
1155
1156                log.error("Swap in failed on %s" % ",".join(failed))
1157                return
1158        else:
1159            # Walk through the successes and gather the proofs
1160            proofs = { }
1161            for s in starters:
1162                if s.proof:
1163                    proofs[s.testbed] = s.proof
1164            self.annotate_topology(top, starters)
1165            log.info("[start_segment]: Experiment %s active" % eid)
1166
1167
1168        # Walk up tmpdir, deleting as we go
1169        if self.cleanup:
1170            self.remove_dirs(tmpdir)
1171        else:
1172            log.debug("[start_experiment]: not removing %s" % tmpdir)
1173
1174        # Insert the experiment into our state and update the disk copy.
1175        self.state_lock.acquire()
1176        self.state[expid].status = 'active'
1177        self.state[eid] = self.state[expid]
1178        self.state[eid].top = top
1179        self.state[eid].updated()
1180        # Append startup proofs
1181        for f in self.state[eid].get_all_allocations():
1182            if f.tb in proofs:
1183                f.proof.append(proofs[f.tb])
1184
1185        if self.state_filename: self.write_state()
1186        self.state_lock.release()
1187        return
1188
1189
1190    def add_kit(self, e, kit):
1191        """
1192        Add a Software object created from the list of (install, location)
1193        tuples passed as kit  to the software attribute of an object e.  We
1194        do this enough to break out the code, but it's kind of a hack to
1195        avoid changing the old tuple rep.
1196        """
1197
1198        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1199
1200        if isinstance(e.software, list): e.software.extend(s)
1201        else: e.software = s
1202
1203    def append_experiment_authorization(self, expid, attrs, 
1204            need_state_lock=True):
1205        """
1206        Append the authorization information to system state
1207        """
1208
1209        for p, a in attrs:
1210            self.auth.set_attribute(p, a)
1211        self.auth.save()
1212
1213        if need_state_lock: self.state_lock.acquire()
1214        # XXX: really a no op?
1215        #self.state[expid]['auth'].update(attrs)
1216        if self.state_filename: self.write_state()
1217        if need_state_lock: self.state_lock.release()
1218
1219    def clear_experiment_authorization(self, expid, need_state_lock=True):
1220        """
1221        Attrs is a set of attribute principal pairs that need to be removed
1222        from the authenticator.  Remove them and save the authenticator.
1223        """
1224
1225        if need_state_lock: self.state_lock.acquire()
1226        # XXX: should be a no-op
1227        #if expid in self.state and 'auth' in self.state[expid]:
1228            #for p, a in self.state[expid]['auth']:
1229                #self.auth.unset_attribute(p, a)
1230            #self.state[expid]['auth'] = set()
1231        if self.state_filename: self.write_state()
1232        if need_state_lock: self.state_lock.release()
1233        self.auth.save()
1234
1235
1236    def create_experiment_state(self, fid, req, expid, expcert,
1237            state='starting'):
1238        """
1239        Create the initial entry in the experiment's state.  The expid and
1240        expcert are the experiment's fedid and certifacte that represents that
1241        ID, which are installed in the experiment state.  If the request
1242        includes a suggested local name that is used if possible.  If the local
1243        name is already taken by an experiment owned by this user that has
1244        failed, it is overwritten.  Otherwise new letters are added until a
1245        valid localname is found.  The generated local name is returned.
1246        """
1247
1248        if req.has_key('experimentID') and \
1249                req['experimentID'].has_key('localname'):
1250            overwrite = False
1251            eid = req['experimentID']['localname']
1252            # If there's an old failed experiment here with the same local name
1253            # and accessible by this user, we'll overwrite it, otherwise we'll
1254            # fall through and do the collision avoidance.
1255            old_expid = self.get_experiment_fedid(eid)
1256            if old_expid:
1257                users_experiment = True
1258                try:
1259                    self.check_experiment_access(fid, old_expid)
1260                except service_error, e:
1261                    if e.code == service_error.access: users_experiment = False
1262                    else: raise e
1263                if users_experiment:
1264                    self.state_lock.acquire()
1265                    status = self.state[eid].status
1266                    if status and status == 'failed':
1267                        # remove the old access attributes
1268                        self.clear_experiment_authorization(eid,
1269                                need_state_lock=False)
1270                        overwrite = True
1271                        del self.state[eid]
1272                        del self.state[old_expid]
1273                    self.state_lock.release()
1274                else:
1275                    self.log.info('Experiment %s exists, ' % eid + \
1276                            'but this user cannot access it')
1277            self.state_lock.acquire()
1278            while (self.state.has_key(eid) and not overwrite):
1279                eid += random.choice(string.ascii_letters)
1280            # Initial state
1281            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1282                    identity=expcert)
1283            self.state[expid] = self.state[eid]
1284            if self.state_filename: self.write_state()
1285            self.state_lock.release()
1286        else:
1287            eid = self.exp_stem
1288            for i in range(0,5):
1289                eid += random.choice(string.ascii_letters)
1290            self.state_lock.acquire()
1291            while (self.state.has_key(eid)):
1292                eid = self.exp_stem
1293                for i in range(0,5):
1294                    eid += random.choice(string.ascii_letters)
1295            # Initial state
1296            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1297                    identity=expcert)
1298            self.state[expid] = self.state[eid]
1299            if self.state_filename: self.write_state()
1300            self.state_lock.release()
1301
1302        # Let users touch the state.  Authorize this fid and the expid itself
1303        # to touch the experiment, as well as allowing th eoverrides.
1304        self.append_experiment_authorization(eid, 
1305                set([(fid, expid), (expid,expid)] + \
1306                        [ (o, expid) for o in self.overrides]))
1307
1308        return eid
1309
1310
1311    def allocate_ips_to_topo(self, top):
1312        """
1313        Add an ip4_address attribute to all the hosts in the topology, based on
1314        the shared substrates on which they sit.  An /etc/hosts file is also
1315        created and returned as a list of hostfiles entries.  We also return
1316        the allocator, because we may need to allocate IPs to portals
1317        (specifically DRAGON portals).
1318        """
1319        subs = sorted(top.substrates, 
1320                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1321                reverse=True)
1322        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1323        ifs = { }
1324        hosts = [ ]
1325
1326        for idx, s in enumerate(subs):
1327            net_size = len(s.interfaces)+2
1328
1329            a = ips.allocate(net_size)
1330            if a :
1331                base, num = a
1332                if num < net_size: 
1333                    raise service_error(service_error.internal,
1334                            "Allocator returned wrong number of IPs??")
1335            else:
1336                raise service_error(service_error.req, 
1337                        "Cannot allocate IP addresses")
1338            mask = ips.min_alloc
1339            while mask < net_size:
1340                mask *= 2
1341
1342            netmask = ((2**32-1) ^ (mask-1))
1343
1344            base += 1
1345            for i in s.interfaces:
1346                i.attribute.append(
1347                        topdl.Attribute('ip4_address', 
1348                            "%s" % ip_addr(base)))
1349                i.attribute.append(
1350                        topdl.Attribute('ip4_netmask', 
1351                            "%s" % ip_addr(int(netmask))))
1352
1353                hname = i.element.name
1354                if ifs.has_key(hname):
1355                    hosts.append("%s\t%s-%s %s-%d" % \
1356                            (ip_addr(base), hname, s.name, hname,
1357                                ifs[hname]))
1358                else:
1359                    ifs[hname] = 0
1360                    hosts.append("%s\t%s-%s %s-%d %s" % \
1361                            (ip_addr(base), hname, s.name, hname,
1362                                ifs[hname], hname))
1363
1364                ifs[hname] += 1
1365                base += 1
1366        return hosts, ips
1367
1368    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1369            tbparam, masters, tbmap, expid=None, expcert=None):
1370        for tb in testbeds:
1371            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1372                    expcert)
1373            allocated[tb] = 1
1374
1375    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1376            expcert=None):
1377        """
1378        Get access to testbed through fedd and set the parameters for that tb
1379        """
1380        def get_export_project(svcs):
1381            """
1382            Look through for the list of federated_service for this testbed
1383            objects for a project_export service, and extract the project
1384            parameter.
1385            """
1386
1387            pe = [s for s in svcs if s.name=='project_export']
1388            if len(pe) == 1:
1389                return pe[0].params.get('project', None)
1390            elif len(pe) == 0:
1391                return None
1392            else:
1393                raise service_error(service_error.req,
1394                        "More than one project export is not supported")
1395
1396        def add_services(svcs, type, slist, keys):
1397            """
1398            Add the given services to slist.  type is import or export.  Also
1399            add a mapping entry from the assigned id to the original service
1400            record.
1401            """
1402            for i, s in enumerate(svcs):
1403                idx = '%s%d' % (type, i)
1404                keys[idx] = s
1405                sr = {'id': idx, 'name': s.name, 'visibility': type }
1406                if s.params:
1407                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1408                            for k, v in s.params.items()]
1409                slist.append(sr)
1410
1411        uri = tbmap.get(testbed_base(tb), None)
1412        if not uri:
1413            raise service_error(service_error.server_config, 
1414                    "Unknown testbed: %s" % tb)
1415
1416        export_svcs = masters.get(tb,[])
1417        import_svcs = [ s for m in masters.values() \
1418                for s in m \
1419                    if tb in s.importers ]
1420
1421        export_project = get_export_project(export_svcs)
1422        # Compose the credential list so that IDs come before attributes
1423        creds = set()
1424        keys = set()
1425        certs = self.auth.get_creds_for_principal(fid)
1426        # Append credenials about this experiment controller - e.g. that it is
1427        # trusted.
1428        certs.update(self.auth.get_creds_for_principal(
1429            fedid(file=self.cert_file)))
1430        if expid:
1431            certs.update(self.auth.get_creds_for_principal(expid))
1432        for c in certs:
1433            keys.add(c.issuer_cert())
1434            creds.add(c.attribute_cert())
1435        creds = list(keys) + list(creds)
1436
1437        if expcert: cert, pw = expcert, None
1438        else: cert, pw = self.cert_file, self.cert_pw
1439
1440        # Request credentials
1441        req = {
1442                'abac_credential': creds,
1443            }
1444        # Make the service request from the services we're importing and
1445        # exporting.  Keep track of the export request ids so we can
1446        # collect the resulting info from the access response.
1447        e_keys = { }
1448        if import_svcs or export_svcs:
1449            slist = []
1450            add_services(import_svcs, 'import', slist, e_keys)
1451            add_services(export_svcs, 'export', slist, e_keys)
1452            req['service'] = slist
1453
1454        if self.local_access.has_key(uri):
1455            # Local access call
1456            req = { 'RequestAccessRequestBody' : req }
1457            r = self.local_access[uri].RequestAccess(req, 
1458                    fedid(file=self.cert_file))
1459            r = { 'RequestAccessResponseBody' : r }
1460        else:
1461            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1462
1463        if r.has_key('RequestAccessResponseBody'):
1464            # Through to here we have a valid response, not a fault.
1465            # Access denied is a fault, so something better or worse than
1466            # access denied has happened.
1467            r = r['RequestAccessResponseBody']
1468            self.log.debug("[get_access] Access granted")
1469        else:
1470            raise service_error(service_error.protocol,
1471                        "Bad proxy response")
1472        if 'proof' not in r:
1473            raise service_error(service_error.protocol,
1474                        "Bad access response (no access proof)")
1475
1476        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1477                tb=tb, uri=uri, proof=[r['proof']], 
1478                services=masters.get(tb, None))
1479
1480        # Collect the responses corresponding to the services this testbed
1481        # exports.  These will be the service requests that we will include in
1482        # the start segment requests (with appropriate visibility values) to
1483        # import and export the segments.
1484        for s in r.get('service', []):
1485            id = s.get('id', None)
1486            # Note that this attaches the response to the object in the masters
1487            # data structure.  (The e_keys index disappears when this fcn
1488            # returns)
1489            if id and id in e_keys:
1490                e_keys[id].reqs.append(s)
1491
1492        # Add attributes to parameter space.  We don't allow attributes to
1493        # overlay any parameters already installed.
1494        for a in r.get('fedAttr', []):
1495            try:
1496                if a['attribute']:
1497                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1498            except KeyError:
1499                self.log.error("Bad attribute in response: %s" % a)
1500
1501
1502    def split_topology(self, top, topo, testbeds):
1503        """
1504        Create the sub-topologies that are needed for experiment instantiation.
1505        """
1506        for tb in testbeds:
1507            topo[tb] = top.clone()
1508            # copy in for loop allows deletions from the original
1509            for e in [ e for e in topo[tb].elements]:
1510                etb = e.get_attribute('testbed')
1511                # NB: elements without a testbed attribute won't appear in any
1512                # sub topologies. 
1513                if not etb or etb != tb:
1514                    for i in e.interface:
1515                        for s in i.subs:
1516                            try:
1517                                s.interfaces.remove(i)
1518                            except ValueError:
1519                                raise service_error(service_error.internal,
1520                                        "Can't remove interface??")
1521                    topo[tb].elements.remove(e)
1522            topo[tb].make_indices()
1523
1524    def confirm_software(self, top):
1525        """
1526        Make sure that the software to be loaded in the topo is all available
1527        before we begin making access requests, etc.  This is a subset of
1528        wrangle_software.
1529        """
1530        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1531        pkgs.update([x.location for e in top.elements for x in e.software])
1532
1533        for pkg in pkgs:
1534            loc = pkg
1535
1536            scheme, host, path = urlparse(loc)[0:3]
1537            dest = os.path.basename(path)
1538            if not scheme:
1539                if not loc.startswith('/'):
1540                    loc = "/%s" % loc
1541                loc = "file://%s" %loc
1542            # NB: if scheme was found, loc == pkg
1543            try:
1544                u = urlopen(loc)
1545                u.close()
1546            except Exception, e:
1547                raise service_error(service_error.req, 
1548                        "Cannot open %s: %s" % (loc, e))
1549        return True
1550
1551    def wrangle_software(self, expid, top, topo, tbparams):
1552        """
1553        Copy software out to the repository directory, allocate permissions and
1554        rewrite the segment topologies to look for the software in local
1555        places.
1556        """
1557
1558        # Copy the rpms and tarfiles to a distribution directory from
1559        # which the federants can retrieve them
1560        linkpath = "%s/software" %  expid
1561        softdir ="%s/%s" % ( self.repodir, linkpath)
1562        softmap = { }
1563
1564        # self.fedkit and self.gateway kit are lists of tuples of
1565        # (install_location, download_location) this extracts the download
1566        # locations.
1567        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1568        pkgs.update([x.location for e in top.elements for x in e.software])
1569        try:
1570            os.makedirs(softdir)
1571        except EnvironmentError, e:
1572            raise service_error(
1573                    "Cannot create software directory: %s" % e)
1574        # The actual copying.  Everything's converted into a url for copying.
1575        auth_attrs = set()
1576        for pkg in pkgs:
1577            loc = pkg
1578
1579            scheme, host, path = urlparse(loc)[0:3]
1580            dest = os.path.basename(path)
1581            if not scheme:
1582                if not loc.startswith('/'):
1583                    loc = "/%s" % loc
1584                loc = "file://%s" %loc
1585            # NB: if scheme was found, loc == pkg
1586            try:
1587                u = urlopen(loc)
1588            except Exception, e:
1589                raise service_error(service_error.req, 
1590                        "Cannot open %s: %s" % (loc, e))
1591            try:
1592                f = open("%s/%s" % (softdir, dest) , "w")
1593                self.log.debug("Writing %s/%s" % (softdir,dest) )
1594                data = u.read(4096)
1595                while data:
1596                    f.write(data)
1597                    data = u.read(4096)
1598                f.close()
1599                u.close()
1600            except Exception, e:
1601                raise service_error(service_error.internal,
1602                        "Could not copy %s: %s" % (loc, e))
1603            path = re.sub("/tmp", "", linkpath)
1604            # XXX
1605            softmap[pkg] = \
1606                    "%s/%s/%s" %\
1607                    ( self.repo_url, path, dest)
1608
1609            # Allow the individual segments to access the software by assigning
1610            # an attribute to each testbed allocation that encodes the data to
1611            # be released.  This expression collects the data for each run of
1612            # the loop.
1613            auth_attrs.update([
1614                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1615                        for tb in tbparams.keys()])
1616
1617        self.append_experiment_authorization(expid, auth_attrs)
1618
1619        # Convert the software locations in the segments into the local
1620        # copies on this host
1621        for soft in [ s for tb in topo.values() \
1622                for e in tb.elements \
1623                    if getattr(e, 'software', False) \
1624                        for s in e.software ]:
1625            if softmap.has_key(soft.location):
1626                soft.location = softmap[soft.location]
1627
1628
1629    def new_experiment(self, req, fid):
1630        """
1631        The external interface to empty initial experiment creation called from
1632        the dispatcher.
1633
1634        Creates a working directory, splits the incoming description using the
1635        splitter script and parses out the avrious subsections using the
1636        lcasses above.  Once each sub-experiment is created, use pooled threads
1637        to instantiate them and start it all up.
1638        """
1639        self.log.info("New experiment call started for %s" % fid)
1640        req = req.get('NewRequestBody', None)
1641        if not req:
1642            raise service_error(service_error.req,
1643                    "Bad request format (no NewRequestBody)")
1644
1645        # import may partially succeed so always save credentials and warn
1646        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1647            self.log.debug("Failed to import delegation credentials(!)")
1648        self.get_grouper_updates(fid)
1649        self.auth.update()
1650        self.auth.save()
1651
1652        try:
1653            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1654                    with_proof=True)
1655        except service_error, e:
1656            self.log.info("New experiment call for %s: access denied" % fid)
1657            raise e
1658
1659
1660        if not access_ok:
1661            self.log.info("New experiment call for %s: Access denied" % fid)
1662            raise service_error(service_error.access, "New access denied",
1663                    proof=[proof])
1664
1665        try:
1666            tmpdir = tempfile.mkdtemp(prefix="split-")
1667        except EnvironmentError:
1668            raise service_error(service_error.internal, "Cannot create tmp dir")
1669
1670        # Generate an ID for the experiment (slice) and a certificate that the
1671        # allocator can use to prove they own it.  We'll ship it back through
1672        # the encrypted connection.  If the requester supplied one, use it.
1673        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1674            expcert = req['experimentAccess']['X509']
1675            expid = fedid(certstr=expcert)
1676            self.state_lock.acquire()
1677            if expid in self.state:
1678                self.state_lock.release()
1679                raise service_error(service_error.req, 
1680                        'fedid %s identifies an existing experiment' % expid)
1681            self.state_lock.release()
1682        else:
1683            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1684
1685        #now we're done with the tmpdir, and it should be empty
1686        if self.cleanup:
1687            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1688            os.rmdir(tmpdir)
1689        else:
1690            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1691
1692        eid = self.create_experiment_state(fid, req, expid, expcert, 
1693                state='empty')
1694
1695        rv = {
1696                'experimentID': [
1697                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1698                ],
1699                'experimentStatus': 'empty',
1700                'experimentAccess': { 'X509' : expcert },
1701                'proof': proof.to_dict(),
1702            }
1703
1704        self.log.info("New experiment call succeeded for %s" % fid)
1705        return rv
1706
1707    # create_experiment sub-functions
1708
1709    @staticmethod
1710    def get_experiment_key(req, field='experimentID'):
1711        """
1712        Parse the experiment identifiers out of the request (the request body
1713        tag has been removed).  Specifically this pulls either the fedid or the
1714        localname out of the experimentID field.  A fedid is preferred.  If
1715        neither is present or the request does not contain the fields,
1716        service_errors are raised.
1717        """
1718        # Get the experiment access
1719        exp = req.get(field, None)
1720        if exp:
1721            if exp.has_key('fedid'):
1722                key = exp['fedid']
1723            elif exp.has_key('localname'):
1724                key = exp['localname']
1725            else:
1726                raise service_error(service_error.req, "Unknown lookup type")
1727        else:
1728            raise service_error(service_error.req, "No request?")
1729
1730        return key
1731
1732    def get_experiment_ids_and_start(self, key, tmpdir):
1733        """
1734        Get the experiment name, id and access certificate from the state, and
1735        set the experiment state to 'starting'.  returns a triple (fedid,
1736        localname, access_cert_file). The access_cert_file is a copy of the
1737        contents of the access certificate, created in the tempdir with
1738        restricted permissions.  If things are confused, raise an exception.
1739        """
1740
1741        expid = eid = None
1742        self.state_lock.acquire()
1743        if key in self.state:
1744            exp = self.state[key]
1745            exp.status = "starting"
1746            exp.updated()
1747            expid = exp.fedid
1748            eid = exp.localname
1749            expcert = exp.identity
1750        self.state_lock.release()
1751
1752        # make a protected copy of the access certificate so the experiment
1753        # controller can act as the experiment principal.
1754        if expcert:
1755            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1756            if not expcert_file:
1757                raise service_error(service_error.internal, 
1758                        "Cannot create temp cert file?")
1759        else:
1760            expcert_file = None
1761
1762        return (eid, expid, expcert_file)
1763
1764    def get_topology(self, req, tmpdir):
1765        """
1766        Get the ns2 content and put it into a file for parsing.  Call the local
1767        or remote parser and return the topdl.Topology.  Errors result in
1768        exceptions.  req is the request and tmpdir is a work directory.
1769        """
1770
1771        # The tcl parser needs to read a file so put the content into that file
1772        descr=req.get('experimentdescription', None)
1773        if descr:
1774            if 'ns2description' in descr:
1775                file_content=descr['ns2description']
1776            elif 'topdldescription' in descr:
1777                return topdl.Topology(**descr['topdldescription'])
1778            else:
1779                raise service_error(service_error.req, 
1780                        'Unknown experiment description type')
1781        else:
1782            raise service_error(service_error.req, "No experiment description")
1783
1784
1785        if self.splitter_url:
1786            self.log.debug("Calling remote topdl translator at %s" % \
1787                    self.splitter_url)
1788            top = self.remote_ns2topdl(self.splitter_url, file_content)
1789        else:
1790            tclfile = os.path.join(tmpdir, "experiment.tcl")
1791            if file_content:
1792                try:
1793                    f = open(tclfile, 'w')
1794                    f.write(file_content)
1795                    f.close()
1796                except EnvironmentError:
1797                    raise service_error(service_error.internal,
1798                            "Cannot write temp experiment description")
1799            else:
1800                raise service_error(service_error.req, 
1801                        "Only ns2descriptions supported")
1802            pid = "dummy"
1803            gid = "dummy"
1804            eid = "dummy"
1805
1806            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1807                str(self.muxmax), '-m', 'dummy']
1808
1809            tclcmd.extend([pid, gid, eid, tclfile])
1810
1811            self.log.debug("running local splitter %s", " ".join(tclcmd))
1812            # This is just fantastic.  As a side effect the parser copies
1813            # tb_compat.tcl into the current directory, so that directory
1814            # must be writable by the fedd user.  Doing this in the
1815            # temporary subdir ensures this is the case.
1816            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1817                    cwd=tmpdir)
1818            split_data = tclparser.stdout
1819
1820            top = topdl.topology_from_xml(file=split_data, top="experiment")
1821            os.remove(tclfile)
1822
1823        return top
1824
1825    def get_testbed_services(self, req, testbeds):
1826        """
1827        Parse the services section of the request into two dicts mapping
1828        testbed to lists of federated_service objects.  The first dict maps all
1829        exporters of services to those service objects, the second maps
1830        testbeds to service objects only for services requiring portals.
1831        """
1832        # We construct both dicts here because deriving the second is more
1833        # comples than it looks - both the keys and lists can differ, and it's
1834        # much easier to generate both in one pass.
1835        masters = { }
1836        pmasters = { }
1837        for s in req.get('service', []):
1838            # If this is a service request with the importall field
1839            # set, fill it out.
1840
1841            if s.get('importall', False):
1842                s['import'] = [ tb for tb in testbeds \
1843                        if tb not in s.get('export',[])]
1844                del s['importall']
1845
1846            # Add the service to masters
1847            for tb in s.get('export', []):
1848                if s.get('name', None):
1849
1850                    params = { }
1851                    for a in s.get('fedAttr', []):
1852                        params[a.get('attribute', '')] = a.get('value','')
1853
1854                    fser = federated_service(name=s['name'],
1855                            exporter=tb, importers=s.get('import',[]),
1856                            params=params)
1857                    if fser.name == 'hide_hosts' \
1858                            and 'hosts' not in fser.params:
1859                        fser.params['hosts'] = \
1860                                ",".join(tb_hosts.get(fser.exporter, []))
1861                    if tb in masters: masters[tb].append(fser)
1862                    else: masters[tb] = [fser]
1863
1864                    if fser.portal:
1865                        if tb in pmasters: pmasters[tb].append(fser)
1866                        else: pmasters[tb] = [fser]
1867                else:
1868                    self.log.error('Testbed service does not have name " + \
1869                            "and importers')
1870        return masters, pmasters
1871
1872    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1873        """
1874        Create the ssh keys necessary for interconnecting the portal nodes and
1875        the global hosts file for letting each segment know about the IP
1876        addresses in play.  Save these into the repo.  Add attributes to the
1877        autorizer allowing access controllers to download them and return a set
1878        of attributes that inform the segments where to find this stuff.  May
1879        raise service_errors in if there are problems.
1880        """
1881        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1882        gw_secretkey_base = "fed.%s" % self.ssh_type
1883        keydir = os.path.join(tmpdir, 'keys')
1884        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1885        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1886
1887        try:
1888            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1889        except ValueError:
1890            raise service_error(service_error.server_config, 
1891                    "Bad key type (%s)" % self.ssh_type)
1892
1893        self.generate_seer_certs(keydir)
1894
1895        # Copy configuration files into the remote file store
1896        # The config urlpath
1897        configpath = "/%s/config" % expid
1898        # The config file system location
1899        configdir ="%s%s" % ( self.repodir, configpath)
1900        try:
1901            os.makedirs(configdir)
1902        except EnvironmentError, e:
1903            raise service_error(service_error.internal,
1904                    "Cannot create config directory: %s" % e)
1905        try:
1906            f = open("%s/hosts" % configdir, "w")
1907            print >> f, string.join(hosts, '\n')
1908            f.close()
1909        except EnvironmentError, e:
1910            raise service_error(service_error.internal, 
1911                    "Cannot write hosts file: %s" % e)
1912        try:
1913            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1914            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1915            copy_file(os.path.join(keydir, 'ca.pem'), 
1916                    os.path.join(configdir, 'ca.pem'))
1917            copy_file(os.path.join(keydir, 'node.pem'), 
1918                    os.path.join(configdir, 'node.pem'))
1919        except EnvironmentError, e:
1920            raise service_error(service_error.internal, 
1921                    "Cannot copy keyfiles: %s" % e)
1922
1923        # Allow the individual testbeds to access the configuration files,
1924        # again by setting an attribute for the relevant pathnames on each
1925        # allocation principal.  Yeah, that's a long list comprehension.
1926        self.append_experiment_authorization(expid, set([
1927            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
1928                    for tb in tbparams.keys() \
1929                        for f in ("hosts", 'ca.pem', 'node.pem', 
1930                            gw_secretkey_base, gw_pubkey_base)]))
1931
1932        attrs = [ 
1933                {
1934                    'attribute': 'ssh_pubkey', 
1935                    'value': '%s/%s/config/%s' % \
1936                            (self.repo_url, expid, gw_pubkey_base)
1937                },
1938                {
1939                    'attribute': 'ssh_secretkey', 
1940                    'value': '%s/%s/config/%s' % \
1941                            (self.repo_url, expid, gw_secretkey_base)
1942                },
1943                {
1944                    'attribute': 'hosts', 
1945                    'value': '%s/%s/config/hosts' % \
1946                            (self.repo_url, expid)
1947                },
1948                {
1949                    'attribute': 'seer_ca_pem', 
1950                    'value': '%s/%s/config/%s' % \
1951                            (self.repo_url, expid, 'ca.pem')
1952                },
1953                {
1954                    'attribute': 'seer_node_pem', 
1955                    'value': '%s/%s/config/%s' % \
1956                            (self.repo_url, expid, 'node.pem')
1957                },
1958            ]
1959        return attrs
1960
1961
1962    def get_vtopo(self, req, fid):
1963        """
1964        Return the stored virtual topology for this experiment
1965        """
1966        rv = None
1967        state = None
1968        self.log.info("vtopo call started for %s" %  fid)
1969
1970        req = req.get('VtopoRequestBody', None)
1971        if not req:
1972            raise service_error(service_error.req,
1973                    "Bad request format (no VtopoRequestBody)")
1974        exp = req.get('experiment', None)
1975        if exp:
1976            if exp.has_key('fedid'):
1977                key = exp['fedid']
1978                keytype = "fedid"
1979            elif exp.has_key('localname'):
1980                key = exp['localname']
1981                keytype = "localname"
1982            else:
1983                raise service_error(service_error.req, "Unknown lookup type")
1984        else:
1985            raise service_error(service_error.req, "No request?")
1986
1987        try:
1988            proof = self.check_experiment_access(fid, key)
1989        except service_error, e:
1990            self.log.info("vtopo call failed for %s: access denied" %  fid)
1991            raise e
1992
1993        self.state_lock.acquire()
1994        # XXX: this needs to be recalculated
1995        if key in self.state:
1996            if self.state[key].top is not None:
1997                vtopo = topdl.topology_to_vtopo(self.state[key].top)
1998                rv = { 'experiment' : {keytype: key },
1999                        'vtopo': vtopo,
2000                        'proof': proof.to_dict(), 
2001                    }
2002            else:
2003                state = self.state[key].status
2004        self.state_lock.release()
2005
2006        if rv: 
2007            self.log.info("vtopo call completed for %s %s " % \
2008                (key, fid))
2009            return rv
2010        else: 
2011            if state:
2012                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2013                    (key, fid))
2014                raise service_error(service_error.partial, 
2015                        "Not ready: %s" % state)
2016            else:
2017                self.log.info("vtopo call completed for %s %s (No experiment)"\
2018                        % (key, fid))
2019                raise service_error(service_error.req, "No such experiment")
2020
2021    def get_vis(self, req, fid):
2022        """
2023        Return the stored visualization for this experiment
2024        """
2025        rv = None
2026        state = None
2027
2028        self.log.info("vis call started for %s" %  fid)
2029        req = req.get('VisRequestBody', None)
2030        if not req:
2031            raise service_error(service_error.req,
2032                    "Bad request format (no VisRequestBody)")
2033        exp = req.get('experiment', None)
2034        if exp:
2035            if exp.has_key('fedid'):
2036                key = exp['fedid']
2037                keytype = "fedid"
2038            elif exp.has_key('localname'):
2039                key = exp['localname']
2040                keytype = "localname"
2041            else:
2042                raise service_error(service_error.req, "Unknown lookup type")
2043        else:
2044            raise service_error(service_error.req, "No request?")
2045
2046        try:
2047            proof = self.check_experiment_access(fid, key)
2048        except service_error, e:
2049            self.log.info("vis call failed for %s: access denied" %  fid)
2050            raise e
2051
2052        self.state_lock.acquire()
2053        # Generate the visualization
2054        if key in self.state:
2055            if self.state[key].top is not None:
2056                try:
2057                    vis = self.genviz(
2058                            topdl.topology_to_vtopo(self.state[key].top))
2059                except service_error, e:
2060                    self.state_lock.release()
2061                    raise e
2062                rv =  { 'experiment' : {keytype: key },
2063                        'vis': vis,
2064                        'proof': proof.to_dict(), 
2065                        }
2066            else:
2067                state = self.state[key].status
2068        self.state_lock.release()
2069
2070        if rv: 
2071            self.log.info("vis call completed for %s %s " % \
2072                (key, fid))
2073            return rv
2074        else:
2075            if state:
2076                self.log.info("vis call completed for %s %s (not ready)" % \
2077                    (key, fid))
2078                raise service_error(service_error.partial, 
2079                        "Not ready: %s" % state)
2080            else:
2081                self.log.info("vis call completed for %s %s (no experiment)" % \
2082                    (key, fid))
2083                raise service_error(service_error.req, "No such experiment")
2084
2085   
2086    def save_federant_information(self, allocated, tbparams, eid, top):
2087        """
2088        Store the various data that have changed in the experiment state
2089        between when it was started and the beginning of resource allocation.
2090        This is basically the information about each local allocation.  This
2091        fills in the values of the placeholder allocation in the state.  It
2092        also collects the access proofs and returns them as dicts for a
2093        response message.
2094        """
2095        self.state_lock.acquire()
2096        exp = self.state[eid]
2097        exp.top = top.clone()
2098        # save federant information
2099        for k in allocated.keys():
2100            exp.add_allocation(tbparams[k])
2101            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2102                type="testbed", localname=[k], 
2103                service=[ s.to_topdl() for s in tbparams[k].services]))
2104
2105        # Access proofs for the response message
2106        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2107                    for p in tbparams[k].proof]
2108        exp.updated()
2109        if self.state_filename: 
2110            self.write_state()
2111        self.state_lock.release()
2112        return proofs
2113
2114    def clear_placeholder(self, eid, expid, tmpdir):
2115        """
2116        Clear the placeholder and remove any allocated temporary dir.
2117        """
2118
2119        self.state_lock.acquire()
2120        del self.state[eid]
2121        del self.state[expid]
2122        if self.state_filename: self.write_state()
2123        self.state_lock.release()
2124        if tmpdir and self.cleanup:
2125            self.remove_dirs(tmpdir)
2126
2127    # end of create_experiment sub-functions
2128
2129    def create_experiment(self, req, fid):
2130        """
2131        The external interface to experiment creation called from the
2132        dispatcher.
2133
2134        Creates a working directory, splits the incoming description using the
2135        splitter script and parses out the various subsections using the
2136        classes above.  Once each sub-experiment is created, use pooled threads
2137        to instantiate them and start it all up.
2138        """
2139
2140        self.log.info("Create experiment call started for %s" % fid)
2141        req = req.get('CreateRequestBody', None)
2142        if req:
2143            key = self.get_experiment_key(req)
2144        else:
2145            raise service_error(service_error.req,
2146                    "Bad request format (no CreateRequestBody)")
2147
2148        # Import information from the requester
2149        # import may partially succeed so always save credentials and warn
2150        if not self.auth.import_credentials(data_list=req.get('credential',[])):
2151            self.log.debug("Failed to import delegation credentials(!)")
2152        self.get_grouper_updates(fid)
2153        self.auth.update()
2154        self.auth.save()
2155
2156        try:
2157            # Make sure that the caller can talk to us
2158            proof = self.check_experiment_access(fid, key)
2159        except service_error, e:
2160            self.log.info("Create experiment call failed for %s: access denied"\
2161                    % fid)
2162            raise e
2163
2164
2165        # Install the testbed map entries supplied with the request into a copy
2166        # of the testbed map.
2167        tbmap = dict(self.tbmap)
2168        tbactive = set(self.tbactive)
2169        for m in req.get('testbedmap', []):
2170            if 'testbed' in m and 'uri' in m:
2171                tbmap[m['testbed']] = m['uri']
2172                if 'active' in m and m['active']: tbactive.add(m['testbed'])
2173
2174        # a place to work
2175        try:
2176            tmpdir = tempfile.mkdtemp(prefix="split-")
2177            os.mkdir(tmpdir+"/keys")
2178        except EnvironmentError:
2179            raise service_error(service_error.internal, "Cannot create tmp dir")
2180
2181        tbparams = { }
2182
2183        eid, expid, expcert_file = \
2184                self.get_experiment_ids_and_start(key, tmpdir)
2185
2186        # This catches exceptions to clear the placeholder if necessary
2187        try: 
2188            if not (eid and expid):
2189                raise service_error(service_error.internal, 
2190                        "Cannot find local experiment info!?")
2191
2192            top = self.get_topology(req, tmpdir)
2193            self.confirm_software(top)
2194            # Assign the IPs
2195            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2196            # Find the testbeds to look up
2197            tb_hosts = { }
2198            testbeds = [ ]
2199            for e in top.elements:
2200                if isinstance(e, topdl.Computer):
2201                    tb = e.get_attribute('testbed') or 'default'
2202                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2203                    else: 
2204                        tb_hosts[tb] = [ e.name ]
2205                        testbeds.append(tb)
2206
2207            masters, pmasters = self.get_testbed_services(req, testbeds)
2208            allocated = { }         # Testbeds we can access
2209            topo ={ }               # Sub topologies
2210            connInfo = { }          # Connection information
2211
2212            self.split_topology(top, topo, testbeds)
2213
2214            self.get_access_to_testbeds(testbeds, fid, allocated, 
2215                    tbparams, masters, tbmap, expid, expcert_file)
2216
2217            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2218
2219            part = experiment_partition(self.auth, self.store_url, tbmap,
2220                    self.muxmax, self.direct_transit, tbactive)
2221            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2222                    connInfo, expid)
2223
2224            auth_attrs = set()
2225            # Now get access to the dynamic testbeds (those added above)
2226            for tb in [ t for t in topo if t not in allocated]:
2227                self.get_access(tb, tbparams, fid, masters, tbmap, 
2228                        expid, expcert_file)
2229                allocated[tb] = 1
2230                store_keys = topo[tb].get_attribute('store_keys')
2231                # Give the testbed access to keys it exports or imports
2232                if store_keys:
2233                    auth_attrs.update(set([
2234                        (tbparams[tb].allocID, sk) \
2235                                for sk in store_keys.split(" ")]))
2236
2237            if auth_attrs:
2238                self.append_experiment_authorization(expid, auth_attrs)
2239
2240            # transit and disconnected testbeds may not have a connInfo entry.
2241            # Fill in the blanks.
2242            for t in allocated.keys():
2243                if not connInfo.has_key(t):
2244                    connInfo[t] = { }
2245
2246            self.wrangle_software(expid, top, topo, tbparams)
2247
2248            proofs = self.save_federant_information(allocated, tbparams, 
2249                    eid, top)
2250        except service_error, e:
2251            # If something goes wrong in the parse (usually an access error)
2252            # clear the placeholder state.  From here on out the code delays
2253            # exceptions.  Failing at this point returns a fault to the remote
2254            # caller.
2255
2256            self.log.info("Create experiment call failed for %s %s: %s" % 
2257                    (eid, fid, e))
2258            self.clear_placeholder(eid, expid, tmpdir)
2259            raise e
2260
2261        # Start the background swapper and return the starting state.  From
2262        # here on out, the state will stick around a while.
2263
2264        # Create a logger that logs to the experiment's state object as well as
2265        # to the main log file.
2266        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2267        alloc_collector = self.list_log(self.state[eid].log)
2268        h = logging.StreamHandler(alloc_collector)
2269        # XXX: there should be a global one of these rather than repeating the
2270        # code.
2271        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2272                    '%d %b %y %H:%M:%S'))
2273        alloc_log.addHandler(h)
2274
2275        # Start a thread to do the resource allocation
2276        t  = Thread(target=self.allocate_resources,
2277                args=(allocated, masters, eid, expid, tbparams, 
2278                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2279                    connInfo, tbmap, expcert_file),
2280                name=eid)
2281        t.start()
2282
2283        rv = {
2284                'experimentID': [
2285                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2286                ],
2287                'experimentStatus': 'starting',
2288                'proof': [ proof.to_dict() ] + proofs,
2289            }
2290        self.log.info("Create experiment call succeeded for %s %s" % \
2291                (eid, fid))
2292
2293        return rv
2294   
2295    def get_experiment_fedid(self, key):
2296        """
2297        find the fedid associated with the localname key in the state database.
2298        """
2299
2300        rv = None
2301        self.state_lock.acquire()
2302        if key in self.state:
2303            rv = self.state[key].fedid
2304        self.state_lock.release()
2305        return rv
2306
2307    def check_experiment_access(self, fid, key):
2308        """
2309        Confirm that the fid has access to the experiment.  Though a request
2310        may be made in terms of a local name, the access attribute is always
2311        the experiment's fedid.
2312        """
2313        if not isinstance(key, fedid):
2314            key = self.get_experiment_fedid(key)
2315
2316        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2317
2318        if access_ok:
2319            return proof
2320        else:
2321            raise service_error(service_error.access, "Access Denied",
2322                proof)
2323
2324
2325    def get_handler(self, path, fid):
2326        """
2327        Perhaps surprisingly named, this function handles HTTP GET requests to
2328        this server (SOAP requests are POSTs).
2329        """
2330        self.log.info("Get handler %s %s" % (path, fid))
2331        if len("%s" % fid) == 0:
2332            return (None, None)
2333        # XXX: log proofs?
2334        if self.auth.check_attribute(fid, path):
2335            return ("%s/%s" % (self.repodir, path), "application/binary")
2336        else:
2337            return (None, None)
2338
2339    def update_info(self, key, force=False):
2340        top = None
2341        self.state_lock.acquire()
2342        if key in self.state:
2343            if force or self.state[key].older_than(self.info_cache_limit):
2344                top = self.state[key].top
2345                if top is not None: top = top.clone()
2346                d1, info_params, cert, d2 = \
2347                        self.get_segment_info(self.state[key], need_lock=False)
2348        self.state_lock.release()
2349
2350        if top is None: return
2351
2352        try:
2353            tmpdir = tempfile.mkdtemp(prefix="info-")
2354        except EnvironmentError:
2355            raise service_error(service_error.internal, 
2356                    "Cannot create tmp dir")
2357        cert_file = self.make_temp_certfile(cert, tmpdir)
2358
2359        data = []
2360        try:
2361            for k, (uri, aid) in info_params.items():
2362                info=self.info_segment(log=self.log, testbed=uri,
2363                            cert_file=cert_file, cert_pwd=None,
2364                            trusted_certs=self.trusted_certs,
2365                            caller=self.call_InfoSegment)
2366                info(uri, aid)
2367                data.append(info)
2368        # Clean up the tmpdir no matter what
2369        finally:
2370            if tmpdir: self.remove_dirs(tmpdir)
2371
2372        self.annotate_topology(top, data)
2373        self.state_lock.acquire()
2374        if key in self.state:
2375            self.state[key].top = top
2376            self.state[key].updated()
2377            if self.state_filename: self.write_state()
2378        self.state_lock.release()
2379
2380   
2381    def get_info(self, req, fid):
2382        """
2383        Return all the stored info about this experiment
2384        """
2385        rv = None
2386
2387        self.log.info("Info call started for %s" %  fid)
2388        req = req.get('InfoRequestBody', None)
2389        if not req:
2390            raise service_error(service_error.req,
2391                    "Bad request format (no InfoRequestBody)")
2392        exp = req.get('experiment', None)
2393        legacy = req.get('legacy', False)
2394        fresh = req.get('fresh', False)
2395        if exp:
2396            if exp.has_key('fedid'):
2397                key = exp['fedid']
2398                keytype = "fedid"
2399            elif exp.has_key('localname'):
2400                key = exp['localname']
2401                keytype = "localname"
2402            else:
2403                raise service_error(service_error.req, "Unknown lookup type")
2404        else:
2405            raise service_error(service_error.req, "No request?")
2406
2407        try:
2408            proof = self.check_experiment_access(fid, key)
2409        except service_error, e:
2410            self.log.info("Info call failed for %s: access denied" %  fid)
2411
2412
2413        self.update_info(key, fresh)
2414
2415        self.state_lock.acquire()
2416        if self.state.has_key(key):
2417            rv = self.state[key].get_info()
2418            # Copy the topo if we need legacy annotations
2419            if legacy:
2420                top = self.state[key].top
2421                if top is not None: top = top.clone()
2422        self.state_lock.release()
2423        self.log.info("Gathered Info for %s %s" % (key, fid))
2424
2425        # If the legacy visualization and topology representations are
2426        # requested, calculate them and add them to the return.
2427        if legacy and rv is not None:
2428            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2429            if top is not None:
2430                vtopo = topdl.topology_to_vtopo(top)
2431                if vtopo is not None:
2432                    rv['vtopo'] = vtopo
2433                    try:
2434                        vis = self.genviz(vtopo)
2435                    except service_error, e:
2436                        self.log.debug('Problem generating visualization: %s' \
2437                                % e)
2438                        vis = None
2439                    if vis is not None:
2440                        rv['vis'] = vis
2441        if rv:
2442            self.log.info("Info succeded for %s %s" % (key, fid))
2443            rv['proof'] = proof.to_dict()
2444            return rv
2445        else: 
2446            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2447            raise service_error(service_error.req, "No such experiment")
2448
2449    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2450            results):
2451        """
2452        Call OperateSegment on multiple testbeds and gather the results.
2453        op_params contains the parameters needed to contact that testbed, cert
2454        is a certificate containing the fedid to use, op is the operation,
2455        testbeds is a dict mapping testbed name to targets in that testbed,
2456        params are the parameters to include a,d results is a growing list of
2457        the results of the calls.
2458        """
2459        try:
2460            tmpdir = tempfile.mkdtemp(prefix="info-")
2461        except EnvironmentError:
2462            raise service_error(service_error.internal, 
2463                    "Cannot create tmp dir")
2464        cert_file = self.make_temp_certfile(cert, tmpdir)
2465
2466        try:
2467            for tb, targets in testbeds.items():
2468                if tb in op_params:
2469                    uri, aid = op_params[tb]
2470                    operate=self.operation_segment(log=self.log, testbed=uri,
2471                                cert_file=cert_file, cert_pwd=None,
2472                                trusted_certs=self.trusted_certs,
2473                                caller=self.call_OperationSegment)
2474                    if operate(uri, aid, op, targets, params):
2475                        if operate.status is not None:
2476                            results.extend(operate.status)
2477                            continue
2478                # Something went wrong in a weird way.  Add statuses
2479                # that reflect that to results
2480                for t in targets:
2481                    results.append(operation_status(t, 
2482                        operation_status.federant,
2483                        'Unexpected error on %s' % tb))
2484        # Clean up the tmpdir no matter what
2485        finally:
2486            if tmpdir: self.remove_dirs(tmpdir)
2487
2488    def do_operation(self, req, fid):
2489        """
2490        Find the testbeds holding each target and ask them to carry out the
2491        operation.  Return the statuses.
2492        """
2493        # Map an element to the testbed containing it
2494        def element_to_tb(e):
2495            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2496            elif isinstance(e, topdl.Testbed): return e.name
2497            else: return None
2498        # If d is an operation_status object, make it a dict
2499        def make_dict(d):
2500            if isinstance(d, dict): return d
2501            elif isinstance(d, operation_status): return d.to_dict()
2502            else: return { }
2503
2504        def element_name(e):
2505            if isinstance(e, topdl.Computer): return e.name
2506            elif isinstance(e, topdl.Testbed): 
2507                if e.localname: return e.localname[0]
2508                else: return None
2509            else: return None
2510
2511        self.log.info("Operation call started for %s" %  fid)
2512        req = req.get('OperationRequestBody', None)
2513        if not req:
2514            raise service_error(service_error.req,
2515                    "Bad request format (no OperationRequestBody)")
2516        exp = req.get('experiment', None)
2517        op = req.get('operation', None)
2518        targets = set(req.get('target', []))
2519        params = req.get('parameter', None)
2520
2521        if exp:
2522            if 'fedid' in exp:
2523                key = exp['fedid']
2524                keytype = "fedid"
2525            elif 'localname' in exp:
2526                key = exp['localname']
2527                keytype = "localname"
2528            else:
2529                raise service_error(service_error.req, "Unknown lookup type")
2530        else:
2531            raise service_error(service_error.req, "No request?")
2532
2533        if op is None or not targets:
2534            raise service_error(service_error.req, "No request?")
2535
2536        try:
2537            proof = self.check_experiment_access(fid, key)
2538        except service_error, e:
2539            self.log.info("Operation call failed for %s: access denied" %  fid)
2540            raise e
2541
2542        self.state_lock.acquire()
2543        if key in self.state:
2544            d1, op_params, cert, d2 = \
2545                    self.get_segment_info(self.state[key], need_lock=False,
2546                            key='tb')
2547            top = self.state[key].top
2548            if top is not None:
2549                top = top.clone()
2550        self.state_lock.release()
2551
2552        if top is None:
2553            self.log.info("Operation call failed for %s: not active" %  fid)
2554            raise service_error(service_error.partial, "No topology yet", 
2555                    proof=proof)
2556
2557        testbeds = { }
2558        results = []
2559        for e in top.elements:
2560            ename = element_name(e)
2561            if ename in targets:
2562                tb = element_to_tb(e)
2563                targets.remove(ename)
2564                if tb is not None:
2565                    if tb in testbeds: testbeds[tb].append(ename)
2566                    else: testbeds[tb] = [ ename ]
2567                else:
2568                    results.append(operation_status(e.name, 
2569                        code=operation_status.no_target, 
2570                        description='Cannot map target to testbed'))
2571
2572        for t in targets:
2573            results.append(operation_status(t, operation_status.no_target))
2574
2575        self.operate_on_segments(op_params, cert, op, testbeds, params,
2576                results)
2577
2578        self.log.info("Operation call succeeded for %s" %  fid)
2579        return { 
2580                'experiment': exp, 
2581                'status': [make_dict(r) for r in results],
2582                'proof': proof.to_dict()
2583                }
2584
2585
2586    def get_multi_info(self, req, fid):
2587        """
2588        Return all the stored info that this fedid can access
2589        """
2590        rv = { 'info': [ ], 'proof': [ ] }
2591
2592        self.log.info("Multi Info call started for %s" %  fid)
2593        self.get_grouper_updates(fid)
2594        self.auth.update()
2595        self.auth.save()
2596        self.state_lock.acquire()
2597        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2598            try:
2599                proof = self.check_experiment_access(fid, key)
2600            except service_error, e:
2601                if e.code == service_error.access:
2602                    continue
2603                else:
2604                    self.log.info("Multi Info call failed for %s: %s" %  \
2605                            (e,fid))
2606                    self.state_lock.release()
2607                    raise e
2608
2609            if self.state.has_key(key):
2610                e = self.state[key].get_info()
2611                e['proof'] = proof.to_dict()
2612                rv['info'].append(e)
2613                rv['proof'].append(proof.to_dict())
2614        self.state_lock.release()
2615        self.log.info("Multi Info call succeeded for %s" %  fid)
2616        return rv
2617
2618    def check_termination_status(self, fed_exp, force):
2619        """
2620        Confirm that the experiment is sin a valid state to stop (or force it)
2621        return the state - invalid states for deletion and force settings cause
2622        exceptions.
2623        """
2624        self.state_lock.acquire()
2625        status = fed_exp.status
2626
2627        if status:
2628            if status in ('starting', 'terminating'):
2629                if not force:
2630                    self.state_lock.release()
2631                    raise service_error(service_error.partial, 
2632                            'Experiment still being created or destroyed')
2633                else:
2634                    self.log.warning('Experiment in %s state ' % status + \
2635                            'being terminated by force.')
2636            self.state_lock.release()
2637            return status
2638        else:
2639            # No status??? trouble
2640            self.state_lock.release()
2641            raise service_error(service_error.internal,
2642                    "Experiment has no status!?")
2643
2644    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2645        ids = []
2646        term_params = { }
2647        if need_lock: self.state_lock.acquire()
2648        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2649        expcert = fed_exp.identity
2650        repo = "%s" % fed_exp.fedid
2651
2652        # Collect the allocation/segment ids into a dict keyed by the fedid
2653        # of the allocation that contains a tuple of uri, aid
2654        for i, fed in enumerate(fed_exp.get_all_allocations()):
2655            uri = fed.uri
2656            aid = fed.allocID
2657            if key == 'aid': term_params[aid] = (uri, aid)
2658            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2659
2660        if need_lock: self.state_lock.release()
2661        return ids, term_params, expcert, repo
2662
2663
2664    def get_termination_info(self, fed_exp):
2665        self.state_lock.acquire()
2666        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2667        # Change the experiment state
2668        fed_exp.status = 'terminating'
2669        fed_exp.updated()
2670        if self.state_filename: self.write_state()
2671        self.state_lock.release()
2672
2673        return ids, term_params, expcert, repo
2674
2675
2676    def deallocate_resources(self, term_params, expcert, status, force, 
2677            dealloc_log):
2678        tmpdir = None
2679        # This try block makes sure the tempdir is cleared
2680        try:
2681            # If no expcert, try the deallocation as the experiment
2682            # controller instance.
2683            if expcert and self.auth_type != 'legacy': 
2684                try:
2685                    tmpdir = tempfile.mkdtemp(prefix="term-")
2686                except EnvironmentError:
2687                    raise service_error(service_error.internal, 
2688                            "Cannot create tmp dir")
2689                cert_file = self.make_temp_certfile(expcert, tmpdir)
2690                pw = None
2691            else: 
2692                cert_file = self.cert_file
2693                pw = self.cert_pwd
2694
2695            # Stop everyone.  NB, wait_for_all waits until a thread starts
2696            # and then completes, so we can't wait if nothing starts.  So,
2697            # no tbparams, no start.
2698            if len(term_params) > 0:
2699                tp = thread_pool(self.nthreads)
2700                for k, (uri, aid) in term_params.items():
2701                    # Create and start a thread to stop the segment
2702                    tp.wait_for_slot()
2703                    t  = pooled_thread(\
2704                            target=self.terminate_segment(log=dealloc_log,
2705                                testbed=uri,
2706                                cert_file=cert_file, 
2707                                cert_pwd=pw,
2708                                trusted_certs=self.trusted_certs,
2709                                caller=self.call_TerminateSegment),
2710                            args=(uri, aid), name=k,
2711                            pdata=tp, trace_file=self.trace_file)
2712                    t.start()
2713                # Wait for completions
2714                tp.wait_for_all_done()
2715
2716            # release the allocations (failed experiments have done this
2717            # already, and starting experiments may be in odd states, so we
2718            # ignore errors releasing those allocations
2719            try: 
2720                for k, (uri, aid)  in term_params.items():
2721                    self.release_access(None, aid, uri=uri,
2722                            cert_file=cert_file, cert_pwd=pw)
2723            except service_error, e:
2724                if status != 'failed' and not force:
2725                    raise e
2726
2727        # Clean up the tmpdir no matter what
2728        finally:
2729            if tmpdir: self.remove_dirs(tmpdir)
2730
2731    def terminate_experiment(self, req, fid):
2732        """
2733        Swap this experiment out on the federants and delete the shared
2734        information
2735        """
2736        self.log.info("Terminate experiment call started for %s" % fid)
2737        tbparams = { }
2738        req = req.get('TerminateRequestBody', None)
2739        if not req:
2740            raise service_error(service_error.req,
2741                    "Bad request format (no TerminateRequestBody)")
2742
2743        key = self.get_experiment_key(req, 'experiment')
2744        try:
2745            proof = self.check_experiment_access(fid, key)
2746        except service_error, e:
2747            self.log.info(
2748                    "Terminate experiment call failed for %s: access denied" \
2749                            % fid)
2750            raise e
2751        exp = req.get('experiment', False)
2752        force = req.get('force', False)
2753
2754        dealloc_list = [ ]
2755
2756
2757        # Create a logger that logs to the dealloc_list as well as to the main
2758        # log file.
2759        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2760        dealloc_log.info("Terminating %s " %key)
2761        h = logging.StreamHandler(self.list_log(dealloc_list))
2762        # XXX: there should be a global one of these rather than repeating the
2763        # code.
2764        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2765                    '%d %b %y %H:%M:%S'))
2766        dealloc_log.addHandler(h)
2767
2768        self.state_lock.acquire()
2769        fed_exp = self.state.get(key, None)
2770        self.state_lock.release()
2771        repo = None
2772
2773        if fed_exp:
2774            status = self.check_termination_status(fed_exp, force)
2775            # get_termination_info updates the experiment state
2776            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2777            self.deallocate_resources(term_params, expcert, status, force, 
2778                    dealloc_log)
2779
2780            # Remove the terminated experiment
2781            self.state_lock.acquire()
2782            for id in ids:
2783                self.clear_experiment_authorization(id, need_state_lock=False)
2784                if id in self.state: del self.state[id]
2785
2786            if self.state_filename: self.write_state()
2787            self.state_lock.release()
2788
2789            # Delete any synch points associated with this experiment.  All
2790            # synch points begin with the fedid of the experiment.
2791            fedid_keys = set(["fedid:%s" % f for f in ids \
2792                    if isinstance(f, fedid)])
2793            for k in self.synch_store.all_keys():
2794                try:
2795                    if len(k) > 45 and k[0:46] in fedid_keys:
2796                        self.synch_store.del_value(k)
2797                except synch_store.BadDeletionError:
2798                    pass
2799            self.write_store()
2800
2801            # Remove software and other cached stuff from the filesystem.
2802            if repo:
2803                self.remove_dirs("%s/%s" % (self.repodir, repo))
2804       
2805            self.log.info("Terminate experiment succeeded for %s %s" % \
2806                    (key, fid))
2807            return { 
2808                    'experiment': exp , 
2809                    'deallocationLog': string.join(dealloc_list, ''),
2810                    'proof': [proof.to_dict()],
2811                    }
2812        else:
2813            self.log.info("Terminate experiment failed for %s %s: no state" % \
2814                    (key, fid))
2815            raise service_error(service_error.req, "No saved state")
2816
2817
2818    def GetValue(self, req, fid):
2819        """
2820        Get a value from the synchronized store
2821        """
2822        req = req.get('GetValueRequestBody', None)
2823        if not req:
2824            raise service_error(service_error.req,
2825                    "Bad request format (no GetValueRequestBody)")
2826       
2827        name = req.get('name', None)
2828        wait = req.get('wait', False)
2829        rv = { 'name': name }
2830
2831        if not name:
2832            raise service_error(service_error.req, "No name?")
2833
2834        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2835
2836        if access_ok:
2837            self.log.debug("[GetValue] asking for %s " % name)
2838            try:
2839                v = self.synch_store.get_value(name, wait)
2840            except synch_store.RevokedKeyError:
2841                # No more synch on this key
2842                raise service_error(service_error.federant, 
2843                        "Synch key %s revoked" % name)
2844            if v is not None:
2845                rv['value'] = v
2846            rv['proof'] = proof.to_dict()
2847            self.log.debug("[GetValue] got %s from %s" % (v, name))
2848            return rv
2849        else:
2850            raise service_error(service_error.access, "Access Denied",
2851                    proof=proof)
2852       
2853
2854    def SetValue(self, req, fid):
2855        """
2856        Set a value in the synchronized store
2857        """
2858        req = req.get('SetValueRequestBody', None)
2859        if not req:
2860            raise service_error(service_error.req,
2861                    "Bad request format (no SetValueRequestBody)")
2862       
2863        name = req.get('name', None)
2864        v = req.get('value', '')
2865
2866        if not name:
2867            raise service_error(service_error.req, "No name?")
2868
2869        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2870
2871        if access_ok:
2872            try:
2873                self.synch_store.set_value(name, v)
2874                self.write_store()
2875                self.log.debug("[SetValue] set %s to %s" % (name, v))
2876            except synch_store.CollisionError:
2877                # Translate into a service_error
2878                raise service_error(service_error.req,
2879                        "Value already set: %s" %name)
2880            except synch_store.RevokedKeyError:
2881                # No more synch on this key
2882                raise service_error(service_error.federant, 
2883                        "Synch key %s revoked" % name)
2884                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2885        else:
2886            raise service_error(service_error.access, "Access Denied",
2887                    proof=proof)
Note: See TracBrowser for help on using the repository browser.