source: fedd/federation/experiment_control.py @ 995ad61

Last change on this file since 995ad61 was ab662b6, checked in by Ted Faber <faber@…>, 12 years ago

Avoid an internal network at DETER. This will have to come out

  • Property mode set to 100644
File size: 97.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15import shutil
16import zipfile
17
18import os.path
19
20import traceback
21# For parsing visualization output and splitter output
22import xml.parsers.expat
23
24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
26from string import join
27
28from urlparse import urlparse
29from urllib2 import urlopen, URLError
30
31from util import *
32from deter import fedid, generate_fedid
33from remote_service import xmlrpc_handler, soap_handler, service_caller
34from service_error import service_error
35from synch_store import synch_store
36from experiment_partition import experiment_partition
37from experiment_control_legacy import experiment_control_legacy
38from authorizer import abac_authorizer
39from thread_pool import thread_pool, pooled_thread
40from experiment_info import experiment_info, allocation_info, federated_service
41from operation_status import operation_status
42
43from deter import topdl
44from deter import ip_allocator
45from deter import ip_addr
46import list_log
47
48
49class nullHandler(logging.Handler):
50    def emit(self, record): pass
51
52fl = logging.getLogger("fedd.experiment_control")
53fl.addHandler(nullHandler())
54
55class experiment_control_local(experiment_control_legacy):
56    """
57    Control of experiments that this system can directly access.
58
59    Includes experiment creation, termination and information dissemination.
60    Thred safe.
61    """
62
63    class ssh_cmd_timeout(RuntimeError): pass
64   
65    call_RequestAccess = service_caller('RequestAccess')
66    call_ReleaseAccess = service_caller('ReleaseAccess')
67    call_StartSegment = service_caller('StartSegment')
68    call_TerminateSegment = service_caller('TerminateSegment')
69    call_InfoSegment = service_caller('InfoSegment')
70    call_OperationSegment = service_caller('OperationSegment')
71    call_Ns2Topdl = service_caller('Ns2Topdl')
72
73    def __init__(self, config=None, auth=None):
74        """
75        Intialize the various attributes, most from the config object
76        """
77
78        def parse_tarfile_list(tf):
79            """
80            Parse a tarfile list from the configuration.  This is a set of
81            paths and tarfiles separated by spaces.
82            """
83            rv = [ ]
84            if tf is not None:
85                tl = tf.split()
86                while len(tl) > 1:
87                    p, t = tl[0:2]
88                    del tl[0:2]
89                    rv.append((p, t))
90            return rv
91
92        self.list_log = list_log.list_log
93
94        self.cert_file = config.get("experiment_control", "cert_file")
95        if self.cert_file:
96            self.cert_pwd = config.get("experiment_control", "cert_pwd")
97        else:
98            self.cert_file = config.get("globals", "cert_file")
99            self.cert_pwd = config.get("globals", "cert_pwd")
100
101        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
102                or config.get("globals", "trusted_certs")
103
104        self.repodir = config.get("experiment_control", "repodir")
105        self.repo_url = config.get("experiment_control", "repo_url", 
106                "https://users.isi.deterlab.net:23235");
107
108        self.exp_stem = "fed-stem"
109        self.log = logging.getLogger("fedd.experiment_control")
110        set_log_level(config, "experiment_control", self.log)
111        self.muxmax = 2
112        self.nthreads = 10
113        self.randomize_experiments = False
114
115        self.splitter = None
116        self.ssh_keygen = "/usr/bin/ssh-keygen"
117        self.ssh_identity_file = None
118
119
120        self.debug = config.getboolean("experiment_control", "create_debug")
121        self.cleanup = not config.getboolean("experiment_control", 
122                "leave_tmpfiles")
123        self.state_filename = config.get("experiment_control", 
124                "experiment_state")
125        self.store_filename = config.get("experiment_control", 
126                "synch_store")
127        self.store_url = config.get("experiment_control", "store_url")
128        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
129        self.grouper_url = config.get("experiment_control", "grouper_url")
130        self.fedkit = parse_tarfile_list(\
131                config.get("experiment_control", "fedkit"))
132        self.gatewaykit = parse_tarfile_list(\
133                config.get("experiment_control", "gatewaykit"))
134
135        dt = config.get("experiment_control", "direct_transit")
136        self.auth_type = config.get('experiment_control', 'auth_type') \
137                or 'legacy'
138        self.auth_dir = config.get('experiment_control', 'auth_dir')
139        # XXX: document this!
140        self.info_cache_limit = \
141                config.getint('experiment_control', 'info_cache', 600)
142        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
143        else: self.direct_transit = [ ]
144        # NB for internal master/slave ops, not experiment setup
145        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
146
147        self.overrides = set([])
148        ovr = config.get('experiment_control', 'overrides')
149        if ovr:
150            for o in ovr.split(","):
151                o = o.strip()
152                if o.startswith('fedid:'): o = o[len('fedid:'):]
153                self.overrides.add(fedid(hexstr=o))
154
155        self.state = { }
156        self.state_lock = Lock()
157        self.tclsh = "/usr/local/bin/otclsh"
158        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
159                config.get("experiment_control", "tcl_splitter",
160                        "/usr/testbed/lib/ns2ir/parse.tcl")
161        mapdb_file = config.get("experiment_control", "mapdb")
162        self.trace_file = sys.stderr
163
164        self.def_expstart = \
165                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
166                "/tmp/federate";
167        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
168                "FEDDIR/hosts";
169        self.def_gwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
171                "/tmp/bridge.log";
172        self.def_mgwstart = \
173                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
174                "/tmp/bridge.log";
175        self.def_gwimage = "FBSD61-TUNNEL2";
176        self.def_gwtype = "pc";
177        self.local_access = { }
178
179        if self.auth_type == 'legacy':
180            if auth:
181                self.auth = auth
182            else:
183                self.log.error( "[access]: No authorizer initialized, " +\
184                        "creating local one.")
185                auth = authorizer()
186            self.get_access = self.legacy_get_access
187        elif self.auth_type == 'abac':
188            self.auth = abac_authorizer(load=self.auth_dir, 
189                    update=os.path.join(self.auth_dir,'update'))
190        else:
191            raise service_error(service_error.internal, 
192                    "Unknown auth_type: %s" % self.auth_type)
193
194        if mapdb_file:
195            self.read_mapdb(mapdb_file)
196        else:
197            self.log.warn("[experiment_control] No testbed map, using defaults")
198            self.tbmap = { 
199                    'deter':'https://users.isi.deterlab.net:23235',
200                    'emulab':'https://users.isi.deterlab.net:23236',
201                    'ucb':'https://users.isi.deterlab.net:23237',
202                    }
203
204        # Grab saved state.  OK to do this w/o locking because it's read only
205        # and only one thread should be in existence that can see self.state at
206        # this point.
207        if self.state_filename:
208            self.read_state()
209
210        if self.store_filename:
211            self.read_store()
212        else:
213            self.log.warning("No saved synch store")
214            self.synch_store = synch_store
215
216        # Dispatch tables
217        self.soap_services = {\
218                'New': soap_handler('New', self.new_experiment),
219                'Create': soap_handler('Create', self.create_experiment),
220                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
221                'Vis': soap_handler('Vis', self.get_vis),
222                'Info': soap_handler('Info', self.get_info),
223                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
224                'Operation': soap_handler('Operation', self.do_operation),
225                'Terminate': soap_handler('Terminate', 
226                    self.terminate_experiment),
227                'GetValue': soap_handler('GetValue', self.GetValue),
228                'SetValue': soap_handler('SetValue', self.SetValue),
229        }
230
231        self.xmlrpc_services = {\
232                'New': xmlrpc_handler('New', self.new_experiment),
233                'Create': xmlrpc_handler('Create', self.create_experiment),
234                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
235                'Vis': xmlrpc_handler('Vis', self.get_vis),
236                'Info': xmlrpc_handler('Info', self.get_info),
237                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
238                'Terminate': xmlrpc_handler('Terminate',
239                    self.terminate_experiment),
240                'Operation': xmlrpc_handler('Operation', self.do_operation),
241                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
242                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
243        }
244
245    # Call while holding self.state_lock
246    def write_state(self):
247        """
248        Write a new copy of experiment state after copying the existing state
249        to a backup.
250
251        State format is a simple pickling of the state dictionary.
252        """
253        if os.access(self.state_filename, os.W_OK):
254            copy_file(self.state_filename, \
255                    "%s.bak" % self.state_filename)
256        try:
257            f = open(self.state_filename, 'w')
258            pickle.dump(self.state, f)
259        except EnvironmentError, e:
260            self.log.error("Can't write file %s: %s" % \
261                    (self.state_filename, e))
262        except pickle.PicklingError, e:
263            self.log.error("Pickling problem: %s" % e)
264        except TypeError, e:
265            self.log.error("Pickling problem (TypeError): %s" % e)
266
267    @staticmethod
268    def get_alloc_ids(exp):
269        """
270        Used by read_store and read state.  This used to be worse.
271        """
272
273        return [ a.allocID for a in exp.get_all_allocations() ]
274
275
276    # Call while holding self.state_lock
277    def read_state(self):
278        """
279        Read a new copy of experiment state.  Old state is overwritten.
280
281        State format is a simple pickling of the state dictionary.
282        """
283       
284        try:
285            f = open(self.state_filename, "r")
286            self.state = pickle.load(f)
287            self.log.debug("[read_state]: Read state from %s" % \
288                    self.state_filename)
289        except EnvironmentError, e:
290            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
291                    % (self.state_filename, e))
292        except pickle.UnpicklingError, e:
293            self.log.warning(("[read_state]: No saved state: " + \
294                    "Unpickling failed: %s") % e)
295       
296        for s in self.state.values():
297            try:
298
299                eid = s.fedid
300                if eid : 
301                    if self.auth_type == 'legacy':
302                        # XXX: legacy
303                        # Give the owner rights to the experiment
304                        #self.auth.set_attribute(s['owner'], eid)
305                        # And holders of the eid as well
306                        self.auth.set_attribute(eid, eid)
307                        # allow overrides to control experiments as well
308                        for o in self.overrides:
309                            self.auth.set_attribute(o, eid)
310                        # Set permissions to allow reading of the software
311                        # repo, if any, as well.
312                        for a in self.get_alloc_ids(s):
313                            self.auth.set_attribute(a, 'repo/%s' % eid)
314                else:
315                    raise KeyError("No experiment id")
316            except KeyError, e:
317                self.log.warning("[read_state]: State ownership or identity " +\
318                        "misformatted in %s: %s" % (self.state_filename, e))
319
320    def read_mapdb(self, file):
321        """
322        Read a simple colon separated list of mappings for the
323        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
324        also adds testbeds to active if they include , active after
325        their name.
326        """
327
328        self.tbmap = { }
329        lineno =0
330        try:
331            f = open(file, "r")
332            for line in f:
333                lineno += 1
334                line = line.strip()
335                if line.startswith('#') or len(line) == 0:
336                    continue
337                try:
338                    label, url = line.split(':', 1)
339                    self.tbmap[label] = url
340                except ValueError, e:
341                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
342                            "map db: %s %s" % (lineno, line, e))
343        except EnvironmentError, e:
344            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
345                    "open %s: %s" % (file, e))
346        else:
347            f.close()
348
349    def read_store(self):
350        try:
351            self.synch_store = synch_store()
352            self.synch_store.load(self.store_filename)
353            self.log.debug("[read_store]: Read store from %s" % \
354                    self.store_filename)
355        except EnvironmentError, e:
356            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
357                    % (self.state_filename, e))
358            self.synch_store = synch_store()
359
360        # Set the initial permissions on data in the store.  XXX: This ad hoc
361        # authorization attribute initialization is getting out of hand.
362        # XXX: legacy
363        if self.auth_type == 'legacy':
364            for k in self.synch_store.all_keys():
365                try:
366                    if k.startswith('fedid:'):
367                        fid = fedid(hexstr=k[6:46])
368                        if self.state.has_key(fid):
369                            for a in self.get_alloc_ids(self.state[fid]):
370                                self.auth.set_attribute(a, k)
371                except ValueError, e:
372                    self.log.warn("Cannot deduce permissions for %s" % k)
373
374
375    def write_store(self):
376        """
377        Write a new copy of synch_store after writing current state
378        to a backup.  We use the internal synch_store pickle method to avoid
379        incinsistent data.
380
381        State format is a simple pickling of the store.
382        """
383        if os.access(self.store_filename, os.W_OK):
384            copy_file(self.store_filename, \
385                    "%s.bak" % self.store_filename)
386        try:
387            self.synch_store.save(self.store_filename)
388        except EnvironmentError, e:
389            self.log.error("Can't write file %s: %s" % \
390                    (self.store_filename, e))
391        except TypeError, e:
392            self.log.error("Pickling problem (TypeError): %s" % e)
393
394    # XXX this may belong somewhere else
395
396    def get_grouper_updates(self, fid):
397        if self.grouper_url is None: return
398        d = tempfile.mkdtemp()
399        try:
400            fstr = "%s" % fid
401            # XXX locking
402            zipname = os.path.join(d, 'grouper.zip')
403            dest = os.path.join(self.auth_dir, 'update')
404            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
405            f = open(zipname, 'w')
406            f.write(resp.read())
407            f.close()
408            zf = zipfile.ZipFile(zipname, 'r')
409            zf.extractall(dest)
410            zf.close()
411        except URLError, e:
412            self.log.error("Cannot connect to grouper: %s" % e)
413            pass
414        finally:
415            shutil.rmtree(d)
416
417
418    def remove_dirs(self, dir):
419        """
420        Remove the directory tree and all files rooted at dir.  Log any errors,
421        but continue.
422        """
423        self.log.debug("[removedirs]: removing %s" % dir)
424        try:
425            for path, dirs, files in os.walk(dir, topdown=False):
426                for f in files:
427                    os.remove(os.path.join(path, f))
428                for d in dirs:
429                    os.rmdir(os.path.join(path, d))
430            os.rmdir(dir)
431        except EnvironmentError, e:
432            self.log.error("Error deleting directory tree in %s" % e);
433
434    @staticmethod
435    def make_temp_certfile(expcert, tmpdir):
436        """
437        make a protected copy of the access certificate so the experiment
438        controller can act as the experiment principal.  mkstemp is the most
439        secure way to do that. The directory should be created by
440        mkdtemp.  Return the filename.
441        """
442        if expcert and tmpdir:
443            try:
444                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
445                f = os.fdopen(certf, 'w')
446                print >> f, expcert
447                f.close()
448            except EnvironmentError, e:
449                raise service_error(service_error.internal, 
450                        "Cannot create temp cert file?")
451            return certfn
452        else:
453            return None
454
455       
456    def generate_ssh_keys(self, dest, type="rsa" ):
457        """
458        Generate a set of keys for the gateways to use to talk.
459
460        Keys are of type type and are stored in the required dest file.
461        """
462        valid_types = ("rsa", "dsa")
463        t = type.lower();
464        if t not in valid_types: raise ValueError
465        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
466
467        try:
468            trace = open("/dev/null", "w")
469        except EnvironmentError:
470            raise service_error(service_error.internal,
471                    "Cannot open /dev/null??");
472
473        # May raise CalledProcessError
474        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
475        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
476        if rv != 0:
477            raise service_error(service_error.internal, 
478                    "Cannot generate nonce ssh keys.  %s return code %d" \
479                            % (self.ssh_keygen, rv))
480
481    def generate_seer_certs(self, destdir):
482        '''
483        Create a SEER ca cert and a node cert in destdir/ca.pem and
484        destdir/node.pem respectively.  These will be distributed throughout
485        the federated experiment.  This routine reports errors via
486        service_errors.
487        '''
488        openssl = '/usr/bin/openssl'
489        # All the filenames and parameters we need for openssl calls below
490        ca_key =os.path.join(destdir, 'ca.key') 
491        ca_pem = os.path.join(destdir, 'ca.pem')
492        node_key =os.path.join(destdir, 'node.key') 
493        node_pem = os.path.join(destdir, 'node.pem')
494        node_req = os.path.join(destdir, 'node.req')
495        node_signed = os.path.join(destdir, 'node.signed')
496        days = '%s' % (365 * 10)
497        serial = '%s' % random.randint(0, 1<<16)
498
499        try:
500            # Sequence of calls to create a CA key, create a ca cert, create a
501            # node key, node signing request, and finally a signed node
502            # certificate.
503            sequence = (
504                    (openssl, 'genrsa', '-out', ca_key, '1024'),
505                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
506                        ca_pem, '-days', days, '-subj', 
507                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
508                    (openssl, 'genrsa', '-out', node_key, '1024'),
509                    (openssl, 'req', '-new', '-key', node_key, '-out', 
510                        node_req, '-days', days, '-subj', 
511                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
512                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
513                        '-set_serial', serial, '-req', '-in', node_req, 
514                        '-out', node_signed, '-days', days),
515                )
516            # Do all that stuff; bail if there's an error, and push all the
517            # output to dev/null.
518            for cmd in sequence:
519                trace = open("/dev/null", "w")
520                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
521                if rv != 0:
522                    raise service_error(service_error.internal, 
523                            "Cannot generate SEER certs.  %s return code %d" \
524                                    % (' '.join(cmd), rv))
525            # Concatinate the node key and signed certificate into node.pem
526            f = open(node_pem, 'w')
527            for comp in (node_signed, node_key):
528                g = open(comp, 'r')
529                f.write(g.read())
530                g.close()
531            f.close()
532
533            # Throw out intermediaries.
534            for fn in (ca_key, node_key, node_req, node_signed):
535                os.unlink(fn)
536
537        except EnvironmentError, e:
538            # Any difficulties with the file system wind up here
539            raise service_error(service_error.internal,
540                    "File error on  %s while creating SEER certs: %s" % \
541                            (e.filename, e.strerror))
542
543
544
545    def gentopo(self, str):
546        """
547        Generate the topology data structure from the splitter's XML
548        representation of it.
549
550        The topology XML looks like:
551            <experiment>
552                <nodes>
553                    <node><vname></vname><ips>ip1:ip2</ips></node>
554                </nodes>
555                <lans>
556                    <lan>
557                        <vname></vname><vnode></vnode><ip></ip>
558                        <bandwidth></bandwidth><member>node:port</member>
559                    </lan>
560                </lans>
561        """
562        class topo_parse:
563            """
564            Parse the topology XML and create the dats structure.
565            """
566            def __init__(self):
567                # Typing of the subelements for data conversion
568                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
569                self.int_subelements = ( 'bandwidth',)
570                self.float_subelements = ( 'delay',)
571                # The final data structure
572                self.nodes = [ ]
573                self.lans =  [ ]
574                self.topo = { \
575                        'node': self.nodes,\
576                        'lan' : self.lans,\
577                    }
578                self.element = { }  # Current element being created
579                self.chars = ""     # Last text seen
580
581            def end_element(self, name):
582                # After each sub element the contents is added to the current
583                # element or to the appropriate list.
584                if name == 'node':
585                    self.nodes.append(self.element)
586                    self.element = { }
587                elif name == 'lan':
588                    self.lans.append(self.element)
589                    self.element = { }
590                elif name in self.str_subelements:
591                    self.element[name] = self.chars
592                    self.chars = ""
593                elif name in self.int_subelements:
594                    self.element[name] = int(self.chars)
595                    self.chars = ""
596                elif name in self.float_subelements:
597                    self.element[name] = float(self.chars)
598                    self.chars = ""
599
600            def found_chars(self, data):
601                self.chars += data.rstrip()
602
603
604        tp = topo_parse();
605        parser = xml.parsers.expat.ParserCreate()
606        parser.EndElementHandler = tp.end_element
607        parser.CharacterDataHandler = tp.found_chars
608
609        parser.Parse(str)
610
611        return tp.topo
612       
613
614    def genviz(self, topo):
615        """
616        Generate the visualization the virtual topology
617        """
618
619        neato = "/usr/local/bin/neato"
620        # These are used to parse neato output and to create the visualization
621        # file.
622        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
623        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
624                "%s</type></node>"
625
626        try:
627            # Node names
628            nodes = [ n['vname'] for n in topo['node'] ]
629            topo_lans = topo['lan']
630        except KeyError, e:
631            raise service_error(service_error.internal, "Bad topology: %s" %e)
632
633        lans = { }
634        links = { }
635
636        # Walk through the virtual topology, organizing the connections into
637        # 2-node connections (links) and more-than-2-node connections (lans).
638        # When a lan is created, it's added to the list of nodes (there's a
639        # node in the visualization for the lan).
640        for l in topo_lans:
641            if links.has_key(l['vname']):
642                if len(links[l['vname']]) < 2:
643                    links[l['vname']].append(l['vnode'])
644                else:
645                    nodes.append(l['vname'])
646                    lans[l['vname']] = links[l['vname']]
647                    del links[l['vname']]
648                    lans[l['vname']].append(l['vnode'])
649            elif lans.has_key(l['vname']):
650                lans[l['vname']].append(l['vnode'])
651            else:
652                links[l['vname']] = [ l['vnode'] ]
653
654
655        # Open up a temporary file for dot to turn into a visualization
656        try:
657            df, dotname = tempfile.mkstemp()
658            dotfile = os.fdopen(df, 'w')
659        except EnvironmentError:
660            raise service_error(service_error.internal,
661                    "Failed to open file in genviz")
662
663        try:
664            dnull = open('/dev/null', 'w')
665        except EnvironmentError:
666            service_error(service_error.internal,
667                    "Failed to open /dev/null in genviz")
668
669        # Generate a dot/neato input file from the links, nodes and lans
670        try:
671            print >>dotfile, "graph G {"
672            for n in nodes:
673                print >>dotfile, '\t"%s"' % n
674            for l in links.keys():
675                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
676            for l in lans.keys():
677                for n in lans[l]:
678                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
679            print >>dotfile, "}"
680            dotfile.close()
681        except TypeError:
682            raise service_error(service_error.internal,
683                    "Single endpoint link in vtopo")
684        except EnvironmentError:
685            raise service_error(service_error.internal, "Cannot write dot file")
686
687        # Use dot to create a visualization
688        try:
689            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
690                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
691                stderr=dnull, close_fds=True)
692        except EnvironmentError:
693            raise service_error(service_error.internal, 
694                    "Cannot generate visualization: is graphviz available?")
695        dnull.close()
696
697        # Translate dot to vis format
698        vis_nodes = [ ]
699        vis = { 'node': vis_nodes }
700        for line in dot.stdout:
701            m = vis_re.match(line)
702            if m:
703                vn = m.group(1)
704                vis_node = {'name': vn, \
705                        'x': float(m.group(2)),\
706                        'y' : float(m.group(3)),\
707                    }
708                if vn in links.keys() or vn in lans.keys():
709                    vis_node['type'] = 'lan'
710                else:
711                    vis_node['type'] = 'node'
712                vis_nodes.append(vis_node)
713        rv = dot.wait()
714
715        os.remove(dotname)
716        # XXX: graphviz seems to use low return codes for warnings, like
717        # "couldn't find font"
718        if rv < 2 : return vis
719        else: return None
720
721
722    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
723            cert_pwd=None):
724        """
725        Release access to testbed through fedd
726        """
727
728        if not uri and tbmap:
729            uri = tbmap.get(tb, None)
730        if not uri:
731            raise service_error(service_error.server_config, 
732                    "Unknown testbed: %s" % tb)
733
734        if self.local_access.has_key(uri):
735            resp = self.local_access[uri].ReleaseAccess(\
736                    { 'ReleaseAccessRequestBody' : 
737                        {'allocID': {'fedid': aid}},}, 
738                    fedid(file=cert_file))
739            resp = { 'ReleaseAccessResponseBody': resp } 
740        else:
741            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
742                    cert_file, cert_pwd, self.trusted_certs)
743
744        # better error coding
745
746    def remote_ns2topdl(self, uri, desc):
747
748        req = {
749                'description' : { 'ns2description': desc },
750            }
751
752        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
753                self.trusted_certs)
754
755        if r.has_key('Ns2TopdlResponseBody'):
756            r = r['Ns2TopdlResponseBody']
757            ed = r.get('experimentdescription', None)
758            if ed.has_key('topdldescription'):
759                return topdl.Topology(**ed['topdldescription'])
760            else:
761                raise service_error(service_error.protocol, 
762                        "Bad splitter response (no output)")
763        else:
764            raise service_error(service_error.protocol, "Bad splitter response")
765
766    class start_segment:
767        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
768                cert_pwd=None, trusted_certs=None, caller=None,
769                log_collector=None):
770            self.log = log
771            self.debug = debug
772            self.cert_file = cert_file
773            self.cert_pwd = cert_pwd
774            self.trusted_certs = None
775            self.caller = caller
776            self.testbed = testbed
777            self.log_collector = log_collector
778            self.response = None
779            self.node = { }
780            self.subs = { }
781            self.tb = { }
782            self.proof = None
783
784        def make_map(self, resp):
785            if 'segmentdescription' not in resp  or \
786                    'topdldescription' not in resp['segmentdescription']:
787                self.log.warn('No topology returned from startsegment')
788                return 
789
790            top = topdl.Topology(
791                    **resp['segmentdescription']['topdldescription'])
792
793            for e in top.elements:
794                if isinstance(e, topdl.Computer):
795                    self.node[e.name] = e
796                elif isinstance(e, topdl.Testbed):
797                    self.tb[e.uri] = e
798            for s in top.substrates:
799                self.subs[s.name] = s
800
801        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
802            req = {
803                    'allocID': { 'fedid' : aid }, 
804                    'segmentdescription': { 
805                        'topdldescription': topo.to_dict(),
806                    },
807                }
808
809            if connInfo:
810                req['connection'] = connInfo
811
812            import_svcs = [ s for m in masters.values() \
813                    for s in m if self.testbed in s.importers]
814
815            if import_svcs or self.testbed in masters:
816                req['service'] = []
817
818            for s in import_svcs:
819                for r in s.reqs:
820                    sr = copy.deepcopy(r)
821                    sr['visibility'] = 'import';
822                    req['service'].append(sr)
823
824            for s in masters.get(self.testbed, []):
825                for r in s.reqs:
826                    sr = copy.deepcopy(r)
827                    sr['visibility'] = 'export';
828                    req['service'].append(sr)
829
830            if attrs:
831                req['fedAttr'] = attrs
832
833            try:
834                self.log.debug("Calling StartSegment at %s " % uri)
835                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
836                        self.trusted_certs)
837                if r.has_key('StartSegmentResponseBody'):
838                    lval = r['StartSegmentResponseBody'].get('allocationLog',
839                            None)
840                    if lval and self.log_collector:
841                        for line in  lval.splitlines(True):
842                            self.log_collector.write(line)
843                    self.make_map(r['StartSegmentResponseBody'])
844                    if 'proof' in r: self.proof = r['proof']
845                    self.response = r
846                else:
847                    raise service_error(service_error.internal, 
848                            "Bad response!?: %s" %r)
849                return True
850            except service_error, e:
851                self.log.error("Start segment failed on %s: %s" % \
852                        (self.testbed, e))
853                return False
854
855
856
857    class terminate_segment:
858        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
859                cert_pwd=None, trusted_certs=None, caller=None):
860            self.log = log
861            self.debug = debug
862            self.cert_file = cert_file
863            self.cert_pwd = cert_pwd
864            self.trusted_certs = None
865            self.caller = caller
866            self.testbed = testbed
867
868        def __call__(self, uri, aid ):
869            req = {
870                    'allocID': {'fedid': aid }, 
871                }
872            self.log.info("Calling terminate segment")
873            try:
874                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
875                        self.trusted_certs)
876                self.log.info("Terminate segment succeeded")
877                return True
878            except service_error, e:
879                self.log.error("Terminate segment failed on %s: %s" % \
880                        (self.testbed, e))
881                return False
882
883    class info_segment(start_segment):
884        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
885                cert_pwd=None, trusted_certs=None, caller=None,
886                log_collector=None):
887            experiment_control_local.start_segment.__init__(self, debug, 
888                    log, testbed, cert_file, cert_pwd, trusted_certs, 
889                    caller, log_collector)
890
891        def __call__(self, uri, aid):
892            req = { 'allocID': { 'fedid' : aid } }
893
894            try:
895                self.log.debug("Calling InfoSegment at %s " % uri)
896                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
897                        self.trusted_certs)
898                if r.has_key('InfoSegmentResponseBody'):
899                    self.make_map(r['InfoSegmentResponseBody'])
900                    if 'proof' in r: self.proof = r['proof']
901                    self.response = r
902                else:
903                    raise service_error(service_error.internal, 
904                            "Bad response!?: %s" %r)
905                return True
906            except service_error, e:
907                self.log.error("Info segment failed on %s: %s" % \
908                        (self.testbed, e))
909                return False
910
911    class operation_segment:
912        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
913                cert_pwd=None, trusted_certs=None, caller=None,
914                log_collector=None):
915            self.log = log
916            self.debug = debug
917            self.cert_file = cert_file
918            self.cert_pwd = cert_pwd
919            self.trusted_certs = None
920            self.caller = caller
921            self.testbed = testbed
922            self.status = None
923
924        def __call__(self, uri, aid, op, targets, params):
925            req = { 
926                    'allocID': { 'fedid' : aid },
927                    'operation': op,
928                    'target': targets,
929                    }
930            if params: req['parameter'] = params
931
932
933            try:
934                self.log.debug("Calling OperationSegment at %s " % uri)
935                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
936                        self.trusted_certs)
937                if 'OperationSegmentResponseBody' in r:
938                    r = r['OperationSegmentResponseBody']
939                    if 'status' in r:
940                        self.status = r['status']
941                else:
942                    raise service_error(service_error.internal, 
943                            "Bad response!?: %s" %r)
944                return True
945            except service_error, e:
946                self.log.error("Operation segment failed on %s: %s" % \
947                        (self.testbed, e))
948                return False
949
950    def annotate_topology(self, top, data):
951        # These routines do various parts of the annotation
952        def add_new_names(nl, l):
953            """ add any names in nl to the list in l """
954            for n in nl:
955                if n not in l: l.append(n)
956       
957        def merge_services(ne, e):
958            for ns in ne.service:
959                # NB: the else is on the for
960                for s in e.service:
961                    if ns.name == s.name:
962                        s.importer = ns.importer
963                        s.param = ns.param
964                        s.description = ns.description
965                        s.status = ns.status
966                        break
967                else:
968                    e.service.append(ns)
969       
970        def merge_oses(ne, e):
971            """
972            Merge the operating system entries of ne into e
973            """
974            for nos in ne.os:
975                # NB: the else is on the for
976                for os in e.os:
977                    if nos.name == os.name:
978                        os.version = nos.version
979                        os.version = nos.distribution
980                        os.version = nos.distributionversion
981                        for a in nos.attribute:
982                            if os.get_attribute(a.attribute):
983                                os.remove_attribute(a.attribute)
984                            os.set_attribute(a.attribute, a.value)
985                        break
986                else:
987                    # If both nodes have one OS, this is a replacement
988                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
989                    else: e.os.append(nos)
990
991        # Annotate the topology with embedding info
992        for e in top.elements:
993            if isinstance(e, topdl.Computer):
994                for s in data:
995                    ne = s.node.get(e.name, None)
996                    if ne is not None:
997                        add_new_names(ne.localname, e.localname)
998                        e.status = ne.status
999                        merge_services(ne, e)
1000                        add_new_names(ne.operation, e.operation)
1001                        if ne.os: merge_oses(ne, e)
1002                        break
1003            elif isinstance(e,topdl.Testbed):
1004                for s in data:
1005                    ne = s.tb.get(e.uri, None)
1006                    if ne is not None:
1007                        add_new_names(ne.localname, e.localname)
1008                        add_new_names(ne.operation, e.operation)
1009                        merge_services(ne, e)
1010                        for a in ne.attribute:
1011                            e.set_attribute(a.attribute, a.value)
1012        # Annotate substrates
1013        for s in top.substrates:
1014            for d in data:
1015                ss = d.subs.get(s.name, None)
1016                if ss is not None:
1017                    if ss.capacity is not None:
1018                        s.capacity = ss.capacity
1019                    if s.latency is not None:
1020                        s.latency = ss.latency
1021
1022
1023
1024    def allocate_resources(self, allocated, masters, eid, expid, 
1025            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1026            attrs=None, connInfo={}, tbmap=None, expcert=None):
1027
1028        started = { }           # Testbeds where a sub-experiment started
1029                                # successfully
1030
1031        # XXX
1032        fail_soft = False
1033
1034        if tbmap is None: tbmap = { }
1035
1036        log = alloc_log or self.log
1037
1038        tp = thread_pool(self.nthreads)
1039        threads = [ ]
1040        starters = [ ]
1041
1042        if expcert:
1043            cert = expcert
1044            pw = None
1045        else:
1046            cert = self.cert_file
1047            pw = self.cert_pwd
1048
1049        for tb in allocated.keys():
1050            # Create and start a thread to start the segment, and save it
1051            # to get the return value later
1052            tb_attrs = copy.copy(attrs)
1053            tp.wait_for_slot()
1054            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1055            base, suffix = split_testbed(tb)
1056            if suffix:
1057                tb_attrs.append({'attribute': 'experiment_name', 
1058                    'value': "%s-%s" % (eid, suffix)})
1059            else:
1060                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1061            if not uri:
1062                raise service_error(service_error.internal, 
1063                        "Unknown testbed %s !?" % tb)
1064
1065            aid = tbparams[tb].allocID
1066            if not aid:
1067                raise service_error(service_error.internal, 
1068                        "No alloc id for testbed %s !?" % tb)
1069
1070            s = self.start_segment(log=log, debug=self.debug,
1071                    testbed=tb, cert_file=cert,
1072                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1073                    caller=self.call_StartSegment,
1074                    log_collector=log_collector)
1075            starters.append(s)
1076            t  = pooled_thread(\
1077                    target=s, name=tb,
1078                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1079                    pdata=tp, trace_file=self.trace_file)
1080            threads.append(t)
1081            t.start()
1082
1083        # Wait until all finish (keep pinging the log, though)
1084        mins = 0
1085        revoked = False
1086        while not tp.wait_for_all_done(60.0):
1087            mins += 1
1088            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1089                    % mins)
1090            if not revoked and \
1091                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1092                # a testbed has failed.  Revoke this experiment's
1093                # synchronizarion values so that sub experiments will not
1094                # deadlock waiting for synchronization that will never happen
1095                self.log.info("A subexperiment has failed to swap in, " + \
1096                        "revoking synch keys")
1097                var_key = "fedid:%s" % expid
1098                for k in self.synch_store.all_keys():
1099                    if len(k) > 45 and k[0:46] == var_key:
1100                        self.synch_store.revoke_key(k)
1101                revoked = True
1102
1103        failed = [ t.getName() for t in threads if not t.rv ]
1104        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1105
1106        # If one failed clean up, unless fail_soft is set
1107        if failed:
1108            if not fail_soft:
1109                tp.clear()
1110                for tb in succeeded:
1111                    # Create and start a thread to stop the segment
1112                    tp.wait_for_slot()
1113                    uri = tbparams[tb].uri
1114                    t  = pooled_thread(\
1115                            target=self.terminate_segment(log=log,
1116                                testbed=tb,
1117                                cert_file=cert, 
1118                                cert_pwd=pw,
1119                                trusted_certs=self.trusted_certs,
1120                                caller=self.call_TerminateSegment),
1121                            args=(uri, tbparams[tb].allocID),
1122                            name=tb,
1123                            pdata=tp, trace_file=self.trace_file)
1124                    t.start()
1125                # Wait until all finish (if any are being stopped)
1126                if succeeded:
1127                    tp.wait_for_all_done()
1128
1129                # release the allocations
1130                for tb in tbparams.keys():
1131                    try:
1132                        self.release_access(tb, tbparams[tb].allocID, 
1133                                tbmap=tbmap, uri=tbparams[tb].uri,
1134                                cert_file=cert, cert_pwd=pw)
1135                    except service_error, e:
1136                        self.log.warn("Error releasing access: %s" % e.desc)
1137                # Remove the placeholder
1138                self.state_lock.acquire()
1139                self.state[eid].status = 'failed'
1140                self.state[eid].updated()
1141                if self.state_filename: self.write_state()
1142                self.state_lock.release()
1143                # Remove the repo dir
1144                self.remove_dirs("%s/%s" %(self.repodir, expid))
1145                # Walk up tmpdir, deleting as we go
1146                if self.cleanup:
1147                    self.remove_dirs(tmpdir)
1148                else:
1149                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1150
1151
1152                log.error("Swap in failed on %s" % ",".join(failed))
1153                return
1154        else:
1155            # Walk through the successes and gather the proofs
1156            proofs = { }
1157            for s in starters:
1158                if s.proof:
1159                    proofs[s.testbed] = s.proof
1160            self.annotate_topology(top, starters)
1161            log.info("[start_segment]: Experiment %s active" % eid)
1162
1163
1164        # Walk up tmpdir, deleting as we go
1165        if self.cleanup:
1166            self.remove_dirs(tmpdir)
1167        else:
1168            log.debug("[start_experiment]: not removing %s" % tmpdir)
1169
1170        # Insert the experiment into our state and update the disk copy.
1171        self.state_lock.acquire()
1172        self.state[expid].status = 'active'
1173        self.state[eid] = self.state[expid]
1174        self.state[eid].top = top
1175        self.state[eid].updated()
1176        # Append startup proofs
1177        for f in self.state[eid].get_all_allocations():
1178            if f.tb in proofs:
1179                f.proof.append(proofs[f.tb])
1180
1181        if self.state_filename: self.write_state()
1182        self.state_lock.release()
1183        return
1184
1185
1186    def add_kit(self, e, kit):
1187        """
1188        Add a Software object created from the list of (install, location)
1189        tuples passed as kit  to the software attribute of an object e.  We
1190        do this enough to break out the code, but it's kind of a hack to
1191        avoid changing the old tuple rep.
1192        """
1193
1194        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1195
1196        if isinstance(e.software, list): e.software.extend(s)
1197        else: e.software = s
1198
1199    def append_experiment_authorization(self, expid, attrs, 
1200            need_state_lock=True):
1201        """
1202        Append the authorization information to system state
1203        """
1204
1205        for p, a in attrs:
1206            # PNNL Debug
1207            self.log.debug("Append auth: %s %s" % (p, a))
1208            if p and a:
1209                # Improperly configured overrides can add bad principals.
1210                self.auth.set_attribute(p, a)
1211            else:
1212                self.log.debug("Bad append experiment_authorization %s %s" \
1213                        % (p, a))
1214        self.auth.save()
1215
1216        if need_state_lock: self.state_lock.acquire()
1217        # XXX: really a no op?
1218        #self.state[expid]['auth'].update(attrs)
1219        if self.state_filename: self.write_state()
1220        if need_state_lock: self.state_lock.release()
1221
1222    def clear_experiment_authorization(self, expid, need_state_lock=True):
1223        """
1224        Attrs is a set of attribute principal pairs that need to be removed
1225        from the authenticator.  Remove them and save the authenticator.
1226        """
1227
1228        if need_state_lock: self.state_lock.acquire()
1229        # XXX: should be a no-op
1230        #if expid in self.state and 'auth' in self.state[expid]:
1231            #for p, a in self.state[expid]['auth']:
1232                #self.auth.unset_attribute(p, a)
1233            #self.state[expid]['auth'] = set()
1234        if self.state_filename: self.write_state()
1235        if need_state_lock: self.state_lock.release()
1236        self.auth.save()
1237
1238
1239    def create_experiment_state(self, fid, req, expid, expcert,
1240            state='starting'):
1241        """
1242        Create the initial entry in the experiment's state.  The expid and
1243        expcert are the experiment's fedid and certifacte that represents that
1244        ID, which are installed in the experiment state.  If the request
1245        includes a suggested local name that is used if possible.  If the local
1246        name is already taken by an experiment owned by this user that has
1247        failed, it is overwritten.  Otherwise new letters are added until a
1248        valid localname is found.  The generated local name is returned.
1249        """
1250
1251        if req.has_key('experimentID') and \
1252                req['experimentID'].has_key('localname'):
1253            overwrite = False
1254            eid = req['experimentID']['localname']
1255            # If there's an old failed experiment here with the same local name
1256            # and accessible by this user, we'll overwrite it, otherwise we'll
1257            # fall through and do the collision avoidance.
1258            old_expid = self.get_experiment_fedid(eid)
1259            if old_expid:
1260                users_experiment = True
1261                try:
1262                    self.check_experiment_access(fid, old_expid)
1263                except service_error, e:
1264                    if e.code == service_error.access: users_experiment = False
1265                    else: raise e
1266                if users_experiment:
1267                    self.state_lock.acquire()
1268                    status = self.state[eid].status
1269                    if status and status == 'failed':
1270                        # remove the old access attributes
1271                        self.clear_experiment_authorization(eid,
1272                                need_state_lock=False)
1273                        overwrite = True
1274                        del self.state[eid]
1275                        del self.state[old_expid]
1276                    self.state_lock.release()
1277                else:
1278                    self.log.info('Experiment %s exists, ' % eid + \
1279                            'but this user cannot access it')
1280            self.state_lock.acquire()
1281            while (self.state.has_key(eid) and not overwrite):
1282                eid += random.choice(string.ascii_letters)
1283            # Initial state
1284            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1285                    identity=expcert)
1286            self.state[expid] = self.state[eid]
1287            if self.state_filename: self.write_state()
1288            self.state_lock.release()
1289        else:
1290            eid = self.exp_stem
1291            for i in range(0,5):
1292                eid += random.choice(string.ascii_letters)
1293            self.state_lock.acquire()
1294            while (self.state.has_key(eid)):
1295                eid = self.exp_stem
1296                for i in range(0,5):
1297                    eid += random.choice(string.ascii_letters)
1298            # Initial state
1299            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1300                    identity=expcert)
1301            self.state[expid] = self.state[eid]
1302            if self.state_filename: self.write_state()
1303            self.state_lock.release()
1304
1305        # Let users touch the state.  Authorize this fid and the expid itself
1306        # to touch the experiment, as well as allowing th eoverrides.
1307        self.append_experiment_authorization(eid, 
1308                set([(fid, expid), (expid,expid)] + \
1309                        [ (o, expid) for o in self.overrides]))
1310
1311        return eid
1312
1313
1314    def allocate_ips_to_topo(self, top):
1315        """
1316        Add an ip4_address attribute to all the hosts in the topology that have
1317        not already been assigned one by the user, based on the shared
1318        substrates on which they sit.  An /etc/hosts file is also created and
1319        returned as a list of hostfiles entries.  We also return the allocator,
1320        because we may need to allocate IPs to portals
1321        (specifically DRAGON portals).
1322        """
1323        subs = sorted(top.substrates, 
1324                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1325                reverse=True)
1326        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1327        ifs = { }
1328        hosts = [ ]
1329
1330        assigned = { }
1331        assigned_subs = set()
1332        # Look for IP networks that were already assigned to the experiment by
1333        # the user.  We assume that users are smart in making that assignment,
1334        # but we check them later.
1335        for e in top.elements:
1336            if not isinstance(e, topdl.Computer): continue
1337            for inf in e.interface:
1338                a = inf.get_attribute('ip4_address')
1339                if not a: continue
1340                for s in inf.substrate:
1341                    assigned_subs.add(s)
1342                a = ip_addr(a)
1343                n = ip_addr(inf.get_attribute('ip4_netmask'))
1344
1345                sz = 0x100000000 - long(n)
1346                net = (long(a) & long(n)) & 0xffffffff
1347                if net in assigned:
1348                    if sz > assigned[net]: assigned[net] = sz
1349                assigned[net] = sz
1350
1351        # XXX: hack attack - always avoid 10.0.23.0 which is special on DETER
1352        assigned[long(ip_addr('10.0.23.0'))] = 256
1353        # end of hack
1354
1355        # Walk all substrates, smallest to largest, assigning IP addresses to
1356        # the unassigned and checking the assigned.  Build an /etc/hosts file
1357        # as well.
1358        for idx, s in enumerate(subs):
1359            if s.name not in assigned_subs:
1360                net_size = len(s.interfaces)+2
1361
1362                # Get an ip allocation for this substrate.  Make sure the
1363                # allocation does not collide with any user allocations.
1364                ok = False
1365                while not ok:
1366                    a = ips.allocate(net_size)
1367                    if not a:
1368                        raise service_error(service_error.req, 
1369                                "Cannot allocate IP addresses")
1370                    base, num = a
1371                    if num < net_size: 
1372                        raise service_error(service_error.internal,
1373                                "Allocator returned wrong number of IPs??")
1374                    # Check for overlap.  An assigned network could contain an
1375                    # endpoint of the new allocation or the new allocation
1376                    # could contain an endpoint of an assigned network.
1377                    # NB the else is attached to the for - if the loop is not
1378                    # exited by a break, ok = True.
1379                    for n, sz in assigned.items():
1380                        if base >= n and base < n + sz:
1381                            ok = False
1382                            break
1383                        if base + num > n and base + num  < n + sz:
1384                            ok = False
1385                            break
1386                        if n >= base and n < base + num:
1387                            ok = False
1388                            break
1389                        if n+sz > base and n+sz < base + num:
1390                            ok = False
1391                            break
1392                    else:
1393                        ok = True
1394                           
1395                mask = ips.min_alloc
1396                while mask < net_size:
1397                    mask *= 2
1398
1399                netmask = ((2**32-1) ^ (mask-1))
1400            else:
1401                base = 0
1402
1403            # Add the attributes and build hosts
1404            base += 1
1405            for i in s.interfaces:
1406                # Add address and masks to interfaces on unassigned substrates
1407                if s.name not in assigned_subs:
1408                    i.attribute.append(
1409                            topdl.Attribute('ip4_address', 
1410                                "%s" % ip_addr(base)))
1411                    i.attribute.append(
1412                            topdl.Attribute('ip4_netmask', 
1413                                "%s" % ip_addr(int(netmask))))
1414
1415                # Make /etc/hosts entries
1416                addr = i.get_attribute('ip4_address')
1417                if addr is None:
1418                    raise service_error(service_error.req, 
1419                            "Partially assigned substrate %s" %s.name)
1420                hname = i.element.name
1421                if hname in ifs:
1422                    hosts.append("%s\t%s-%s %s-%d" % \
1423                            (addr, hname, s.name, hname, ifs[hname]))
1424                else:
1425                    # First IP allocated it the default ip according to hosts,
1426                    # so the extra alias is here.
1427                    ifs[hname] = 0
1428                    hosts.append("%s\t%s-%s %s-%d %s" % \
1429                            (addr, hname, s.name, hname, ifs[hname], hname))
1430
1431                ifs[hname] += 1
1432                base += 1
1433        return hosts, ips
1434
1435    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1436            tbparam, masters, tbmap, expid=None, expcert=None):
1437        for tb in testbeds:
1438            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1439                    expcert)
1440            allocated[tb] = 1
1441
1442    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1443            expcert=None):
1444        """
1445        Get access to testbed through fedd and set the parameters for that tb
1446        """
1447        def get_export_project(svcs):
1448            """
1449            Look through for the list of federated_service for this testbed
1450            objects for a project_export service, and extract the project
1451            parameter.
1452            """
1453
1454            pe = [s for s in svcs if s.name=='project_export']
1455            if len(pe) == 1:
1456                return pe[0].params.get('project', None)
1457            elif len(pe) == 0:
1458                return None
1459            else:
1460                raise service_error(service_error.req,
1461                        "More than one project export is not supported")
1462
1463        def add_services(svcs, type, slist, keys):
1464            """
1465            Add the given services to slist.  type is import or export.  Also
1466            add a mapping entry from the assigned id to the original service
1467            record.
1468            """
1469            for i, s in enumerate(svcs):
1470                idx = '%s%d' % (type, i)
1471                keys[idx] = s
1472                sr = {'id': idx, 'name': s.name, 'visibility': type }
1473                if s.params:
1474                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1475                            for k, v in s.params.items()]
1476                slist.append(sr)
1477
1478        uri = tbmap.get(testbed_base(tb), None)
1479        if not uri:
1480            raise service_error(service_error.server_config, 
1481                    "Unknown testbed: %s" % tb)
1482
1483        export_svcs = masters.get(tb,[])
1484        import_svcs = [ s for m in masters.values() \
1485                for s in m \
1486                    if tb in s.importers ]
1487
1488        export_project = get_export_project(export_svcs)
1489        # Compose the credential list so that IDs come before attributes
1490        creds = set()
1491        keys = set()
1492        certs = self.auth.get_creds_for_principal(fid)
1493        # Append credenials about this experiment controller - e.g. that it is
1494        # trusted.
1495        certs.update(self.auth.get_creds_for_principal(
1496            fedid(file=self.cert_file)))
1497        if expid:
1498            certs.update(self.auth.get_creds_for_principal(expid))
1499        for c in certs:
1500            keys.add(c.issuer_cert())
1501            creds.add(c.attribute_cert())
1502        creds = list(keys) + list(creds)
1503
1504        if expcert: cert, pw = expcert, None
1505        else: cert, pw = self.cert_file, self.cert_pw
1506
1507        # Request credentials
1508        req = {
1509                'abac_credential': creds,
1510            }
1511        # Make the service request from the services we're importing and
1512        # exporting.  Keep track of the export request ids so we can
1513        # collect the resulting info from the access response.
1514        e_keys = { }
1515        if import_svcs or export_svcs:
1516            slist = []
1517            add_services(import_svcs, 'import', slist, e_keys)
1518            add_services(export_svcs, 'export', slist, e_keys)
1519            req['service'] = slist
1520
1521        if self.local_access.has_key(uri):
1522            # Local access call
1523            req = { 'RequestAccessRequestBody' : req }
1524            r = self.local_access[uri].RequestAccess(req, 
1525                    fedid(file=self.cert_file))
1526            r = { 'RequestAccessResponseBody' : r }
1527        else:
1528            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1529
1530        if r.has_key('RequestAccessResponseBody'):
1531            # Through to here we have a valid response, not a fault.
1532            # Access denied is a fault, so something better or worse than
1533            # access denied has happened.
1534            r = r['RequestAccessResponseBody']
1535            self.log.debug("[get_access] Access granted")
1536        else:
1537            raise service_error(service_error.protocol,
1538                        "Bad proxy response")
1539        if 'proof' not in r:
1540            raise service_error(service_error.protocol,
1541                        "Bad access response (no access proof)")
1542
1543        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1544                tb=tb, uri=uri, proof=[r['proof']], 
1545                services=masters.get(tb, None))
1546
1547        # Collect the responses corresponding to the services this testbed
1548        # exports.  These will be the service requests that we will include in
1549        # the start segment requests (with appropriate visibility values) to
1550        # import and export the segments.
1551        for s in r.get('service', []):
1552            id = s.get('id', None)
1553            # Note that this attaches the response to the object in the masters
1554            # data structure.  (The e_keys index disappears when this fcn
1555            # returns)
1556            if id and id in e_keys:
1557                e_keys[id].reqs.append(s)
1558
1559        # Add attributes to parameter space.  We don't allow attributes to
1560        # overlay any parameters already installed.
1561        for a in r.get('fedAttr', []):
1562            try:
1563                if a['attribute']:
1564                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1565            except KeyError:
1566                self.log.error("Bad attribute in response: %s" % a)
1567
1568
1569    def split_topology(self, top, topo, testbeds):
1570        """
1571        Create the sub-topologies that are needed for experiment instantiation.
1572        """
1573        for tb in testbeds:
1574            topo[tb] = top.clone()
1575            # copy in for loop allows deletions from the original
1576            for e in [ e for e in topo[tb].elements]:
1577                etb = e.get_attribute('testbed')
1578                # NB: elements without a testbed attribute won't appear in any
1579                # sub topologies. 
1580                if not etb or etb != tb:
1581                    for i in e.interface:
1582                        for s in i.subs:
1583                            try:
1584                                s.interfaces.remove(i)
1585                            except ValueError:
1586                                raise service_error(service_error.internal,
1587                                        "Can't remove interface??")
1588                    topo[tb].elements.remove(e)
1589            topo[tb].make_indices()
1590
1591    def confirm_software(self, top):
1592        """
1593        Make sure that the software to be loaded in the topo is all available
1594        before we begin making access requests, etc.  This is a subset of
1595        wrangle_software.
1596        """
1597        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1598        pkgs.update([x.location for e in top.elements for x in e.software])
1599
1600        for pkg in pkgs:
1601            loc = pkg
1602
1603            scheme, host, path = urlparse(loc)[0:3]
1604            dest = os.path.basename(path)
1605            if not scheme:
1606                if not loc.startswith('/'):
1607                    loc = "/%s" % loc
1608                loc = "file://%s" %loc
1609            # NB: if scheme was found, loc == pkg
1610            try:
1611                u = urlopen(loc)
1612                u.close()
1613            except Exception, e:
1614                raise service_error(service_error.req, 
1615                        "Cannot open %s: %s" % (loc, e))
1616        return True
1617
1618    def wrangle_software(self, expid, top, topo, tbparams):
1619        """
1620        Copy software out to the repository directory, allocate permissions and
1621        rewrite the segment topologies to look for the software in local
1622        places.
1623        """
1624
1625        # XXX: PNNL debug
1626        self.log.debug("calling wrangle software")
1627        # Copy the rpms and tarfiles to a distribution directory from
1628        # which the federants can retrieve them
1629        linkpath = "%s/software" %  expid
1630        softdir ="%s/%s" % ( self.repodir, linkpath)
1631        softmap = { }
1632
1633        # self.fedkit and self.gateway kit are lists of tuples of
1634        # (install_location, download_location) this extracts the download
1635        # locations.
1636        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1637        pkgs.update([x.location for e in top.elements for x in e.software])
1638        try:
1639            # XXX: PNNL debug
1640            self.log.debug("wrangle software: makedirs")
1641            os.makedirs(softdir)
1642            # XXX: PNNL debug
1643            self.log.debug("wrangle software: makedirs success")
1644        except EnvironmentError, e:
1645            raise service_error(
1646                    "Cannot create software directory: %s" % e)
1647        # The actual copying.  Everything's converted into a url for copying.
1648        auth_attrs = set()
1649        for pkg in pkgs:
1650            loc = pkg
1651
1652            scheme, host, path = urlparse(loc)[0:3]
1653            dest = os.path.basename(path)
1654            if not scheme:
1655                if not loc.startswith('/'):
1656                    loc = "/%s" % loc
1657                loc = "file://%s" %loc
1658            # NB: if scheme was found, loc == pkg
1659            try:
1660                u = urlopen(loc)
1661            except Exception, e:
1662                raise service_error(service_error.req, 
1663                        "Cannot open %s: %s" % (loc, e))
1664            try:
1665                f = open("%s/%s" % (softdir, dest) , "w")
1666                self.log.debug("Writing %s/%s" % (softdir,dest) )
1667                data = u.read(4096)
1668                while data:
1669                    f.write(data)
1670                    data = u.read(4096)
1671                f.close()
1672                u.close()
1673            except Exception, e:
1674                raise service_error(service_error.internal,
1675                        "Could not copy %s: %s" % (loc, e))
1676            path = re.sub("/tmp", "", linkpath)
1677            # XXX
1678            softmap[pkg] = \
1679                    "%s/%s/%s" %\
1680                    ( self.repo_url, path, dest)
1681
1682            # Allow the individual segments to access the software by assigning
1683            # an attribute to each testbed allocation that encodes the data to
1684            # be released.  This expression collects the data for each run of
1685            # the loop.
1686            auth_attrs.update([
1687                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1688                        for tb in tbparams.keys()])
1689
1690        self.append_experiment_authorization(expid, auth_attrs)
1691
1692        # Convert the software locations in the segments into the local
1693        # copies on this host
1694        for soft in [ s for tb in topo.values() \
1695                for e in tb.elements \
1696                    if getattr(e, 'software', False) \
1697                        for s in e.software ]:
1698            if softmap.has_key(soft.location):
1699                soft.location = softmap[soft.location]
1700
1701
1702    def new_experiment(self, req, fid):
1703        """
1704        The external interface to empty initial experiment creation called from
1705        the dispatcher.
1706
1707        Creates a working directory, splits the incoming description using the
1708        splitter script and parses out the avrious subsections using the
1709        lcasses above.  Once each sub-experiment is created, use pooled threads
1710        to instantiate them and start it all up.
1711        """
1712        self.log.info("New experiment call started for %s" % fid)
1713        req = req.get('NewRequestBody', None)
1714        if not req:
1715            raise service_error(service_error.req,
1716                    "Bad request format (no NewRequestBody)")
1717
1718        # import may partially succeed so always save credentials and warn
1719        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1720            self.log.debug("Failed to import delegation credentials(!)")
1721        self.get_grouper_updates(fid)
1722        self.auth.update()
1723        self.auth.save()
1724
1725        try:
1726            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1727                    with_proof=True)
1728        except service_error, e:
1729            self.log.info("New experiment call for %s: access denied" % fid)
1730            raise e
1731
1732
1733        if not access_ok:
1734            self.log.info("New experiment call for %s: Access denied" % fid)
1735            raise service_error(service_error.access, "New access denied",
1736                    proof=[proof])
1737
1738        try:
1739            tmpdir = tempfile.mkdtemp(prefix="split-")
1740        except EnvironmentError:
1741            raise service_error(service_error.internal, "Cannot create tmp dir")
1742
1743        # Generate an ID for the experiment (slice) and a certificate that the
1744        # allocator can use to prove they own it.  We'll ship it back through
1745        # the encrypted connection.  If the requester supplied one, use it.
1746        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1747            expcert = req['experimentAccess']['X509']
1748            expid = fedid(certstr=expcert)
1749            self.state_lock.acquire()
1750            if expid in self.state:
1751                self.state_lock.release()
1752                raise service_error(service_error.req, 
1753                        'fedid %s identifies an existing experiment' % expid)
1754            self.state_lock.release()
1755        else:
1756            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1757
1758        #now we're done with the tmpdir, and it should be empty
1759        if self.cleanup:
1760            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1761            os.rmdir(tmpdir)
1762        else:
1763            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1764
1765        eid = self.create_experiment_state(fid, req, expid, expcert, 
1766                state='empty')
1767
1768        rv = {
1769                'experimentID': [
1770                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1771                ],
1772                'experimentStatus': 'empty',
1773                'experimentAccess': { 'X509' : expcert },
1774                'proof': proof.to_dict(),
1775            }
1776
1777        self.log.info("New experiment call succeeded for %s" % fid)
1778        return rv
1779
1780    # create_experiment sub-functions
1781
1782    @staticmethod
1783    def get_experiment_key(req, field='experimentID'):
1784        """
1785        Parse the experiment identifiers out of the request (the request body
1786        tag has been removed).  Specifically this pulls either the fedid or the
1787        localname out of the experimentID field.  A fedid is preferred.  If
1788        neither is present or the request does not contain the fields,
1789        service_errors are raised.
1790        """
1791        # Get the experiment access
1792        exp = req.get(field, None)
1793        if exp:
1794            if exp.has_key('fedid'):
1795                key = exp['fedid']
1796            elif exp.has_key('localname'):
1797                key = exp['localname']
1798            else:
1799                raise service_error(service_error.req, "Unknown lookup type")
1800        else:
1801            raise service_error(service_error.req, "No request?")
1802
1803        return key
1804
1805    def get_experiment_ids_and_start(self, key, tmpdir):
1806        """
1807        Get the experiment name, id and access certificate from the state, and
1808        set the experiment state to 'starting'.  returns a triple (fedid,
1809        localname, access_cert_file). The access_cert_file is a copy of the
1810        contents of the access certificate, created in the tempdir with
1811        restricted permissions.  If things are confused, raise an exception.
1812        """
1813
1814        expid = eid = None
1815        self.state_lock.acquire()
1816        if key in self.state:
1817            exp = self.state[key]
1818            exp.status = "starting"
1819            exp.updated()
1820            expid = exp.fedid
1821            eid = exp.localname
1822            expcert = exp.identity
1823        self.state_lock.release()
1824
1825        # make a protected copy of the access certificate so the experiment
1826        # controller can act as the experiment principal.
1827        if expcert:
1828            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1829            if not expcert_file:
1830                raise service_error(service_error.internal, 
1831                        "Cannot create temp cert file?")
1832        else:
1833            expcert_file = None
1834
1835        return (eid, expid, expcert_file)
1836
1837    def get_topology(self, req, tmpdir):
1838        """
1839        Get the ns2 content and put it into a file for parsing.  Call the local
1840        or remote parser and return the topdl.Topology.  Errors result in
1841        exceptions.  req is the request and tmpdir is a work directory.
1842        """
1843
1844        # The tcl parser needs to read a file so put the content into that file
1845        descr=req.get('experimentdescription', None)
1846        if descr:
1847            if 'ns2description' in descr:
1848                file_content=descr['ns2description']
1849            elif 'topdldescription' in descr:
1850                return topdl.Topology(**descr['topdldescription'])
1851            else:
1852                raise service_error(service_error.req, 
1853                        'Unknown experiment description type')
1854        else:
1855            raise service_error(service_error.req, "No experiment description")
1856
1857
1858        if self.splitter_url:
1859            self.log.debug("Calling remote topdl translator at %s" % \
1860                    self.splitter_url)
1861            top = self.remote_ns2topdl(self.splitter_url, file_content)
1862        else:
1863            tclfile = os.path.join(tmpdir, "experiment.tcl")
1864            if file_content:
1865                try:
1866                    f = open(tclfile, 'w')
1867                    f.write(file_content)
1868                    f.close()
1869                except EnvironmentError:
1870                    raise service_error(service_error.internal,
1871                            "Cannot write temp experiment description")
1872            else:
1873                raise service_error(service_error.req, 
1874                        "Only ns2descriptions supported")
1875            pid = "dummy"
1876            gid = "dummy"
1877            eid = "dummy"
1878
1879            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1880                str(self.muxmax), '-m', 'dummy']
1881
1882            tclcmd.extend([pid, gid, eid, tclfile])
1883
1884            self.log.debug("running local splitter %s", " ".join(tclcmd))
1885            # This is just fantastic.  As a side effect the parser copies
1886            # tb_compat.tcl into the current directory, so that directory
1887            # must be writable by the fedd user.  Doing this in the
1888            # temporary subdir ensures this is the case.
1889            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1890                    cwd=tmpdir)
1891            split_data = tclparser.stdout
1892
1893            top = topdl.topology_from_xml(file=split_data, top="experiment")
1894            os.remove(tclfile)
1895
1896        return top
1897
1898    def get_testbed_services(self, req, testbeds):
1899        """
1900        Parse the services section of the request into two dicts mapping
1901        testbed to lists of federated_service objects.  The first dict maps all
1902        exporters of services to those service objects, the second maps
1903        testbeds to service objects only for services requiring portals.
1904        """
1905
1906        # Sanity check the services.  Exports or imports from unknown testbeds
1907        # cause an exception.
1908        for s in req.get('service', []):
1909            for t in s.get('import', []):
1910                if t not in testbeds:
1911                    raise service_error(service_error.req, 
1912                            'Service import to unknown testbed: %s' %t)
1913            for t in s.get('export', []):
1914                if t not in testbeds:
1915                    raise service_error(service_error.req, 
1916                            'Service export from unknown testbed: %s' %t)
1917
1918
1919        # We construct both dicts here because deriving the second is more
1920        # complex than it looks - both the keys and lists can differ, and it's
1921        # much easier to generate both in one pass.
1922        masters = { }
1923        pmasters = { }
1924        for s in req.get('service', []):
1925            # If this is a service request with the importall field
1926            # set, fill it out.
1927
1928            if s.get('importall', False):
1929                s['import'] = [ tb for tb in testbeds \
1930                        if tb not in s.get('export',[])]
1931                del s['importall']
1932
1933            # Add the service to masters
1934            for tb in s.get('export', []):
1935                if s.get('name', None):
1936
1937                    params = { }
1938                    for a in s.get('fedAttr', []):
1939                        params[a.get('attribute', '')] = a.get('value','')
1940
1941                    fser = federated_service(name=s['name'],
1942                            exporter=tb, importers=s.get('import',[]),
1943                            params=params)
1944                    if fser.name == 'hide_hosts' \
1945                            and 'hosts' not in fser.params:
1946                        fser.params['hosts'] = \
1947                                ",".join(tb_hosts.get(fser.exporter, []))
1948                    if tb in masters: masters[tb].append(fser)
1949                    else: masters[tb] = [fser]
1950
1951                    if fser.portal:
1952                        if tb in pmasters: pmasters[tb].append(fser)
1953                        else: pmasters[tb] = [fser]
1954                else:
1955                    self.log.error('Testbed service does not have name " + \
1956                            "and importers')
1957        return masters, pmasters
1958
1959    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1960        """
1961        Create the ssh keys necessary for interconnecting the portal nodes and
1962        the global hosts file for letting each segment know about the IP
1963        addresses in play.  Save these into the repo.  Add attributes to the
1964        autorizer allowing access controllers to download them and return a set
1965        of attributes that inform the segments where to find this stuff.  May
1966        raise service_errors in if there are problems.
1967        """
1968        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1969        gw_secretkey_base = "fed.%s" % self.ssh_type
1970        keydir = os.path.join(tmpdir, 'keys')
1971        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1972        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1973
1974        try:
1975            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1976        except ValueError:
1977            raise service_error(service_error.server_config, 
1978                    "Bad key type (%s)" % self.ssh_type)
1979
1980        self.generate_seer_certs(keydir)
1981
1982        # Copy configuration files into the remote file store
1983        # The config urlpath
1984        configpath = "/%s/config" % expid
1985        # The config file system location
1986        configdir ="%s%s" % ( self.repodir, configpath)
1987        try:
1988            os.makedirs(configdir)
1989        except EnvironmentError, e:
1990            raise service_error(service_error.internal,
1991                    "Cannot create config directory: %s" % e)
1992        try:
1993            f = open("%s/hosts" % configdir, "w")
1994            print >> f, string.join(hosts, '\n')
1995            f.close()
1996        except EnvironmentError, e:
1997            raise service_error(service_error.internal, 
1998                    "Cannot write hosts file: %s" % e)
1999        try:
2000            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
2001            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
2002            copy_file(os.path.join(keydir, 'ca.pem'), 
2003                    os.path.join(configdir, 'ca.pem'))
2004            copy_file(os.path.join(keydir, 'node.pem'), 
2005                    os.path.join(configdir, 'node.pem'))
2006        except EnvironmentError, e:
2007            raise service_error(service_error.internal, 
2008                    "Cannot copy keyfiles: %s" % e)
2009
2010        # Allow the individual testbeds to access the configuration files,
2011        # again by setting an attribute for the relevant pathnames on each
2012        # allocation principal.  Yeah, that's a long list comprehension.
2013        self.append_experiment_authorization(expid, set([
2014            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
2015                    for tb in tbparams.keys() \
2016                        for f in ("hosts", 'ca.pem', 'node.pem', 
2017                            gw_secretkey_base, gw_pubkey_base)]))
2018
2019        attrs = [ 
2020                {
2021                    'attribute': 'ssh_pubkey', 
2022                    'value': '%s/%s/config/%s' % \
2023                            (self.repo_url, expid, gw_pubkey_base)
2024                },
2025                {
2026                    'attribute': 'ssh_secretkey', 
2027                    'value': '%s/%s/config/%s' % \
2028                            (self.repo_url, expid, gw_secretkey_base)
2029                },
2030                {
2031                    'attribute': 'hosts', 
2032                    'value': '%s/%s/config/hosts' % \
2033                            (self.repo_url, expid)
2034                },
2035                {
2036                    'attribute': 'seer_ca_pem', 
2037                    'value': '%s/%s/config/%s' % \
2038                            (self.repo_url, expid, 'ca.pem')
2039                },
2040                {
2041                    'attribute': 'seer_node_pem', 
2042                    'value': '%s/%s/config/%s' % \
2043                            (self.repo_url, expid, 'node.pem')
2044                },
2045            ]
2046        return attrs
2047
2048
2049    def get_vtopo(self, req, fid):
2050        """
2051        Return the stored virtual topology for this experiment
2052        """
2053        rv = None
2054        state = None
2055        self.log.info("vtopo call started for %s" %  fid)
2056
2057        req = req.get('VtopoRequestBody', None)
2058        if not req:
2059            raise service_error(service_error.req,
2060                    "Bad request format (no VtopoRequestBody)")
2061        exp = req.get('experiment', None)
2062        if exp:
2063            if exp.has_key('fedid'):
2064                key = exp['fedid']
2065                keytype = "fedid"
2066            elif exp.has_key('localname'):
2067                key = exp['localname']
2068                keytype = "localname"
2069            else:
2070                raise service_error(service_error.req, "Unknown lookup type")
2071        else:
2072            raise service_error(service_error.req, "No request?")
2073
2074        try:
2075            proof = self.check_experiment_access(fid, key)
2076        except service_error, e:
2077            self.log.info("vtopo call failed for %s: access denied" %  fid)
2078            raise e
2079
2080        self.state_lock.acquire()
2081        # XXX: this needs to be recalculated
2082        if key in self.state:
2083            if self.state[key].top is not None:
2084                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2085                rv = { 'experiment' : {keytype: key },
2086                        'vtopo': vtopo,
2087                        'proof': proof.to_dict(), 
2088                    }
2089            else:
2090                state = self.state[key].status
2091        self.state_lock.release()
2092
2093        if rv: 
2094            self.log.info("vtopo call completed for %s %s " % \
2095                (key, fid))
2096            return rv
2097        else: 
2098            if state:
2099                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2100                    (key, fid))
2101                raise service_error(service_error.partial, 
2102                        "Not ready: %s" % state)
2103            else:
2104                self.log.info("vtopo call completed for %s %s (No experiment)"\
2105                        % (key, fid))
2106                raise service_error(service_error.req, "No such experiment")
2107
2108    def get_vis(self, req, fid):
2109        """
2110        Return the stored visualization for this experiment
2111        """
2112        rv = None
2113        state = None
2114
2115        self.log.info("vis call started for %s" %  fid)
2116        req = req.get('VisRequestBody', None)
2117        if not req:
2118            raise service_error(service_error.req,
2119                    "Bad request format (no VisRequestBody)")
2120        exp = req.get('experiment', None)
2121        if exp:
2122            if exp.has_key('fedid'):
2123                key = exp['fedid']
2124                keytype = "fedid"
2125            elif exp.has_key('localname'):
2126                key = exp['localname']
2127                keytype = "localname"
2128            else:
2129                raise service_error(service_error.req, "Unknown lookup type")
2130        else:
2131            raise service_error(service_error.req, "No request?")
2132
2133        try:
2134            proof = self.check_experiment_access(fid, key)
2135        except service_error, e:
2136            self.log.info("vis call failed for %s: access denied" %  fid)
2137            raise e
2138
2139        self.state_lock.acquire()
2140        # Generate the visualization
2141        if key in self.state:
2142            if self.state[key].top is not None:
2143                try:
2144                    vis = self.genviz(
2145                            topdl.topology_to_vtopo(self.state[key].top))
2146                except service_error, e:
2147                    self.state_lock.release()
2148                    raise e
2149                rv =  { 'experiment' : {keytype: key },
2150                        'vis': vis,
2151                        'proof': proof.to_dict(), 
2152                        }
2153            else:
2154                state = self.state[key].status
2155        self.state_lock.release()
2156
2157        if rv: 
2158            self.log.info("vis call completed for %s %s " % \
2159                (key, fid))
2160            return rv
2161        else:
2162            if state:
2163                self.log.info("vis call completed for %s %s (not ready)" % \
2164                    (key, fid))
2165                raise service_error(service_error.partial, 
2166                        "Not ready: %s" % state)
2167            else:
2168                self.log.info("vis call completed for %s %s (no experiment)" % \
2169                    (key, fid))
2170                raise service_error(service_error.req, "No such experiment")
2171
2172   
2173    def save_federant_information(self, allocated, tbparams, eid, top):
2174        """
2175        Store the various data that have changed in the experiment state
2176        between when it was started and the beginning of resource allocation.
2177        This is basically the information about each local allocation.  This
2178        fills in the values of the placeholder allocation in the state.  It
2179        also collects the access proofs and returns them as dicts for a
2180        response message.
2181        """
2182        self.state_lock.acquire()
2183        exp = self.state[eid]
2184        exp.top = top.clone()
2185        # save federant information
2186        for k in allocated.keys():
2187            exp.add_allocation(tbparams[k])
2188            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2189                type="testbed", localname=[k], 
2190                service=[ s.to_topdl() for s in tbparams[k].services]))
2191
2192        # Access proofs for the response message
2193        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2194                    for p in tbparams[k].proof]
2195        exp.updated()
2196        if self.state_filename: 
2197            self.write_state()
2198        self.state_lock.release()
2199        return proofs
2200
2201    def clear_placeholder(self, eid, expid, tmpdir):
2202        """
2203        Clear the placeholder and remove any allocated temporary dir.
2204        """
2205
2206        self.state_lock.acquire()
2207        del self.state[eid]
2208        del self.state[expid]
2209        if self.state_filename: self.write_state()
2210        self.state_lock.release()
2211        if tmpdir and self.cleanup:
2212            self.remove_dirs(tmpdir)
2213
2214    # end of create_experiment sub-functions
2215
2216    def create_experiment(self, req, fid):
2217        """
2218        The external interface to experiment creation called from the
2219        dispatcher.
2220
2221        Creates a working directory, splits the incoming description using the
2222        splitter script and parses out the various subsections using the
2223        classes above.  Once each sub-experiment is created, use pooled threads
2224        to instantiate them and start it all up.
2225        """
2226
2227        self.log.info("Create experiment call started for %s" % fid)
2228        req = req.get('CreateRequestBody', None)
2229        if req:
2230            key = self.get_experiment_key(req)
2231        else:
2232            raise service_error(service_error.req,
2233                    "Bad request format (no CreateRequestBody)")
2234
2235        # Import information from the requester
2236        # import may partially succeed so always save credentials and warn
2237        if not self.auth.import_credentials(data_list=req.get('credential',[])):
2238            self.log.debug("Failed to import delegation credentials(!)")
2239        self.get_grouper_updates(fid)
2240        self.auth.update()
2241        self.auth.save()
2242
2243        try:
2244            # Make sure that the caller can talk to us
2245            proof = self.check_experiment_access(fid, key)
2246        except service_error, e:
2247            self.log.info("Create experiment call failed for %s: access denied"\
2248                    % fid)
2249            raise e
2250
2251
2252        # Install the testbed map entries supplied with the request into a copy
2253        # of the testbed map.
2254        tbmap = dict(self.tbmap)
2255        for m in req.get('testbedmap', []):
2256            if 'testbed' in m and 'uri' in m:
2257                tbmap[m['testbed']] = m['uri']
2258
2259        # a place to work
2260        try:
2261            tmpdir = tempfile.mkdtemp(prefix="split-")
2262            os.mkdir(tmpdir+"/keys")
2263        except EnvironmentError:
2264            raise service_error(service_error.internal, "Cannot create tmp dir")
2265
2266        tbparams = { }
2267
2268        eid, expid, expcert_file = \
2269                self.get_experiment_ids_and_start(key, tmpdir)
2270
2271        # This catches exceptions to clear the placeholder if necessary
2272        try: 
2273            if not (eid and expid):
2274                raise service_error(service_error.internal, 
2275                        "Cannot find local experiment info!?")
2276
2277            top = self.get_topology(req, tmpdir)
2278            self.confirm_software(top)
2279            # Assign the IPs
2280            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2281            # Find the testbeds to look up
2282            tb_hosts = { }
2283            testbeds = [ ]
2284            for e in top.elements:
2285                if isinstance(e, topdl.Computer):
2286                    tb = e.get_attribute('testbed') or 'default'
2287                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2288                    else: 
2289                        tb_hosts[tb] = [ e.name ]
2290                        testbeds.append(tb)
2291
2292            masters, pmasters = self.get_testbed_services(req, testbeds)
2293            allocated = { }         # Testbeds we can access
2294            topo ={ }               # Sub topologies
2295            connInfo = { }          # Connection information
2296
2297            self.get_access_to_testbeds(testbeds, fid, allocated, 
2298                    tbparams, masters, tbmap, expid, expcert_file)
2299
2300            # tbactive is the set of testbeds that have NATs in front of their
2301            # portals. They need to initiate connections.
2302            tbactive = set([k for k, v in tbparams.items() \
2303                    if v.get_attribute('nat_portals')])
2304
2305            self.split_topology(top, topo, testbeds)
2306
2307            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2308            # XXX: PNNL debug
2309            self.log.debug("Back from generate keys")
2310
2311            part = experiment_partition(self.auth, self.store_url, tbmap,
2312                    self.muxmax, self.direct_transit, tbactive)
2313            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2314                    connInfo, expid)
2315            # XXX: PNNL debug
2316            self.log.debug("Back from add portals")
2317
2318            auth_attrs = set()
2319            # Now get access to the dynamic testbeds (those added above)
2320            for tb in [ t for t in topo if t not in allocated]:
2321                # XXX: PNNL debug
2322                self.log.debug("dynamic testbeds %s" %tb)
2323                self.get_access(tb, tbparams, fid, masters, tbmap, 
2324                        expid, expcert_file)
2325                allocated[tb] = 1
2326                store_keys = topo[tb].get_attribute('store_keys')
2327                # Give the testbed access to keys it exports or imports
2328                if store_keys:
2329                    auth_attrs.update(set([
2330                        (tbparams[tb].allocID, sk) \
2331                                for sk in store_keys.split(" ")]))
2332
2333            # XXX: PNNL debug
2334            self.log.debug("done with dynamic testbeds %s" % auth_attrs)
2335            if auth_attrs:
2336                self.append_experiment_authorization(expid, auth_attrs)
2337
2338            # transit and disconnected testbeds may not have a connInfo entry.
2339            # Fill in the blanks.
2340            for t in allocated.keys():
2341                if not connInfo.has_key(t):
2342                    connInfo[t] = { }
2343
2344            self.wrangle_software(expid, top, topo, tbparams)
2345
2346            proofs = self.save_federant_information(allocated, tbparams, 
2347                    eid, top)
2348        except service_error, e:
2349            # If something goes wrong in the parse (usually an access error)
2350            # clear the placeholder state.  From here on out the code delays
2351            # exceptions.  Failing at this point returns a fault to the remote
2352            # caller.
2353
2354            self.log.info("Create experiment call failed for %s %s: %s" % 
2355                    (eid, fid, e))
2356            self.clear_placeholder(eid, expid, tmpdir)
2357            raise e
2358
2359        # Start the background swapper and return the starting state.  From
2360        # here on out, the state will stick around a while.
2361
2362        # Create a logger that logs to the experiment's state object as well as
2363        # to the main log file.
2364        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2365        alloc_collector = self.list_log(self.state[eid].log)
2366        h = logging.StreamHandler(alloc_collector)
2367        # XXX: there should be a global one of these rather than repeating the
2368        # code.
2369        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2370                    '%d %b %y %H:%M:%S'))
2371        alloc_log.addHandler(h)
2372
2373        # Start a thread to do the resource allocation
2374        t  = Thread(target=self.allocate_resources,
2375                args=(allocated, masters, eid, expid, tbparams, 
2376                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2377                    connInfo, tbmap, expcert_file),
2378                name=eid)
2379        t.start()
2380
2381        rv = {
2382                'experimentID': [
2383                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2384                ],
2385                'experimentStatus': 'starting',
2386                'proof': [ proof.to_dict() ] + proofs,
2387            }
2388        self.log.info("Create experiment call succeeded for %s %s" % \
2389                (eid, fid))
2390
2391        return rv
2392   
2393    def get_experiment_fedid(self, key):
2394        """
2395        find the fedid associated with the localname key in the state database.
2396        """
2397
2398        rv = None
2399        self.state_lock.acquire()
2400        if key in self.state:
2401            rv = self.state[key].fedid
2402        self.state_lock.release()
2403        return rv
2404
2405    def check_experiment_access(self, fid, key):
2406        """
2407        Confirm that the fid has access to the experiment.  Though a request
2408        may be made in terms of a local name, the access attribute is always
2409        the experiment's fedid.
2410        """
2411        if not isinstance(key, fedid):
2412            key = self.get_experiment_fedid(key)
2413
2414        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2415
2416        if access_ok:
2417            return proof
2418        else:
2419            raise service_error(service_error.access, "Access Denied",
2420                proof)
2421
2422
2423    def get_handler(self, path, fid):
2424        """
2425        Perhaps surprisingly named, this function handles HTTP GET requests to
2426        this server (SOAP requests are POSTs).
2427        """
2428        self.log.info("Get handler %s %s" % (path, fid))
2429        if len("%s" % fid) == 0:
2430            return (None, None)
2431        # XXX: log proofs?
2432        if self.auth.check_attribute(fid, path):
2433            return ("%s/%s" % (self.repodir, path), "application/binary")
2434        else:
2435            return (None, None)
2436
2437    def update_info(self, key, force=False):
2438        top = None
2439        self.state_lock.acquire()
2440        if key in self.state:
2441            if force or self.state[key].older_than(self.info_cache_limit):
2442                top = self.state[key].top
2443                if top is not None: top = top.clone()
2444                d1, info_params, cert, d2 = \
2445                        self.get_segment_info(self.state[key], need_lock=False)
2446        self.state_lock.release()
2447
2448        if top is None: return
2449
2450        try:
2451            tmpdir = tempfile.mkdtemp(prefix="info-")
2452        except EnvironmentError:
2453            raise service_error(service_error.internal, 
2454                    "Cannot create tmp dir")
2455        cert_file = self.make_temp_certfile(cert, tmpdir)
2456
2457        data = []
2458        try:
2459            for k, (uri, aid) in info_params.items():
2460                info=self.info_segment(log=self.log, testbed=uri,
2461                            cert_file=cert_file, cert_pwd=None,
2462                            trusted_certs=self.trusted_certs,
2463                            caller=self.call_InfoSegment)
2464                info(uri, aid)
2465                data.append(info)
2466        # Clean up the tmpdir no matter what
2467        finally:
2468            if tmpdir: self.remove_dirs(tmpdir)
2469
2470        self.annotate_topology(top, data)
2471        self.state_lock.acquire()
2472        if key in self.state:
2473            self.state[key].top = top
2474            self.state[key].updated()
2475            if self.state_filename: self.write_state()
2476        self.state_lock.release()
2477
2478   
2479    def get_info(self, req, fid):
2480        """
2481        Return all the stored info about this experiment
2482        """
2483        rv = None
2484
2485        self.log.info("Info call started for %s" %  fid)
2486        req = req.get('InfoRequestBody', None)
2487        if not req:
2488            raise service_error(service_error.req,
2489                    "Bad request format (no InfoRequestBody)")
2490        exp = req.get('experiment', None)
2491        legacy = req.get('legacy', False)
2492        fresh = req.get('fresh', False)
2493        if exp:
2494            if exp.has_key('fedid'):
2495                key = exp['fedid']
2496                keytype = "fedid"
2497            elif exp.has_key('localname'):
2498                key = exp['localname']
2499                keytype = "localname"
2500            else:
2501                raise service_error(service_error.req, "Unknown lookup type")
2502        else:
2503            raise service_error(service_error.req, "No request?")
2504
2505        try:
2506            proof = self.check_experiment_access(fid, key)
2507        except service_error, e:
2508            self.log.info("Info call failed for %s: access denied" %  fid)
2509
2510
2511        self.update_info(key, fresh)
2512
2513        self.state_lock.acquire()
2514        if self.state.has_key(key):
2515            rv = self.state[key].get_info()
2516            # Copy the topo if we need legacy annotations
2517            if legacy:
2518                top = self.state[key].top
2519                if top is not None: top = top.clone()
2520        self.state_lock.release()
2521        self.log.info("Gathered Info for %s %s" % (key, fid))
2522
2523        # If the legacy visualization and topology representations are
2524        # requested, calculate them and add them to the return.
2525        if legacy and rv is not None:
2526            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2527            if top is not None:
2528                vtopo = topdl.topology_to_vtopo(top)
2529                if vtopo is not None:
2530                    rv['vtopo'] = vtopo
2531                    try:
2532                        vis = self.genviz(vtopo)
2533                    except service_error, e:
2534                        self.log.debug('Problem generating visualization: %s' \
2535                                % e)
2536                        vis = None
2537                    if vis is not None:
2538                        rv['vis'] = vis
2539        if rv:
2540            self.log.info("Info succeded for %s %s" % (key, fid))
2541            rv['proof'] = proof.to_dict()
2542            return rv
2543        else: 
2544            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2545            raise service_error(service_error.req, "No such experiment")
2546
2547    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2548            results):
2549        """
2550        Call OperateSegment on multiple testbeds and gather the results.
2551        op_params contains the parameters needed to contact that testbed, cert
2552        is a certificate containing the fedid to use, op is the operation,
2553        testbeds is a dict mapping testbed name to targets in that testbed,
2554        params are the parameters to include a,d results is a growing list of
2555        the results of the calls.
2556        """
2557        try:
2558            tmpdir = tempfile.mkdtemp(prefix="info-")
2559        except EnvironmentError:
2560            raise service_error(service_error.internal, 
2561                    "Cannot create tmp dir")
2562        cert_file = self.make_temp_certfile(cert, tmpdir)
2563
2564        try:
2565            for tb, targets in testbeds.items():
2566                if tb in op_params:
2567                    uri, aid = op_params[tb]
2568                    operate=self.operation_segment(log=self.log, testbed=uri,
2569                                cert_file=cert_file, cert_pwd=None,
2570                                trusted_certs=self.trusted_certs,
2571                                caller=self.call_OperationSegment)
2572                    if operate(uri, aid, op, targets, params):
2573                        if operate.status is not None:
2574                            results.extend(operate.status)
2575                            continue
2576                # Something went wrong in a weird way.  Add statuses
2577                # that reflect that to results
2578                for t in targets:
2579                    results.append(operation_status(t, 
2580                        operation_status.federant,
2581                        'Unexpected error on %s' % tb))
2582        # Clean up the tmpdir no matter what
2583        finally:
2584            if tmpdir: self.remove_dirs(tmpdir)
2585
2586    def do_operation(self, req, fid):
2587        """
2588        Find the testbeds holding each target and ask them to carry out the
2589        operation.  Return the statuses.
2590        """
2591        # Map an element to the testbed containing it
2592        def element_to_tb(e):
2593            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2594            elif isinstance(e, topdl.Testbed): return e.name
2595            else: return None
2596        # If d is an operation_status object, make it a dict
2597        def make_dict(d):
2598            if isinstance(d, dict): return d
2599            elif isinstance(d, operation_status): return d.to_dict()
2600            else: return { }
2601
2602        def element_name(e):
2603            if isinstance(e, topdl.Computer): return e.name
2604            elif isinstance(e, topdl.Testbed): 
2605                if e.localname: return e.localname[0]
2606                else: return None
2607            else: return None
2608
2609        self.log.info("Operation call started for %s" %  fid)
2610        req = req.get('OperationRequestBody', None)
2611        if not req:
2612            raise service_error(service_error.req,
2613                    "Bad request format (no OperationRequestBody)")
2614        exp = req.get('experiment', None)
2615        op = req.get('operation', None)
2616        targets = set(req.get('target', []))
2617        params = req.get('parameter', None)
2618
2619        if exp:
2620            if 'fedid' in exp:
2621                key = exp['fedid']
2622                keytype = "fedid"
2623            elif 'localname' in exp:
2624                key = exp['localname']
2625                keytype = "localname"
2626            else:
2627                raise service_error(service_error.req, "Unknown lookup type")
2628        else:
2629            raise service_error(service_error.req, "No request?")
2630
2631        if op is None or not targets:
2632            raise service_error(service_error.req, "No request?")
2633
2634        try:
2635            proof = self.check_experiment_access(fid, key)
2636        except service_error, e:
2637            self.log.info("Operation call failed for %s: access denied" %  fid)
2638            raise e
2639
2640        self.state_lock.acquire()
2641        if key in self.state:
2642            d1, op_params, cert, d2 = \
2643                    self.get_segment_info(self.state[key], need_lock=False,
2644                            key='tb')
2645            top = self.state[key].top
2646            if top is not None:
2647                top = top.clone()
2648        self.state_lock.release()
2649
2650        if top is None:
2651            self.log.info("Operation call failed for %s: not active" %  fid)
2652            raise service_error(service_error.partial, "No topology yet", 
2653                    proof=proof)
2654
2655        testbeds = { }
2656        results = []
2657        for e in top.elements:
2658            ename = element_name(e)
2659            if ename in targets:
2660                tb = element_to_tb(e)
2661                targets.remove(ename)
2662                if tb is not None:
2663                    if tb in testbeds: testbeds[tb].append(ename)
2664                    else: testbeds[tb] = [ ename ]
2665                else:
2666                    results.append(operation_status(e.name, 
2667                        code=operation_status.no_target, 
2668                        description='Cannot map target to testbed'))
2669
2670        for t in targets:
2671            results.append(operation_status(t, operation_status.no_target))
2672
2673        self.operate_on_segments(op_params, cert, op, testbeds, params,
2674                results)
2675
2676        self.log.info("Operation call succeeded for %s" %  fid)
2677        return { 
2678                'experiment': exp, 
2679                'status': [make_dict(r) for r in results],
2680                'proof': proof.to_dict()
2681                }
2682
2683
2684    def get_multi_info(self, req, fid):
2685        """
2686        Return all the stored info that this fedid can access
2687        """
2688        rv = { 'info': [ ], 'proof': [ ] }
2689
2690        self.log.info("Multi Info call started for %s" %  fid)
2691        self.get_grouper_updates(fid)
2692        self.auth.update()
2693        self.auth.save()
2694        self.state_lock.acquire()
2695        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2696            try:
2697                proof = self.check_experiment_access(fid, key)
2698            except service_error, e:
2699                if e.code == service_error.access:
2700                    continue
2701                else:
2702                    self.log.info("Multi Info call failed for %s: %s" %  \
2703                            (e,fid))
2704                    self.state_lock.release()
2705                    raise e
2706
2707            if self.state.has_key(key):
2708                e = self.state[key].get_info()
2709                e['proof'] = proof.to_dict()
2710                rv['info'].append(e)
2711                rv['proof'].append(proof.to_dict())
2712        self.state_lock.release()
2713        self.log.info("Multi Info call succeeded for %s" %  fid)
2714        return rv
2715
2716    def check_termination_status(self, fed_exp, force):
2717        """
2718        Confirm that the experiment is sin a valid state to stop (or force it)
2719        return the state - invalid states for deletion and force settings cause
2720        exceptions.
2721        """
2722        self.state_lock.acquire()
2723        status = fed_exp.status
2724
2725        if status:
2726            if status in ('starting', 'terminating'):
2727                if not force:
2728                    self.state_lock.release()
2729                    raise service_error(service_error.partial, 
2730                            'Experiment still being created or destroyed')
2731                else:
2732                    self.log.warning('Experiment in %s state ' % status + \
2733                            'being terminated by force.')
2734            self.state_lock.release()
2735            return status
2736        else:
2737            # No status??? trouble
2738            self.state_lock.release()
2739            raise service_error(service_error.internal,
2740                    "Experiment has no status!?")
2741
2742    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2743        ids = []
2744        term_params = { }
2745        if need_lock: self.state_lock.acquire()
2746        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2747        expcert = fed_exp.identity
2748        repo = "%s" % fed_exp.fedid
2749
2750        # Collect the allocation/segment ids into a dict keyed by the fedid
2751        # of the allocation that contains a tuple of uri, aid
2752        for i, fed in enumerate(fed_exp.get_all_allocations()):
2753            uri = fed.uri
2754            aid = fed.allocID
2755            if key == 'aid': term_params[aid] = (uri, aid)
2756            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2757
2758        if need_lock: self.state_lock.release()
2759        return ids, term_params, expcert, repo
2760
2761
2762    def get_termination_info(self, fed_exp):
2763        self.state_lock.acquire()
2764        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2765        # Change the experiment state
2766        fed_exp.status = 'terminating'
2767        fed_exp.updated()
2768        if self.state_filename: self.write_state()
2769        self.state_lock.release()
2770
2771        return ids, term_params, expcert, repo
2772
2773
2774    def deallocate_resources(self, term_params, expcert, status, force, 
2775            dealloc_log):
2776        tmpdir = None
2777        # This try block makes sure the tempdir is cleared
2778        try:
2779            # If no expcert, try the deallocation as the experiment
2780            # controller instance.
2781            if expcert and self.auth_type != 'legacy': 
2782                try:
2783                    tmpdir = tempfile.mkdtemp(prefix="term-")
2784                except EnvironmentError:
2785                    raise service_error(service_error.internal, 
2786                            "Cannot create tmp dir")
2787                cert_file = self.make_temp_certfile(expcert, tmpdir)
2788                pw = None
2789            else: 
2790                cert_file = self.cert_file
2791                pw = self.cert_pwd
2792
2793            # Stop everyone.  NB, wait_for_all waits until a thread starts
2794            # and then completes, so we can't wait if nothing starts.  So,
2795            # no tbparams, no start.
2796            if len(term_params) > 0:
2797                tp = thread_pool(self.nthreads)
2798                for k, (uri, aid) in term_params.items():
2799                    # Create and start a thread to stop the segment
2800                    tp.wait_for_slot()
2801                    t  = pooled_thread(\
2802                            target=self.terminate_segment(log=dealloc_log,
2803                                testbed=uri,
2804                                cert_file=cert_file, 
2805                                cert_pwd=pw,
2806                                trusted_certs=self.trusted_certs,
2807                                caller=self.call_TerminateSegment),
2808                            args=(uri, aid), name=k,
2809                            pdata=tp, trace_file=self.trace_file)
2810                    t.start()
2811                # Wait for completions
2812                tp.wait_for_all_done()
2813
2814            # release the allocations (failed experiments have done this
2815            # already, and starting experiments may be in odd states, so we
2816            # ignore errors releasing those allocations
2817            try: 
2818                for k, (uri, aid)  in term_params.items():
2819                    self.release_access(None, aid, uri=uri,
2820                            cert_file=cert_file, cert_pwd=pw)
2821            except service_error, e:
2822                if status != 'failed' and not force:
2823                    raise e
2824
2825        # Clean up the tmpdir no matter what
2826        finally:
2827            if tmpdir: self.remove_dirs(tmpdir)
2828
2829    def terminate_experiment(self, req, fid):
2830        """
2831        Swap this experiment out on the federants and delete the shared
2832        information
2833        """
2834        self.log.info("Terminate experiment call started for %s" % fid)
2835        tbparams = { }
2836        req = req.get('TerminateRequestBody', None)
2837        if not req:
2838            raise service_error(service_error.req,
2839                    "Bad request format (no TerminateRequestBody)")
2840
2841        key = self.get_experiment_key(req, 'experiment')
2842        try:
2843            proof = self.check_experiment_access(fid, key)
2844        except service_error, e:
2845            self.log.info(
2846                    "Terminate experiment call failed for %s: access denied" \
2847                            % fid)
2848            raise e
2849        exp = req.get('experiment', False)
2850        force = req.get('force', False)
2851
2852        dealloc_list = [ ]
2853
2854
2855        # Create a logger that logs to the dealloc_list as well as to the main
2856        # log file.
2857        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2858        dealloc_log.info("Terminating %s " %key)
2859        h = logging.StreamHandler(self.list_log(dealloc_list))
2860        # XXX: there should be a global one of these rather than repeating the
2861        # code.
2862        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2863                    '%d %b %y %H:%M:%S'))
2864        dealloc_log.addHandler(h)
2865
2866        self.state_lock.acquire()
2867        fed_exp = self.state.get(key, None)
2868        self.state_lock.release()
2869        repo = None
2870
2871        if fed_exp:
2872            status = self.check_termination_status(fed_exp, force)
2873            # get_termination_info updates the experiment state
2874            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2875            self.deallocate_resources(term_params, expcert, status, force, 
2876                    dealloc_log)
2877
2878            # Remove the terminated experiment
2879            self.state_lock.acquire()
2880            for id in ids:
2881                self.clear_experiment_authorization(id, need_state_lock=False)
2882                if id in self.state: del self.state[id]
2883
2884            if self.state_filename: self.write_state()
2885            self.state_lock.release()
2886
2887            # Delete any synch points associated with this experiment.  All
2888            # synch points begin with the fedid of the experiment.
2889            fedid_keys = set(["fedid:%s" % f for f in ids \
2890                    if isinstance(f, fedid)])
2891            for k in self.synch_store.all_keys():
2892                try:
2893                    if len(k) > 45 and k[0:46] in fedid_keys:
2894                        self.synch_store.del_value(k)
2895                except synch_store.BadDeletionError:
2896                    pass
2897            self.write_store()
2898
2899            # Remove software and other cached stuff from the filesystem.
2900            if repo:
2901                self.remove_dirs("%s/%s" % (self.repodir, repo))
2902       
2903            self.log.info("Terminate experiment succeeded for %s %s" % \
2904                    (key, fid))
2905            return { 
2906                    'experiment': exp , 
2907                    'deallocationLog': string.join(dealloc_list, ''),
2908                    'proof': [proof.to_dict()],
2909                    }
2910        else:
2911            self.log.info("Terminate experiment failed for %s %s: no state" % \
2912                    (key, fid))
2913            raise service_error(service_error.req, "No saved state")
2914
2915
2916    def GetValue(self, req, fid):
2917        """
2918        Get a value from the synchronized store
2919        """
2920        req = req.get('GetValueRequestBody', None)
2921        if not req:
2922            raise service_error(service_error.req,
2923                    "Bad request format (no GetValueRequestBody)")
2924       
2925        name = req.get('name', None)
2926        wait = req.get('wait', False)
2927        rv = { 'name': name }
2928
2929        if not name:
2930            raise service_error(service_error.req, "No name?")
2931
2932        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2933
2934        if access_ok:
2935            self.log.debug("[GetValue] asking for %s " % name)
2936            try:
2937                v = self.synch_store.get_value(name, wait)
2938            except synch_store.RevokedKeyError:
2939                # No more synch on this key
2940                raise service_error(service_error.federant, 
2941                        "Synch key %s revoked" % name)
2942            if v is not None:
2943                rv['value'] = v
2944            rv['proof'] = proof.to_dict()
2945            self.log.debug("[GetValue] got %s from %s" % (v, name))
2946            return rv
2947        else:
2948            raise service_error(service_error.access, "Access Denied",
2949                    proof=proof)
2950       
2951
2952    def SetValue(self, req, fid):
2953        """
2954        Set a value in the synchronized store
2955        """
2956        req = req.get('SetValueRequestBody', None)
2957        if not req:
2958            raise service_error(service_error.req,
2959                    "Bad request format (no SetValueRequestBody)")
2960       
2961        name = req.get('name', None)
2962        v = req.get('value', '')
2963
2964        if not name:
2965            raise service_error(service_error.req, "No name?")
2966
2967        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2968
2969        if access_ok:
2970            try:
2971                self.synch_store.set_value(name, v)
2972                self.write_store()
2973                self.log.debug("[SetValue] set %s to %s" % (name, v))
2974            except synch_store.CollisionError:
2975                # Translate into a service_error
2976                raise service_error(service_error.req,
2977                        "Value already set: %s" %name)
2978            except synch_store.RevokedKeyError:
2979                # No more synch on this key
2980                raise service_error(service_error.federant, 
2981                        "Synch key %s revoked" % name)
2982                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2983        else:
2984            raise service_error(service_error.access, "Access Denied",
2985                    proof=proof)
Note: See TracBrowser for help on using the repository browser.