source: fedd/federation/experiment_control.py @ 8212638

Last change on this file since 8212638 was 8212638, checked in by Ted Faber <faber@…>, 12 years ago

Recognize and respect user allocated IP addresses

  • Property mode set to 100644
File size: 97.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15import shutil
16import zipfile
17
18import os.path
19
20import traceback
21# For parsing visualization output and splitter output
22import xml.parsers.expat
23
24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
26from string import join
27
28from urlparse import urlparse
29from urllib2 import urlopen, URLError
30
31from util import *
32from deter import fedid, generate_fedid
33from remote_service import xmlrpc_handler, soap_handler, service_caller
34from service_error import service_error
35from synch_store import synch_store
36from experiment_partition import experiment_partition
37from experiment_control_legacy import experiment_control_legacy
38from authorizer import abac_authorizer
39from thread_pool import thread_pool, pooled_thread
40from experiment_info import experiment_info, allocation_info, federated_service
41from operation_status import operation_status
42
43from deter import topdl
44from deter import ip_allocator
45from deter import ip_addr
46import list_log
47
48
49class nullHandler(logging.Handler):
50    def emit(self, record): pass
51
52fl = logging.getLogger("fedd.experiment_control")
53fl.addHandler(nullHandler())
54
55class experiment_control_local(experiment_control_legacy):
56    """
57    Control of experiments that this system can directly access.
58
59    Includes experiment creation, termination and information dissemination.
60    Thred safe.
61    """
62
63    class ssh_cmd_timeout(RuntimeError): pass
64   
65    call_RequestAccess = service_caller('RequestAccess')
66    call_ReleaseAccess = service_caller('ReleaseAccess')
67    call_StartSegment = service_caller('StartSegment')
68    call_TerminateSegment = service_caller('TerminateSegment')
69    call_InfoSegment = service_caller('InfoSegment')
70    call_OperationSegment = service_caller('OperationSegment')
71    call_Ns2Topdl = service_caller('Ns2Topdl')
72
73    def __init__(self, config=None, auth=None):
74        """
75        Intialize the various attributes, most from the config object
76        """
77
78        def parse_tarfile_list(tf):
79            """
80            Parse a tarfile list from the configuration.  This is a set of
81            paths and tarfiles separated by spaces.
82            """
83            rv = [ ]
84            if tf is not None:
85                tl = tf.split()
86                while len(tl) > 1:
87                    p, t = tl[0:2]
88                    del tl[0:2]
89                    rv.append((p, t))
90            return rv
91
92        self.list_log = list_log.list_log
93
94        self.cert_file = config.get("experiment_control", "cert_file")
95        if self.cert_file:
96            self.cert_pwd = config.get("experiment_control", "cert_pwd")
97        else:
98            self.cert_file = config.get("globals", "cert_file")
99            self.cert_pwd = config.get("globals", "cert_pwd")
100
101        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
102                or config.get("globals", "trusted_certs")
103
104        self.repodir = config.get("experiment_control", "repodir")
105        self.repo_url = config.get("experiment_control", "repo_url", 
106                "https://users.isi.deterlab.net:23235");
107
108        self.exp_stem = "fed-stem"
109        self.log = logging.getLogger("fedd.experiment_control")
110        set_log_level(config, "experiment_control", self.log)
111        self.muxmax = 2
112        self.nthreads = 10
113        self.randomize_experiments = False
114
115        self.splitter = None
116        self.ssh_keygen = "/usr/bin/ssh-keygen"
117        self.ssh_identity_file = None
118
119
120        self.debug = config.getboolean("experiment_control", "create_debug")
121        self.cleanup = not config.getboolean("experiment_control", 
122                "leave_tmpfiles")
123        self.state_filename = config.get("experiment_control", 
124                "experiment_state")
125        self.store_filename = config.get("experiment_control", 
126                "synch_store")
127        self.store_url = config.get("experiment_control", "store_url")
128        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
129        self.grouper_url = config.get("experiment_control", "grouper_url")
130        self.fedkit = parse_tarfile_list(\
131                config.get("experiment_control", "fedkit"))
132        self.gatewaykit = parse_tarfile_list(\
133                config.get("experiment_control", "gatewaykit"))
134
135        dt = config.get("experiment_control", "direct_transit")
136        self.auth_type = config.get('experiment_control', 'auth_type') \
137                or 'legacy'
138        self.auth_dir = config.get('experiment_control', 'auth_dir')
139        # XXX: document this!
140        self.info_cache_limit = \
141                config.getint('experiment_control', 'info_cache', 600)
142        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
143        else: self.direct_transit = [ ]
144        # NB for internal master/slave ops, not experiment setup
145        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
146
147        self.overrides = set([])
148        ovr = config.get('experiment_control', 'overrides')
149        if ovr:
150            for o in ovr.split(","):
151                o = o.strip()
152                if o.startswith('fedid:'): o = o[len('fedid:'):]
153                self.overrides.add(fedid(hexstr=o))
154
155        self.state = { }
156        self.state_lock = Lock()
157        self.tclsh = "/usr/local/bin/otclsh"
158        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
159                config.get("experiment_control", "tcl_splitter",
160                        "/usr/testbed/lib/ns2ir/parse.tcl")
161        mapdb_file = config.get("experiment_control", "mapdb")
162        self.trace_file = sys.stderr
163
164        self.def_expstart = \
165                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
166                "/tmp/federate";
167        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
168                "FEDDIR/hosts";
169        self.def_gwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
171                "/tmp/bridge.log";
172        self.def_mgwstart = \
173                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
174                "/tmp/bridge.log";
175        self.def_gwimage = "FBSD61-TUNNEL2";
176        self.def_gwtype = "pc";
177        self.local_access = { }
178
179        if self.auth_type == 'legacy':
180            if auth:
181                self.auth = auth
182            else:
183                self.log.error( "[access]: No authorizer initialized, " +\
184                        "creating local one.")
185                auth = authorizer()
186            self.get_access = self.legacy_get_access
187        elif self.auth_type == 'abac':
188            self.auth = abac_authorizer(load=self.auth_dir, 
189                    update=os.path.join(self.auth_dir,'update'))
190        else:
191            raise service_error(service_error.internal, 
192                    "Unknown auth_type: %s" % self.auth_type)
193
194        if mapdb_file:
195            self.read_mapdb(mapdb_file)
196        else:
197            self.log.warn("[experiment_control] No testbed map, using defaults")
198            self.tbmap = { 
199                    'deter':'https://users.isi.deterlab.net:23235',
200                    'emulab':'https://users.isi.deterlab.net:23236',
201                    'ucb':'https://users.isi.deterlab.net:23237',
202                    }
203
204        # Grab saved state.  OK to do this w/o locking because it's read only
205        # and only one thread should be in existence that can see self.state at
206        # this point.
207        if self.state_filename:
208            self.read_state()
209
210        if self.store_filename:
211            self.read_store()
212        else:
213            self.log.warning("No saved synch store")
214            self.synch_store = synch_store
215
216        # Dispatch tables
217        self.soap_services = {\
218                'New': soap_handler('New', self.new_experiment),
219                'Create': soap_handler('Create', self.create_experiment),
220                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
221                'Vis': soap_handler('Vis', self.get_vis),
222                'Info': soap_handler('Info', self.get_info),
223                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
224                'Operation': soap_handler('Operation', self.do_operation),
225                'Terminate': soap_handler('Terminate', 
226                    self.terminate_experiment),
227                'GetValue': soap_handler('GetValue', self.GetValue),
228                'SetValue': soap_handler('SetValue', self.SetValue),
229        }
230
231        self.xmlrpc_services = {\
232                'New': xmlrpc_handler('New', self.new_experiment),
233                'Create': xmlrpc_handler('Create', self.create_experiment),
234                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
235                'Vis': xmlrpc_handler('Vis', self.get_vis),
236                'Info': xmlrpc_handler('Info', self.get_info),
237                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
238                'Terminate': xmlrpc_handler('Terminate',
239                    self.terminate_experiment),
240                'Operation': xmlrpc_handler('Operation', self.do_operation),
241                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
242                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
243        }
244
245    # Call while holding self.state_lock
246    def write_state(self):
247        """
248        Write a new copy of experiment state after copying the existing state
249        to a backup.
250
251        State format is a simple pickling of the state dictionary.
252        """
253        if os.access(self.state_filename, os.W_OK):
254            copy_file(self.state_filename, \
255                    "%s.bak" % self.state_filename)
256        try:
257            f = open(self.state_filename, 'w')
258            pickle.dump(self.state, f)
259        except EnvironmentError, e:
260            self.log.error("Can't write file %s: %s" % \
261                    (self.state_filename, e))
262        except pickle.PicklingError, e:
263            self.log.error("Pickling problem: %s" % e)
264        except TypeError, e:
265            self.log.error("Pickling problem (TypeError): %s" % e)
266
267    @staticmethod
268    def get_alloc_ids(exp):
269        """
270        Used by read_store and read state.  This used to be worse.
271        """
272
273        return [ a.allocID for a in exp.get_all_allocations() ]
274
275
276    # Call while holding self.state_lock
277    def read_state(self):
278        """
279        Read a new copy of experiment state.  Old state is overwritten.
280
281        State format is a simple pickling of the state dictionary.
282        """
283       
284        try:
285            f = open(self.state_filename, "r")
286            self.state = pickle.load(f)
287            self.log.debug("[read_state]: Read state from %s" % \
288                    self.state_filename)
289        except EnvironmentError, e:
290            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
291                    % (self.state_filename, e))
292        except pickle.UnpicklingError, e:
293            self.log.warning(("[read_state]: No saved state: " + \
294                    "Unpickling failed: %s") % e)
295       
296        for s in self.state.values():
297            try:
298
299                eid = s.fedid
300                if eid : 
301                    if self.auth_type == 'legacy':
302                        # XXX: legacy
303                        # Give the owner rights to the experiment
304                        #self.auth.set_attribute(s['owner'], eid)
305                        # And holders of the eid as well
306                        self.auth.set_attribute(eid, eid)
307                        # allow overrides to control experiments as well
308                        for o in self.overrides:
309                            self.auth.set_attribute(o, eid)
310                        # Set permissions to allow reading of the software
311                        # repo, if any, as well.
312                        for a in self.get_alloc_ids(s):
313                            self.auth.set_attribute(a, 'repo/%s' % eid)
314                else:
315                    raise KeyError("No experiment id")
316            except KeyError, e:
317                self.log.warning("[read_state]: State ownership or identity " +\
318                        "misformatted in %s: %s" % (self.state_filename, e))
319
320    def read_mapdb(self, file):
321        """
322        Read a simple colon separated list of mappings for the
323        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
324        also adds testbeds to active if they include , active after
325        their name.
326        """
327
328        self.tbmap = { }
329        lineno =0
330        try:
331            f = open(file, "r")
332            for line in f:
333                lineno += 1
334                line = line.strip()
335                if line.startswith('#') or len(line) == 0:
336                    continue
337                try:
338                    label, url = line.split(':', 1)
339                    self.tbmap[label] = url
340                except ValueError, e:
341                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
342                            "map db: %s %s" % (lineno, line, e))
343        except EnvironmentError, e:
344            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
345                    "open %s: %s" % (file, e))
346        else:
347            f.close()
348
349    def read_store(self):
350        try:
351            self.synch_store = synch_store()
352            self.synch_store.load(self.store_filename)
353            self.log.debug("[read_store]: Read store from %s" % \
354                    self.store_filename)
355        except EnvironmentError, e:
356            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
357                    % (self.state_filename, e))
358            self.synch_store = synch_store()
359
360        # Set the initial permissions on data in the store.  XXX: This ad hoc
361        # authorization attribute initialization is getting out of hand.
362        # XXX: legacy
363        if self.auth_type == 'legacy':
364            for k in self.synch_store.all_keys():
365                try:
366                    if k.startswith('fedid:'):
367                        fid = fedid(hexstr=k[6:46])
368                        if self.state.has_key(fid):
369                            for a in self.get_alloc_ids(self.state[fid]):
370                                self.auth.set_attribute(a, k)
371                except ValueError, e:
372                    self.log.warn("Cannot deduce permissions for %s" % k)
373
374
375    def write_store(self):
376        """
377        Write a new copy of synch_store after writing current state
378        to a backup.  We use the internal synch_store pickle method to avoid
379        incinsistent data.
380
381        State format is a simple pickling of the store.
382        """
383        if os.access(self.store_filename, os.W_OK):
384            copy_file(self.store_filename, \
385                    "%s.bak" % self.store_filename)
386        try:
387            self.synch_store.save(self.store_filename)
388        except EnvironmentError, e:
389            self.log.error("Can't write file %s: %s" % \
390                    (self.store_filename, e))
391        except TypeError, e:
392            self.log.error("Pickling problem (TypeError): %s" % e)
393
394    # XXX this may belong somewhere else
395
396    def get_grouper_updates(self, fid):
397        if self.grouper_url is None: return
398        d = tempfile.mkdtemp()
399        try:
400            fstr = "%s" % fid
401            # XXX locking
402            zipname = os.path.join(d, 'grouper.zip')
403            dest = os.path.join(self.auth_dir, 'update')
404            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
405            f = open(zipname, 'w')
406            f.write(resp.read())
407            f.close()
408            zf = zipfile.ZipFile(zipname, 'r')
409            zf.extractall(dest)
410            zf.close()
411        except URLError, e:
412            self.log.error("Cannot connect to grouper: %s" % e)
413            pass
414        finally:
415            shutil.rmtree(d)
416
417
418    def remove_dirs(self, dir):
419        """
420        Remove the directory tree and all files rooted at dir.  Log any errors,
421        but continue.
422        """
423        self.log.debug("[removedirs]: removing %s" % dir)
424        try:
425            for path, dirs, files in os.walk(dir, topdown=False):
426                for f in files:
427                    os.remove(os.path.join(path, f))
428                for d in dirs:
429                    os.rmdir(os.path.join(path, d))
430            os.rmdir(dir)
431        except EnvironmentError, e:
432            self.log.error("Error deleting directory tree in %s" % e);
433
434    @staticmethod
435    def make_temp_certfile(expcert, tmpdir):
436        """
437        make a protected copy of the access certificate so the experiment
438        controller can act as the experiment principal.  mkstemp is the most
439        secure way to do that. The directory should be created by
440        mkdtemp.  Return the filename.
441        """
442        if expcert and tmpdir:
443            try:
444                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
445                f = os.fdopen(certf, 'w')
446                print >> f, expcert
447                f.close()
448            except EnvironmentError, e:
449                raise service_error(service_error.internal, 
450                        "Cannot create temp cert file?")
451            return certfn
452        else:
453            return None
454
455       
456    def generate_ssh_keys(self, dest, type="rsa" ):
457        """
458        Generate a set of keys for the gateways to use to talk.
459
460        Keys are of type type and are stored in the required dest file.
461        """
462        valid_types = ("rsa", "dsa")
463        t = type.lower();
464        if t not in valid_types: raise ValueError
465        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
466
467        try:
468            trace = open("/dev/null", "w")
469        except EnvironmentError:
470            raise service_error(service_error.internal,
471                    "Cannot open /dev/null??");
472
473        # May raise CalledProcessError
474        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
475        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
476        if rv != 0:
477            raise service_error(service_error.internal, 
478                    "Cannot generate nonce ssh keys.  %s return code %d" \
479                            % (self.ssh_keygen, rv))
480
481    def generate_seer_certs(self, destdir):
482        '''
483        Create a SEER ca cert and a node cert in destdir/ca.pem and
484        destdir/node.pem respectively.  These will be distributed throughout
485        the federated experiment.  This routine reports errors via
486        service_errors.
487        '''
488        openssl = '/usr/bin/openssl'
489        # All the filenames and parameters we need for openssl calls below
490        ca_key =os.path.join(destdir, 'ca.key') 
491        ca_pem = os.path.join(destdir, 'ca.pem')
492        node_key =os.path.join(destdir, 'node.key') 
493        node_pem = os.path.join(destdir, 'node.pem')
494        node_req = os.path.join(destdir, 'node.req')
495        node_signed = os.path.join(destdir, 'node.signed')
496        days = '%s' % (365 * 10)
497        serial = '%s' % random.randint(0, 1<<16)
498
499        try:
500            # Sequence of calls to create a CA key, create a ca cert, create a
501            # node key, node signing request, and finally a signed node
502            # certificate.
503            sequence = (
504                    (openssl, 'genrsa', '-out', ca_key, '1024'),
505                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
506                        ca_pem, '-days', days, '-subj', 
507                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
508                    (openssl, 'genrsa', '-out', node_key, '1024'),
509                    (openssl, 'req', '-new', '-key', node_key, '-out', 
510                        node_req, '-days', days, '-subj', 
511                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
512                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
513                        '-set_serial', serial, '-req', '-in', node_req, 
514                        '-out', node_signed, '-days', days),
515                )
516            # Do all that stuff; bail if there's an error, and push all the
517            # output to dev/null.
518            for cmd in sequence:
519                trace = open("/dev/null", "w")
520                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
521                if rv != 0:
522                    raise service_error(service_error.internal, 
523                            "Cannot generate SEER certs.  %s return code %d" \
524                                    % (' '.join(cmd), rv))
525            # Concatinate the node key and signed certificate into node.pem
526            f = open(node_pem, 'w')
527            for comp in (node_signed, node_key):
528                g = open(comp, 'r')
529                f.write(g.read())
530                g.close()
531            f.close()
532
533            # Throw out intermediaries.
534            for fn in (ca_key, node_key, node_req, node_signed):
535                os.unlink(fn)
536
537        except EnvironmentError, e:
538            # Any difficulties with the file system wind up here
539            raise service_error(service_error.internal,
540                    "File error on  %s while creating SEER certs: %s" % \
541                            (e.filename, e.strerror))
542
543
544
545    def gentopo(self, str):
546        """
547        Generate the topology data structure from the splitter's XML
548        representation of it.
549
550        The topology XML looks like:
551            <experiment>
552                <nodes>
553                    <node><vname></vname><ips>ip1:ip2</ips></node>
554                </nodes>
555                <lans>
556                    <lan>
557                        <vname></vname><vnode></vnode><ip></ip>
558                        <bandwidth></bandwidth><member>node:port</member>
559                    </lan>
560                </lans>
561        """
562        class topo_parse:
563            """
564            Parse the topology XML and create the dats structure.
565            """
566            def __init__(self):
567                # Typing of the subelements for data conversion
568                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
569                self.int_subelements = ( 'bandwidth',)
570                self.float_subelements = ( 'delay',)
571                # The final data structure
572                self.nodes = [ ]
573                self.lans =  [ ]
574                self.topo = { \
575                        'node': self.nodes,\
576                        'lan' : self.lans,\
577                    }
578                self.element = { }  # Current element being created
579                self.chars = ""     # Last text seen
580
581            def end_element(self, name):
582                # After each sub element the contents is added to the current
583                # element or to the appropriate list.
584                if name == 'node':
585                    self.nodes.append(self.element)
586                    self.element = { }
587                elif name == 'lan':
588                    self.lans.append(self.element)
589                    self.element = { }
590                elif name in self.str_subelements:
591                    self.element[name] = self.chars
592                    self.chars = ""
593                elif name in self.int_subelements:
594                    self.element[name] = int(self.chars)
595                    self.chars = ""
596                elif name in self.float_subelements:
597                    self.element[name] = float(self.chars)
598                    self.chars = ""
599
600            def found_chars(self, data):
601                self.chars += data.rstrip()
602
603
604        tp = topo_parse();
605        parser = xml.parsers.expat.ParserCreate()
606        parser.EndElementHandler = tp.end_element
607        parser.CharacterDataHandler = tp.found_chars
608
609        parser.Parse(str)
610
611        return tp.topo
612       
613
614    def genviz(self, topo):
615        """
616        Generate the visualization the virtual topology
617        """
618
619        neato = "/usr/local/bin/neato"
620        # These are used to parse neato output and to create the visualization
621        # file.
622        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
623        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
624                "%s</type></node>"
625
626        try:
627            # Node names
628            nodes = [ n['vname'] for n in topo['node'] ]
629            topo_lans = topo['lan']
630        except KeyError, e:
631            raise service_error(service_error.internal, "Bad topology: %s" %e)
632
633        lans = { }
634        links = { }
635
636        # Walk through the virtual topology, organizing the connections into
637        # 2-node connections (links) and more-than-2-node connections (lans).
638        # When a lan is created, it's added to the list of nodes (there's a
639        # node in the visualization for the lan).
640        for l in topo_lans:
641            if links.has_key(l['vname']):
642                if len(links[l['vname']]) < 2:
643                    links[l['vname']].append(l['vnode'])
644                else:
645                    nodes.append(l['vname'])
646                    lans[l['vname']] = links[l['vname']]
647                    del links[l['vname']]
648                    lans[l['vname']].append(l['vnode'])
649            elif lans.has_key(l['vname']):
650                lans[l['vname']].append(l['vnode'])
651            else:
652                links[l['vname']] = [ l['vnode'] ]
653
654
655        # Open up a temporary file for dot to turn into a visualization
656        try:
657            df, dotname = tempfile.mkstemp()
658            dotfile = os.fdopen(df, 'w')
659        except EnvironmentError:
660            raise service_error(service_error.internal,
661                    "Failed to open file in genviz")
662
663        try:
664            dnull = open('/dev/null', 'w')
665        except EnvironmentError:
666            service_error(service_error.internal,
667                    "Failed to open /dev/null in genviz")
668
669        # Generate a dot/neato input file from the links, nodes and lans
670        try:
671            print >>dotfile, "graph G {"
672            for n in nodes:
673                print >>dotfile, '\t"%s"' % n
674            for l in links.keys():
675                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
676            for l in lans.keys():
677                for n in lans[l]:
678                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
679            print >>dotfile, "}"
680            dotfile.close()
681        except TypeError:
682            raise service_error(service_error.internal,
683                    "Single endpoint link in vtopo")
684        except EnvironmentError:
685            raise service_error(service_error.internal, "Cannot write dot file")
686
687        # Use dot to create a visualization
688        try:
689            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
690                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
691                stderr=dnull, close_fds=True)
692        except EnvironmentError:
693            raise service_error(service_error.internal, 
694                    "Cannot generate visualization: is graphviz available?")
695        dnull.close()
696
697        # Translate dot to vis format
698        vis_nodes = [ ]
699        vis = { 'node': vis_nodes }
700        for line in dot.stdout:
701            m = vis_re.match(line)
702            if m:
703                vn = m.group(1)
704                vis_node = {'name': vn, \
705                        'x': float(m.group(2)),\
706                        'y' : float(m.group(3)),\
707                    }
708                if vn in links.keys() or vn in lans.keys():
709                    vis_node['type'] = 'lan'
710                else:
711                    vis_node['type'] = 'node'
712                vis_nodes.append(vis_node)
713        rv = dot.wait()
714
715        os.remove(dotname)
716        # XXX: graphviz seems to use low return codes for warnings, like
717        # "couldn't find font"
718        if rv < 2 : return vis
719        else: return None
720
721
722    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
723            cert_pwd=None):
724        """
725        Release access to testbed through fedd
726        """
727
728        if not uri and tbmap:
729            uri = tbmap.get(tb, None)
730        if not uri:
731            raise service_error(service_error.server_config, 
732                    "Unknown testbed: %s" % tb)
733
734        if self.local_access.has_key(uri):
735            resp = self.local_access[uri].ReleaseAccess(\
736                    { 'ReleaseAccessRequestBody' : 
737                        {'allocID': {'fedid': aid}},}, 
738                    fedid(file=cert_file))
739            resp = { 'ReleaseAccessResponseBody': resp } 
740        else:
741            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
742                    cert_file, cert_pwd, self.trusted_certs)
743
744        # better error coding
745
746    def remote_ns2topdl(self, uri, desc):
747
748        req = {
749                'description' : { 'ns2description': desc },
750            }
751
752        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
753                self.trusted_certs)
754
755        if r.has_key('Ns2TopdlResponseBody'):
756            r = r['Ns2TopdlResponseBody']
757            ed = r.get('experimentdescription', None)
758            if ed.has_key('topdldescription'):
759                return topdl.Topology(**ed['topdldescription'])
760            else:
761                raise service_error(service_error.protocol, 
762                        "Bad splitter response (no output)")
763        else:
764            raise service_error(service_error.protocol, "Bad splitter response")
765
766    class start_segment:
767        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
768                cert_pwd=None, trusted_certs=None, caller=None,
769                log_collector=None):
770            self.log = log
771            self.debug = debug
772            self.cert_file = cert_file
773            self.cert_pwd = cert_pwd
774            self.trusted_certs = None
775            self.caller = caller
776            self.testbed = testbed
777            self.log_collector = log_collector
778            self.response = None
779            self.node = { }
780            self.subs = { }
781            self.tb = { }
782            self.proof = None
783
784        def make_map(self, resp):
785            if 'segmentdescription' not in resp  or \
786                    'topdldescription' not in resp['segmentdescription']:
787                self.log.warn('No topology returned from startsegment')
788                return 
789
790            top = topdl.Topology(
791                    **resp['segmentdescription']['topdldescription'])
792
793            for e in top.elements:
794                if isinstance(e, topdl.Computer):
795                    self.node[e.name] = e
796                elif isinstance(e, topdl.Testbed):
797                    self.tb[e.uri] = e
798            for s in top.substrates:
799                self.subs[s.name] = s
800
801        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
802            req = {
803                    'allocID': { 'fedid' : aid }, 
804                    'segmentdescription': { 
805                        'topdldescription': topo.to_dict(),
806                    },
807                }
808
809            if connInfo:
810                req['connection'] = connInfo
811
812            import_svcs = [ s for m in masters.values() \
813                    for s in m if self.testbed in s.importers]
814
815            if import_svcs or self.testbed in masters:
816                req['service'] = []
817
818            for s in import_svcs:
819                for r in s.reqs:
820                    sr = copy.deepcopy(r)
821                    sr['visibility'] = 'import';
822                    req['service'].append(sr)
823
824            for s in masters.get(self.testbed, []):
825                for r in s.reqs:
826                    sr = copy.deepcopy(r)
827                    sr['visibility'] = 'export';
828                    req['service'].append(sr)
829
830            if attrs:
831                req['fedAttr'] = attrs
832
833            try:
834                self.log.debug("Calling StartSegment at %s " % uri)
835                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
836                        self.trusted_certs)
837                if r.has_key('StartSegmentResponseBody'):
838                    lval = r['StartSegmentResponseBody'].get('allocationLog',
839                            None)
840                    if lval and self.log_collector:
841                        for line in  lval.splitlines(True):
842                            self.log_collector.write(line)
843                    self.make_map(r['StartSegmentResponseBody'])
844                    if 'proof' in r: self.proof = r['proof']
845                    self.response = r
846                else:
847                    raise service_error(service_error.internal, 
848                            "Bad response!?: %s" %r)
849                return True
850            except service_error, e:
851                self.log.error("Start segment failed on %s: %s" % \
852                        (self.testbed, e))
853                return False
854
855
856
857    class terminate_segment:
858        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
859                cert_pwd=None, trusted_certs=None, caller=None):
860            self.log = log
861            self.debug = debug
862            self.cert_file = cert_file
863            self.cert_pwd = cert_pwd
864            self.trusted_certs = None
865            self.caller = caller
866            self.testbed = testbed
867
868        def __call__(self, uri, aid ):
869            req = {
870                    'allocID': {'fedid': aid }, 
871                }
872            self.log.info("Calling terminate segment")
873            try:
874                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
875                        self.trusted_certs)
876                self.log.info("Terminate segment succeeded")
877                return True
878            except service_error, e:
879                self.log.error("Terminate segment failed on %s: %s" % \
880                        (self.testbed, e))
881                return False
882
883    class info_segment(start_segment):
884        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
885                cert_pwd=None, trusted_certs=None, caller=None,
886                log_collector=None):
887            experiment_control_local.start_segment.__init__(self, debug, 
888                    log, testbed, cert_file, cert_pwd, trusted_certs, 
889                    caller, log_collector)
890
891        def __call__(self, uri, aid):
892            req = { 'allocID': { 'fedid' : aid } }
893
894            try:
895                self.log.debug("Calling InfoSegment at %s " % uri)
896                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
897                        self.trusted_certs)
898                if r.has_key('InfoSegmentResponseBody'):
899                    self.make_map(r['InfoSegmentResponseBody'])
900                    if 'proof' in r: self.proof = r['proof']
901                    self.response = r
902                else:
903                    raise service_error(service_error.internal, 
904                            "Bad response!?: %s" %r)
905                return True
906            except service_error, e:
907                self.log.error("Info segment failed on %s: %s" % \
908                        (self.testbed, e))
909                return False
910
911    class operation_segment:
912        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
913                cert_pwd=None, trusted_certs=None, caller=None,
914                log_collector=None):
915            self.log = log
916            self.debug = debug
917            self.cert_file = cert_file
918            self.cert_pwd = cert_pwd
919            self.trusted_certs = None
920            self.caller = caller
921            self.testbed = testbed
922            self.status = None
923
924        def __call__(self, uri, aid, op, targets, params):
925            req = { 
926                    'allocID': { 'fedid' : aid },
927                    'operation': op,
928                    'target': targets,
929                    }
930            if params: req['parameter'] = params
931
932
933            try:
934                self.log.debug("Calling OperationSegment at %s " % uri)
935                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
936                        self.trusted_certs)
937                if 'OperationSegmentResponseBody' in r:
938                    r = r['OperationSegmentResponseBody']
939                    if 'status' in r:
940                        self.status = r['status']
941                else:
942                    raise service_error(service_error.internal, 
943                            "Bad response!?: %s" %r)
944                return True
945            except service_error, e:
946                self.log.error("Operation segment failed on %s: %s" % \
947                        (self.testbed, e))
948                return False
949
950    def annotate_topology(self, top, data):
951        # These routines do various parts of the annotation
952        def add_new_names(nl, l):
953            """ add any names in nl to the list in l """
954            for n in nl:
955                if n not in l: l.append(n)
956       
957        def merge_services(ne, e):
958            for ns in ne.service:
959                # NB: the else is on the for
960                for s in e.service:
961                    if ns.name == s.name:
962                        s.importer = ns.importer
963                        s.param = ns.param
964                        s.description = ns.description
965                        s.status = ns.status
966                        break
967                else:
968                    e.service.append(ns)
969       
970        def merge_oses(ne, e):
971            """
972            Merge the operating system entries of ne into e
973            """
974            for nos in ne.os:
975                # NB: the else is on the for
976                for os in e.os:
977                    if nos.name == os.name:
978                        os.version = nos.version
979                        os.version = nos.distribution
980                        os.version = nos.distributionversion
981                        for a in nos.attribute:
982                            if os.get_attribute(a.attribute):
983                                os.remove_attribute(a.attribute)
984                            os.set_attribute(a.attribute, a.value)
985                        break
986                else:
987                    # If both nodes have one OS, this is a replacement
988                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
989                    else: e.os.append(nos)
990
991        # Annotate the topology with embedding info
992        for e in top.elements:
993            if isinstance(e, topdl.Computer):
994                for s in data:
995                    ne = s.node.get(e.name, None)
996                    if ne is not None:
997                        add_new_names(ne.localname, e.localname)
998                        e.status = ne.status
999                        merge_services(ne, e)
1000                        add_new_names(ne.operation, e.operation)
1001                        if ne.os: merge_oses(ne, e)
1002                        break
1003            elif isinstance(e,topdl.Testbed):
1004                for s in data:
1005                    ne = s.tb.get(e.uri, None)
1006                    if ne is not None:
1007                        add_new_names(ne.localname, e.localname)
1008                        add_new_names(ne.operation, e.operation)
1009                        merge_services(ne, e)
1010                        for a in ne.attribute:
1011                            e.set_attribute(a.attribute, a.value)
1012        # Annotate substrates
1013        for s in top.substrates:
1014            for d in data:
1015                ss = d.subs.get(s.name, None)
1016                if ss is not None:
1017                    if ss.capacity is not None:
1018                        s.capacity = ss.capacity
1019                    if s.latency is not None:
1020                        s.latency = ss.latency
1021
1022
1023
1024    def allocate_resources(self, allocated, masters, eid, expid, 
1025            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1026            attrs=None, connInfo={}, tbmap=None, expcert=None):
1027
1028        started = { }           # Testbeds where a sub-experiment started
1029                                # successfully
1030
1031        # XXX
1032        fail_soft = False
1033
1034        if tbmap is None: tbmap = { }
1035
1036        log = alloc_log or self.log
1037
1038        tp = thread_pool(self.nthreads)
1039        threads = [ ]
1040        starters = [ ]
1041
1042        if expcert:
1043            cert = expcert
1044            pw = None
1045        else:
1046            cert = self.cert_file
1047            pw = self.cert_pwd
1048
1049        for tb in allocated.keys():
1050            # Create and start a thread to start the segment, and save it
1051            # to get the return value later
1052            tb_attrs = copy.copy(attrs)
1053            tp.wait_for_slot()
1054            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1055            base, suffix = split_testbed(tb)
1056            if suffix:
1057                tb_attrs.append({'attribute': 'experiment_name', 
1058                    'value': "%s-%s" % (eid, suffix)})
1059            else:
1060                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1061            if not uri:
1062                raise service_error(service_error.internal, 
1063                        "Unknown testbed %s !?" % tb)
1064
1065            aid = tbparams[tb].allocID
1066            if not aid:
1067                raise service_error(service_error.internal, 
1068                        "No alloc id for testbed %s !?" % tb)
1069
1070            s = self.start_segment(log=log, debug=self.debug,
1071                    testbed=tb, cert_file=cert,
1072                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1073                    caller=self.call_StartSegment,
1074                    log_collector=log_collector)
1075            starters.append(s)
1076            t  = pooled_thread(\
1077                    target=s, name=tb,
1078                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1079                    pdata=tp, trace_file=self.trace_file)
1080            threads.append(t)
1081            t.start()
1082
1083        # Wait until all finish (keep pinging the log, though)
1084        mins = 0
1085        revoked = False
1086        while not tp.wait_for_all_done(60.0):
1087            mins += 1
1088            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1089                    % mins)
1090            if not revoked and \
1091                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1092                # a testbed has failed.  Revoke this experiment's
1093                # synchronizarion values so that sub experiments will not
1094                # deadlock waiting for synchronization that will never happen
1095                self.log.info("A subexperiment has failed to swap in, " + \
1096                        "revoking synch keys")
1097                var_key = "fedid:%s" % expid
1098                for k in self.synch_store.all_keys():
1099                    if len(k) > 45 and k[0:46] == var_key:
1100                        self.synch_store.revoke_key(k)
1101                revoked = True
1102
1103        failed = [ t.getName() for t in threads if not t.rv ]
1104        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1105
1106        # If one failed clean up, unless fail_soft is set
1107        if failed:
1108            if not fail_soft:
1109                tp.clear()
1110                for tb in succeeded:
1111                    # Create and start a thread to stop the segment
1112                    tp.wait_for_slot()
1113                    uri = tbparams[tb].uri
1114                    t  = pooled_thread(\
1115                            target=self.terminate_segment(log=log,
1116                                testbed=tb,
1117                                cert_file=cert, 
1118                                cert_pwd=pw,
1119                                trusted_certs=self.trusted_certs,
1120                                caller=self.call_TerminateSegment),
1121                            args=(uri, tbparams[tb].allocID),
1122                            name=tb,
1123                            pdata=tp, trace_file=self.trace_file)
1124                    t.start()
1125                # Wait until all finish (if any are being stopped)
1126                if succeeded:
1127                    tp.wait_for_all_done()
1128
1129                # release the allocations
1130                for tb in tbparams.keys():
1131                    try:
1132                        self.release_access(tb, tbparams[tb].allocID, 
1133                                tbmap=tbmap, uri=tbparams[tb].uri,
1134                                cert_file=cert, cert_pwd=pw)
1135                    except service_error, e:
1136                        self.log.warn("Error releasing access: %s" % e.desc)
1137                # Remove the placeholder
1138                self.state_lock.acquire()
1139                self.state[eid].status = 'failed'
1140                self.state[eid].updated()
1141                if self.state_filename: self.write_state()
1142                self.state_lock.release()
1143                # Remove the repo dir
1144                self.remove_dirs("%s/%s" %(self.repodir, expid))
1145                # Walk up tmpdir, deleting as we go
1146                if self.cleanup:
1147                    self.remove_dirs(tmpdir)
1148                else:
1149                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1150
1151
1152                log.error("Swap in failed on %s" % ",".join(failed))
1153                return
1154        else:
1155            # Walk through the successes and gather the proofs
1156            proofs = { }
1157            for s in starters:
1158                if s.proof:
1159                    proofs[s.testbed] = s.proof
1160            self.annotate_topology(top, starters)
1161            log.info("[start_segment]: Experiment %s active" % eid)
1162
1163
1164        # Walk up tmpdir, deleting as we go
1165        if self.cleanup:
1166            self.remove_dirs(tmpdir)
1167        else:
1168            log.debug("[start_experiment]: not removing %s" % tmpdir)
1169
1170        # Insert the experiment into our state and update the disk copy.
1171        self.state_lock.acquire()
1172        self.state[expid].status = 'active'
1173        self.state[eid] = self.state[expid]
1174        self.state[eid].top = top
1175        self.state[eid].updated()
1176        # Append startup proofs
1177        for f in self.state[eid].get_all_allocations():
1178            if f.tb in proofs:
1179                f.proof.append(proofs[f.tb])
1180
1181        if self.state_filename: self.write_state()
1182        self.state_lock.release()
1183        return
1184
1185
1186    def add_kit(self, e, kit):
1187        """
1188        Add a Software object created from the list of (install, location)
1189        tuples passed as kit  to the software attribute of an object e.  We
1190        do this enough to break out the code, but it's kind of a hack to
1191        avoid changing the old tuple rep.
1192        """
1193
1194        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1195
1196        if isinstance(e.software, list): e.software.extend(s)
1197        else: e.software = s
1198
1199    def append_experiment_authorization(self, expid, attrs, 
1200            need_state_lock=True):
1201        """
1202        Append the authorization information to system state
1203        """
1204
1205        for p, a in attrs:
1206            # PNNL Debug
1207            self.log.debug("Append auth: %s %s" % (p, a))
1208            if p and a:
1209                # Improperly configured overrides can add bad principals.
1210                self.auth.set_attribute(p, a)
1211            else:
1212                self.log.debug("Bad append experiment_authorization %s %s" \
1213                        % (p, a))
1214        self.auth.save()
1215
1216        if need_state_lock: self.state_lock.acquire()
1217        # XXX: really a no op?
1218        #self.state[expid]['auth'].update(attrs)
1219        if self.state_filename: self.write_state()
1220        if need_state_lock: self.state_lock.release()
1221
1222    def clear_experiment_authorization(self, expid, need_state_lock=True):
1223        """
1224        Attrs is a set of attribute principal pairs that need to be removed
1225        from the authenticator.  Remove them and save the authenticator.
1226        """
1227
1228        if need_state_lock: self.state_lock.acquire()
1229        # XXX: should be a no-op
1230        #if expid in self.state and 'auth' in self.state[expid]:
1231            #for p, a in self.state[expid]['auth']:
1232                #self.auth.unset_attribute(p, a)
1233            #self.state[expid]['auth'] = set()
1234        if self.state_filename: self.write_state()
1235        if need_state_lock: self.state_lock.release()
1236        self.auth.save()
1237
1238
1239    def create_experiment_state(self, fid, req, expid, expcert,
1240            state='starting'):
1241        """
1242        Create the initial entry in the experiment's state.  The expid and
1243        expcert are the experiment's fedid and certifacte that represents that
1244        ID, which are installed in the experiment state.  If the request
1245        includes a suggested local name that is used if possible.  If the local
1246        name is already taken by an experiment owned by this user that has
1247        failed, it is overwritten.  Otherwise new letters are added until a
1248        valid localname is found.  The generated local name is returned.
1249        """
1250
1251        if req.has_key('experimentID') and \
1252                req['experimentID'].has_key('localname'):
1253            overwrite = False
1254            eid = req['experimentID']['localname']
1255            # If there's an old failed experiment here with the same local name
1256            # and accessible by this user, we'll overwrite it, otherwise we'll
1257            # fall through and do the collision avoidance.
1258            old_expid = self.get_experiment_fedid(eid)
1259            if old_expid:
1260                users_experiment = True
1261                try:
1262                    self.check_experiment_access(fid, old_expid)
1263                except service_error, e:
1264                    if e.code == service_error.access: users_experiment = False
1265                    else: raise e
1266                if users_experiment:
1267                    self.state_lock.acquire()
1268                    status = self.state[eid].status
1269                    if status and status == 'failed':
1270                        # remove the old access attributes
1271                        self.clear_experiment_authorization(eid,
1272                                need_state_lock=False)
1273                        overwrite = True
1274                        del self.state[eid]
1275                        del self.state[old_expid]
1276                    self.state_lock.release()
1277                else:
1278                    self.log.info('Experiment %s exists, ' % eid + \
1279                            'but this user cannot access it')
1280            self.state_lock.acquire()
1281            while (self.state.has_key(eid) and not overwrite):
1282                eid += random.choice(string.ascii_letters)
1283            # Initial state
1284            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1285                    identity=expcert)
1286            self.state[expid] = self.state[eid]
1287            if self.state_filename: self.write_state()
1288            self.state_lock.release()
1289        else:
1290            eid = self.exp_stem
1291            for i in range(0,5):
1292                eid += random.choice(string.ascii_letters)
1293            self.state_lock.acquire()
1294            while (self.state.has_key(eid)):
1295                eid = self.exp_stem
1296                for i in range(0,5):
1297                    eid += random.choice(string.ascii_letters)
1298            # Initial state
1299            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1300                    identity=expcert)
1301            self.state[expid] = self.state[eid]
1302            if self.state_filename: self.write_state()
1303            self.state_lock.release()
1304
1305        # Let users touch the state.  Authorize this fid and the expid itself
1306        # to touch the experiment, as well as allowing th eoverrides.
1307        self.append_experiment_authorization(eid, 
1308                set([(fid, expid), (expid,expid)] + \
1309                        [ (o, expid) for o in self.overrides]))
1310
1311        return eid
1312
1313
1314    def allocate_ips_to_topo(self, top):
1315        """
1316        Add an ip4_address attribute to all the hosts in the topology that have
1317        not already been assigned one by the user, based on the shared
1318        substrates on which they sit.  An /etc/hosts file is also created and
1319        returned as a list of hostfiles entries.  We also return the allocator,
1320        because we may need to allocate IPs to portals
1321        (specifically DRAGON portals).
1322        """
1323        subs = sorted(top.substrates, 
1324                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1325                reverse=True)
1326        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1327        ifs = { }
1328        hosts = [ ]
1329
1330        assigned = { }
1331        assigned_subs = set()
1332        # Look for IP networks that were already assigned to the experiment by
1333        # the user.  We assume that users are smart in making that assignment,
1334        # but we check them later.
1335        for e in top.elements:
1336            if not isinstance(e, topdl.Computer): continue
1337            for inf in e.interface:
1338                a = inf.get_attribute('ip4_address')
1339                if not a: continue
1340                for s in inf.substrate:
1341                    assigned_subs.add(s)
1342                a = ip_addr(a)
1343                n = ip_addr(inf.get_attribute('ip4_netmask'))
1344
1345                sz = 0x100000000 - long(n)
1346                net = (long(a) & long(n)) & 0xffffffff
1347                if net in assigned:
1348                    if sz > assigned[net]: assigned[net] = sz
1349                assigned[net] = sz
1350
1351        # Walk all substrates, smallest to largest, assigning IP addresses to
1352        # the unassigned and checking the assigned.  Build an /etc/hosts file
1353        # as well.
1354        for idx, s in enumerate(subs):
1355            if s.name not in assigned_subs:
1356                net_size = len(s.interfaces)+2
1357
1358                # Get an ip allocation for this substrate.  Make sure the
1359                # allocation does not collide with any user allocations.
1360                ok = False
1361                while not ok:
1362                    a = ips.allocate(net_size)
1363                    if not a:
1364                        raise service_error(service_error.req, 
1365                                "Cannot allocate IP addresses")
1366                    base, num = a
1367                    if num < net_size: 
1368                        raise service_error(service_error.internal,
1369                                "Allocator returned wrong number of IPs??")
1370                    # Check for overlap.  An assigned network could contain an
1371                    # endpoint of the new allocation or the new allocation
1372                    # could contain an endpoint of an assigned network.
1373                    # NB the else is attached to the for - if the loop is not
1374                    # exited by a break, ok = True.
1375                    for n, sz in assigned.items():
1376                        if base >= n and base < n + sz:
1377                            ok = False
1378                            break
1379                        if base + num > n and base + num  < n + sz:
1380                            ok = False
1381                            break
1382                        if n >= base and n < base + num:
1383                            ok = False
1384                            break
1385                        if n+sz > base and n+sz < base + num:
1386                            ok = False
1387                            break
1388                    else:
1389                        ok = True
1390                           
1391                mask = ips.min_alloc
1392                while mask < net_size:
1393                    mask *= 2
1394
1395                netmask = ((2**32-1) ^ (mask-1))
1396            else:
1397                base = 0
1398
1399            # Add the attributes and build hosts
1400            base += 1
1401            for i in s.interfaces:
1402                # Add address and masks to interfaces on unassigned substrates
1403                if s.name not in assigned_subs:
1404                    i.attribute.append(
1405                            topdl.Attribute('ip4_address', 
1406                                "%s" % ip_addr(base)))
1407                    i.attribute.append(
1408                            topdl.Attribute('ip4_netmask', 
1409                                "%s" % ip_addr(int(netmask))))
1410
1411                # Make /etc/hosts entries
1412                addr = i.get_attribute('ip4_address')
1413                if addr is None:
1414                    raise service_error(service_error.req, 
1415                            "Partially assigned substrate %s" %s.name)
1416                hname = i.element.name
1417                if hname in ifs:
1418                    hosts.append("%s\t%s-%s %s-%d" % \
1419                            (addr, hname, s.name, hname, ifs[hname]))
1420                else:
1421                    # First IP allocated it the default ip according to hosts,
1422                    # so the extra alias is here.
1423                    ifs[hname] = 0
1424                    hosts.append("%s\t%s-%s %s-%d %s" % \
1425                            (addr, hname, s.name, hname, ifs[hname], hname))
1426
1427                ifs[hname] += 1
1428                base += 1
1429        return hosts, ips
1430
1431    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1432            tbparam, masters, tbmap, expid=None, expcert=None):
1433        for tb in testbeds:
1434            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1435                    expcert)
1436            allocated[tb] = 1
1437
1438    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1439            expcert=None):
1440        """
1441        Get access to testbed through fedd and set the parameters for that tb
1442        """
1443        def get_export_project(svcs):
1444            """
1445            Look through for the list of federated_service for this testbed
1446            objects for a project_export service, and extract the project
1447            parameter.
1448            """
1449
1450            pe = [s for s in svcs if s.name=='project_export']
1451            if len(pe) == 1:
1452                return pe[0].params.get('project', None)
1453            elif len(pe) == 0:
1454                return None
1455            else:
1456                raise service_error(service_error.req,
1457                        "More than one project export is not supported")
1458
1459        def add_services(svcs, type, slist, keys):
1460            """
1461            Add the given services to slist.  type is import or export.  Also
1462            add a mapping entry from the assigned id to the original service
1463            record.
1464            """
1465            for i, s in enumerate(svcs):
1466                idx = '%s%d' % (type, i)
1467                keys[idx] = s
1468                sr = {'id': idx, 'name': s.name, 'visibility': type }
1469                if s.params:
1470                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1471                            for k, v in s.params.items()]
1472                slist.append(sr)
1473
1474        uri = tbmap.get(testbed_base(tb), None)
1475        if not uri:
1476            raise service_error(service_error.server_config, 
1477                    "Unknown testbed: %s" % tb)
1478
1479        export_svcs = masters.get(tb,[])
1480        import_svcs = [ s for m in masters.values() \
1481                for s in m \
1482                    if tb in s.importers ]
1483
1484        export_project = get_export_project(export_svcs)
1485        # Compose the credential list so that IDs come before attributes
1486        creds = set()
1487        keys = set()
1488        certs = self.auth.get_creds_for_principal(fid)
1489        # Append credenials about this experiment controller - e.g. that it is
1490        # trusted.
1491        certs.update(self.auth.get_creds_for_principal(
1492            fedid(file=self.cert_file)))
1493        if expid:
1494            certs.update(self.auth.get_creds_for_principal(expid))
1495        for c in certs:
1496            keys.add(c.issuer_cert())
1497            creds.add(c.attribute_cert())
1498        creds = list(keys) + list(creds)
1499
1500        if expcert: cert, pw = expcert, None
1501        else: cert, pw = self.cert_file, self.cert_pw
1502
1503        # Request credentials
1504        req = {
1505                'abac_credential': creds,
1506            }
1507        # Make the service request from the services we're importing and
1508        # exporting.  Keep track of the export request ids so we can
1509        # collect the resulting info from the access response.
1510        e_keys = { }
1511        if import_svcs or export_svcs:
1512            slist = []
1513            add_services(import_svcs, 'import', slist, e_keys)
1514            add_services(export_svcs, 'export', slist, e_keys)
1515            req['service'] = slist
1516
1517        if self.local_access.has_key(uri):
1518            # Local access call
1519            req = { 'RequestAccessRequestBody' : req }
1520            r = self.local_access[uri].RequestAccess(req, 
1521                    fedid(file=self.cert_file))
1522            r = { 'RequestAccessResponseBody' : r }
1523        else:
1524            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1525
1526        if r.has_key('RequestAccessResponseBody'):
1527            # Through to here we have a valid response, not a fault.
1528            # Access denied is a fault, so something better or worse than
1529            # access denied has happened.
1530            r = r['RequestAccessResponseBody']
1531            self.log.debug("[get_access] Access granted")
1532        else:
1533            raise service_error(service_error.protocol,
1534                        "Bad proxy response")
1535        if 'proof' not in r:
1536            raise service_error(service_error.protocol,
1537                        "Bad access response (no access proof)")
1538
1539        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1540                tb=tb, uri=uri, proof=[r['proof']], 
1541                services=masters.get(tb, None))
1542
1543        # Collect the responses corresponding to the services this testbed
1544        # exports.  These will be the service requests that we will include in
1545        # the start segment requests (with appropriate visibility values) to
1546        # import and export the segments.
1547        for s in r.get('service', []):
1548            id = s.get('id', None)
1549            # Note that this attaches the response to the object in the masters
1550            # data structure.  (The e_keys index disappears when this fcn
1551            # returns)
1552            if id and id in e_keys:
1553                e_keys[id].reqs.append(s)
1554
1555        # Add attributes to parameter space.  We don't allow attributes to
1556        # overlay any parameters already installed.
1557        for a in r.get('fedAttr', []):
1558            try:
1559                if a['attribute']:
1560                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1561            except KeyError:
1562                self.log.error("Bad attribute in response: %s" % a)
1563
1564
1565    def split_topology(self, top, topo, testbeds):
1566        """
1567        Create the sub-topologies that are needed for experiment instantiation.
1568        """
1569        for tb in testbeds:
1570            topo[tb] = top.clone()
1571            # copy in for loop allows deletions from the original
1572            for e in [ e for e in topo[tb].elements]:
1573                etb = e.get_attribute('testbed')
1574                # NB: elements without a testbed attribute won't appear in any
1575                # sub topologies. 
1576                if not etb or etb != tb:
1577                    for i in e.interface:
1578                        for s in i.subs:
1579                            try:
1580                                s.interfaces.remove(i)
1581                            except ValueError:
1582                                raise service_error(service_error.internal,
1583                                        "Can't remove interface??")
1584                    topo[tb].elements.remove(e)
1585            topo[tb].make_indices()
1586
1587    def confirm_software(self, top):
1588        """
1589        Make sure that the software to be loaded in the topo is all available
1590        before we begin making access requests, etc.  This is a subset of
1591        wrangle_software.
1592        """
1593        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1594        pkgs.update([x.location for e in top.elements for x in e.software])
1595
1596        for pkg in pkgs:
1597            loc = pkg
1598
1599            scheme, host, path = urlparse(loc)[0:3]
1600            dest = os.path.basename(path)
1601            if not scheme:
1602                if not loc.startswith('/'):
1603                    loc = "/%s" % loc
1604                loc = "file://%s" %loc
1605            # NB: if scheme was found, loc == pkg
1606            try:
1607                u = urlopen(loc)
1608                u.close()
1609            except Exception, e:
1610                raise service_error(service_error.req, 
1611                        "Cannot open %s: %s" % (loc, e))
1612        return True
1613
1614    def wrangle_software(self, expid, top, topo, tbparams):
1615        """
1616        Copy software out to the repository directory, allocate permissions and
1617        rewrite the segment topologies to look for the software in local
1618        places.
1619        """
1620
1621        # XXX: PNNL debug
1622        self.log.debug("calling wrangle software")
1623        # Copy the rpms and tarfiles to a distribution directory from
1624        # which the federants can retrieve them
1625        linkpath = "%s/software" %  expid
1626        softdir ="%s/%s" % ( self.repodir, linkpath)
1627        softmap = { }
1628
1629        # self.fedkit and self.gateway kit are lists of tuples of
1630        # (install_location, download_location) this extracts the download
1631        # locations.
1632        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1633        pkgs.update([x.location for e in top.elements for x in e.software])
1634        try:
1635            # XXX: PNNL debug
1636            self.log.debug("wrangle software: makedirs")
1637            os.makedirs(softdir)
1638            # XXX: PNNL debug
1639            self.log.debug("wrangle software: makedirs success")
1640        except EnvironmentError, e:
1641            raise service_error(
1642                    "Cannot create software directory: %s" % e)
1643        # The actual copying.  Everything's converted into a url for copying.
1644        auth_attrs = set()
1645        for pkg in pkgs:
1646            loc = pkg
1647
1648            scheme, host, path = urlparse(loc)[0:3]
1649            dest = os.path.basename(path)
1650            if not scheme:
1651                if not loc.startswith('/'):
1652                    loc = "/%s" % loc
1653                loc = "file://%s" %loc
1654            # NB: if scheme was found, loc == pkg
1655            try:
1656                u = urlopen(loc)
1657            except Exception, e:
1658                raise service_error(service_error.req, 
1659                        "Cannot open %s: %s" % (loc, e))
1660            try:
1661                f = open("%s/%s" % (softdir, dest) , "w")
1662                self.log.debug("Writing %s/%s" % (softdir,dest) )
1663                data = u.read(4096)
1664                while data:
1665                    f.write(data)
1666                    data = u.read(4096)
1667                f.close()
1668                u.close()
1669            except Exception, e:
1670                raise service_error(service_error.internal,
1671                        "Could not copy %s: %s" % (loc, e))
1672            path = re.sub("/tmp", "", linkpath)
1673            # XXX
1674            softmap[pkg] = \
1675                    "%s/%s/%s" %\
1676                    ( self.repo_url, path, dest)
1677
1678            # Allow the individual segments to access the software by assigning
1679            # an attribute to each testbed allocation that encodes the data to
1680            # be released.  This expression collects the data for each run of
1681            # the loop.
1682            auth_attrs.update([
1683                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1684                        for tb in tbparams.keys()])
1685
1686        self.append_experiment_authorization(expid, auth_attrs)
1687
1688        # Convert the software locations in the segments into the local
1689        # copies on this host
1690        for soft in [ s for tb in topo.values() \
1691                for e in tb.elements \
1692                    if getattr(e, 'software', False) \
1693                        for s in e.software ]:
1694            if softmap.has_key(soft.location):
1695                soft.location = softmap[soft.location]
1696
1697
1698    def new_experiment(self, req, fid):
1699        """
1700        The external interface to empty initial experiment creation called from
1701        the dispatcher.
1702
1703        Creates a working directory, splits the incoming description using the
1704        splitter script and parses out the avrious subsections using the
1705        lcasses above.  Once each sub-experiment is created, use pooled threads
1706        to instantiate them and start it all up.
1707        """
1708        self.log.info("New experiment call started for %s" % fid)
1709        req = req.get('NewRequestBody', None)
1710        if not req:
1711            raise service_error(service_error.req,
1712                    "Bad request format (no NewRequestBody)")
1713
1714        # import may partially succeed so always save credentials and warn
1715        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1716            self.log.debug("Failed to import delegation credentials(!)")
1717        self.get_grouper_updates(fid)
1718        self.auth.update()
1719        self.auth.save()
1720
1721        try:
1722            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1723                    with_proof=True)
1724        except service_error, e:
1725            self.log.info("New experiment call for %s: access denied" % fid)
1726            raise e
1727
1728
1729        if not access_ok:
1730            self.log.info("New experiment call for %s: Access denied" % fid)
1731            raise service_error(service_error.access, "New access denied",
1732                    proof=[proof])
1733
1734        try:
1735            tmpdir = tempfile.mkdtemp(prefix="split-")
1736        except EnvironmentError:
1737            raise service_error(service_error.internal, "Cannot create tmp dir")
1738
1739        # Generate an ID for the experiment (slice) and a certificate that the
1740        # allocator can use to prove they own it.  We'll ship it back through
1741        # the encrypted connection.  If the requester supplied one, use it.
1742        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1743            expcert = req['experimentAccess']['X509']
1744            expid = fedid(certstr=expcert)
1745            self.state_lock.acquire()
1746            if expid in self.state:
1747                self.state_lock.release()
1748                raise service_error(service_error.req, 
1749                        'fedid %s identifies an existing experiment' % expid)
1750            self.state_lock.release()
1751        else:
1752            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1753
1754        #now we're done with the tmpdir, and it should be empty
1755        if self.cleanup:
1756            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1757            os.rmdir(tmpdir)
1758        else:
1759            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1760
1761        eid = self.create_experiment_state(fid, req, expid, expcert, 
1762                state='empty')
1763
1764        rv = {
1765                'experimentID': [
1766                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1767                ],
1768                'experimentStatus': 'empty',
1769                'experimentAccess': { 'X509' : expcert },
1770                'proof': proof.to_dict(),
1771            }
1772
1773        self.log.info("New experiment call succeeded for %s" % fid)
1774        return rv
1775
1776    # create_experiment sub-functions
1777
1778    @staticmethod
1779    def get_experiment_key(req, field='experimentID'):
1780        """
1781        Parse the experiment identifiers out of the request (the request body
1782        tag has been removed).  Specifically this pulls either the fedid or the
1783        localname out of the experimentID field.  A fedid is preferred.  If
1784        neither is present or the request does not contain the fields,
1785        service_errors are raised.
1786        """
1787        # Get the experiment access
1788        exp = req.get(field, None)
1789        if exp:
1790            if exp.has_key('fedid'):
1791                key = exp['fedid']
1792            elif exp.has_key('localname'):
1793                key = exp['localname']
1794            else:
1795                raise service_error(service_error.req, "Unknown lookup type")
1796        else:
1797            raise service_error(service_error.req, "No request?")
1798
1799        return key
1800
1801    def get_experiment_ids_and_start(self, key, tmpdir):
1802        """
1803        Get the experiment name, id and access certificate from the state, and
1804        set the experiment state to 'starting'.  returns a triple (fedid,
1805        localname, access_cert_file). The access_cert_file is a copy of the
1806        contents of the access certificate, created in the tempdir with
1807        restricted permissions.  If things are confused, raise an exception.
1808        """
1809
1810        expid = eid = None
1811        self.state_lock.acquire()
1812        if key in self.state:
1813            exp = self.state[key]
1814            exp.status = "starting"
1815            exp.updated()
1816            expid = exp.fedid
1817            eid = exp.localname
1818            expcert = exp.identity
1819        self.state_lock.release()
1820
1821        # make a protected copy of the access certificate so the experiment
1822        # controller can act as the experiment principal.
1823        if expcert:
1824            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1825            if not expcert_file:
1826                raise service_error(service_error.internal, 
1827                        "Cannot create temp cert file?")
1828        else:
1829            expcert_file = None
1830
1831        return (eid, expid, expcert_file)
1832
1833    def get_topology(self, req, tmpdir):
1834        """
1835        Get the ns2 content and put it into a file for parsing.  Call the local
1836        or remote parser and return the topdl.Topology.  Errors result in
1837        exceptions.  req is the request and tmpdir is a work directory.
1838        """
1839
1840        # The tcl parser needs to read a file so put the content into that file
1841        descr=req.get('experimentdescription', None)
1842        if descr:
1843            if 'ns2description' in descr:
1844                file_content=descr['ns2description']
1845            elif 'topdldescription' in descr:
1846                return topdl.Topology(**descr['topdldescription'])
1847            else:
1848                raise service_error(service_error.req, 
1849                        'Unknown experiment description type')
1850        else:
1851            raise service_error(service_error.req, "No experiment description")
1852
1853
1854        if self.splitter_url:
1855            self.log.debug("Calling remote topdl translator at %s" % \
1856                    self.splitter_url)
1857            top = self.remote_ns2topdl(self.splitter_url, file_content)
1858        else:
1859            tclfile = os.path.join(tmpdir, "experiment.tcl")
1860            if file_content:
1861                try:
1862                    f = open(tclfile, 'w')
1863                    f.write(file_content)
1864                    f.close()
1865                except EnvironmentError:
1866                    raise service_error(service_error.internal,
1867                            "Cannot write temp experiment description")
1868            else:
1869                raise service_error(service_error.req, 
1870                        "Only ns2descriptions supported")
1871            pid = "dummy"
1872            gid = "dummy"
1873            eid = "dummy"
1874
1875            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1876                str(self.muxmax), '-m', 'dummy']
1877
1878            tclcmd.extend([pid, gid, eid, tclfile])
1879
1880            self.log.debug("running local splitter %s", " ".join(tclcmd))
1881            # This is just fantastic.  As a side effect the parser copies
1882            # tb_compat.tcl into the current directory, so that directory
1883            # must be writable by the fedd user.  Doing this in the
1884            # temporary subdir ensures this is the case.
1885            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1886                    cwd=tmpdir)
1887            split_data = tclparser.stdout
1888
1889            top = topdl.topology_from_xml(file=split_data, top="experiment")
1890            os.remove(tclfile)
1891
1892        return top
1893
1894    def get_testbed_services(self, req, testbeds):
1895        """
1896        Parse the services section of the request into two dicts mapping
1897        testbed to lists of federated_service objects.  The first dict maps all
1898        exporters of services to those service objects, the second maps
1899        testbeds to service objects only for services requiring portals.
1900        """
1901
1902        # Sanity check the services.  Exports or imports from unknown testbeds
1903        # cause an exception.
1904        for s in req.get('service', []):
1905            for t in s.get('import', []):
1906                if t not in testbeds:
1907                    raise service_error(service_error.req, 
1908                            'Service import to unknown testbed: %s' %t)
1909            for t in s.get('export', []):
1910                if t not in testbeds:
1911                    raise service_error(service_error.req, 
1912                            'Service export from unknown testbed: %s' %t)
1913
1914
1915        # We construct both dicts here because deriving the second is more
1916        # complex than it looks - both the keys and lists can differ, and it's
1917        # much easier to generate both in one pass.
1918        masters = { }
1919        pmasters = { }
1920        for s in req.get('service', []):
1921            # If this is a service request with the importall field
1922            # set, fill it out.
1923
1924            if s.get('importall', False):
1925                s['import'] = [ tb for tb in testbeds \
1926                        if tb not in s.get('export',[])]
1927                del s['importall']
1928
1929            # Add the service to masters
1930            for tb in s.get('export', []):
1931                if s.get('name', None):
1932
1933                    params = { }
1934                    for a in s.get('fedAttr', []):
1935                        params[a.get('attribute', '')] = a.get('value','')
1936
1937                    fser = federated_service(name=s['name'],
1938                            exporter=tb, importers=s.get('import',[]),
1939                            params=params)
1940                    if fser.name == 'hide_hosts' \
1941                            and 'hosts' not in fser.params:
1942                        fser.params['hosts'] = \
1943                                ",".join(tb_hosts.get(fser.exporter, []))
1944                    if tb in masters: masters[tb].append(fser)
1945                    else: masters[tb] = [fser]
1946
1947                    if fser.portal:
1948                        if tb in pmasters: pmasters[tb].append(fser)
1949                        else: pmasters[tb] = [fser]
1950                else:
1951                    self.log.error('Testbed service does not have name " + \
1952                            "and importers')
1953        return masters, pmasters
1954
1955    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1956        """
1957        Create the ssh keys necessary for interconnecting the portal nodes and
1958        the global hosts file for letting each segment know about the IP
1959        addresses in play.  Save these into the repo.  Add attributes to the
1960        autorizer allowing access controllers to download them and return a set
1961        of attributes that inform the segments where to find this stuff.  May
1962        raise service_errors in if there are problems.
1963        """
1964        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1965        gw_secretkey_base = "fed.%s" % self.ssh_type
1966        keydir = os.path.join(tmpdir, 'keys')
1967        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1968        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1969
1970        try:
1971            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1972        except ValueError:
1973            raise service_error(service_error.server_config, 
1974                    "Bad key type (%s)" % self.ssh_type)
1975
1976        self.generate_seer_certs(keydir)
1977
1978        # Copy configuration files into the remote file store
1979        # The config urlpath
1980        configpath = "/%s/config" % expid
1981        # The config file system location
1982        configdir ="%s%s" % ( self.repodir, configpath)
1983        try:
1984            os.makedirs(configdir)
1985        except EnvironmentError, e:
1986            raise service_error(service_error.internal,
1987                    "Cannot create config directory: %s" % e)
1988        try:
1989            f = open("%s/hosts" % configdir, "w")
1990            print >> f, string.join(hosts, '\n')
1991            f.close()
1992        except EnvironmentError, e:
1993            raise service_error(service_error.internal, 
1994                    "Cannot write hosts file: %s" % e)
1995        try:
1996            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1997            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1998            copy_file(os.path.join(keydir, 'ca.pem'), 
1999                    os.path.join(configdir, 'ca.pem'))
2000            copy_file(os.path.join(keydir, 'node.pem'), 
2001                    os.path.join(configdir, 'node.pem'))
2002        except EnvironmentError, e:
2003            raise service_error(service_error.internal, 
2004                    "Cannot copy keyfiles: %s" % e)
2005
2006        # Allow the individual testbeds to access the configuration files,
2007        # again by setting an attribute for the relevant pathnames on each
2008        # allocation principal.  Yeah, that's a long list comprehension.
2009        self.append_experiment_authorization(expid, set([
2010            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
2011                    for tb in tbparams.keys() \
2012                        for f in ("hosts", 'ca.pem', 'node.pem', 
2013                            gw_secretkey_base, gw_pubkey_base)]))
2014
2015        attrs = [ 
2016                {
2017                    'attribute': 'ssh_pubkey', 
2018                    'value': '%s/%s/config/%s' % \
2019                            (self.repo_url, expid, gw_pubkey_base)
2020                },
2021                {
2022                    'attribute': 'ssh_secretkey', 
2023                    'value': '%s/%s/config/%s' % \
2024                            (self.repo_url, expid, gw_secretkey_base)
2025                },
2026                {
2027                    'attribute': 'hosts', 
2028                    'value': '%s/%s/config/hosts' % \
2029                            (self.repo_url, expid)
2030                },
2031                {
2032                    'attribute': 'seer_ca_pem', 
2033                    'value': '%s/%s/config/%s' % \
2034                            (self.repo_url, expid, 'ca.pem')
2035                },
2036                {
2037                    'attribute': 'seer_node_pem', 
2038                    'value': '%s/%s/config/%s' % \
2039                            (self.repo_url, expid, 'node.pem')
2040                },
2041            ]
2042        return attrs
2043
2044
2045    def get_vtopo(self, req, fid):
2046        """
2047        Return the stored virtual topology for this experiment
2048        """
2049        rv = None
2050        state = None
2051        self.log.info("vtopo call started for %s" %  fid)
2052
2053        req = req.get('VtopoRequestBody', None)
2054        if not req:
2055            raise service_error(service_error.req,
2056                    "Bad request format (no VtopoRequestBody)")
2057        exp = req.get('experiment', None)
2058        if exp:
2059            if exp.has_key('fedid'):
2060                key = exp['fedid']
2061                keytype = "fedid"
2062            elif exp.has_key('localname'):
2063                key = exp['localname']
2064                keytype = "localname"
2065            else:
2066                raise service_error(service_error.req, "Unknown lookup type")
2067        else:
2068            raise service_error(service_error.req, "No request?")
2069
2070        try:
2071            proof = self.check_experiment_access(fid, key)
2072        except service_error, e:
2073            self.log.info("vtopo call failed for %s: access denied" %  fid)
2074            raise e
2075
2076        self.state_lock.acquire()
2077        # XXX: this needs to be recalculated
2078        if key in self.state:
2079            if self.state[key].top is not None:
2080                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2081                rv = { 'experiment' : {keytype: key },
2082                        'vtopo': vtopo,
2083                        'proof': proof.to_dict(), 
2084                    }
2085            else:
2086                state = self.state[key].status
2087        self.state_lock.release()
2088
2089        if rv: 
2090            self.log.info("vtopo call completed for %s %s " % \
2091                (key, fid))
2092            return rv
2093        else: 
2094            if state:
2095                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2096                    (key, fid))
2097                raise service_error(service_error.partial, 
2098                        "Not ready: %s" % state)
2099            else:
2100                self.log.info("vtopo call completed for %s %s (No experiment)"\
2101                        % (key, fid))
2102                raise service_error(service_error.req, "No such experiment")
2103
2104    def get_vis(self, req, fid):
2105        """
2106        Return the stored visualization for this experiment
2107        """
2108        rv = None
2109        state = None
2110
2111        self.log.info("vis call started for %s" %  fid)
2112        req = req.get('VisRequestBody', None)
2113        if not req:
2114            raise service_error(service_error.req,
2115                    "Bad request format (no VisRequestBody)")
2116        exp = req.get('experiment', None)
2117        if exp:
2118            if exp.has_key('fedid'):
2119                key = exp['fedid']
2120                keytype = "fedid"
2121            elif exp.has_key('localname'):
2122                key = exp['localname']
2123                keytype = "localname"
2124            else:
2125                raise service_error(service_error.req, "Unknown lookup type")
2126        else:
2127            raise service_error(service_error.req, "No request?")
2128
2129        try:
2130            proof = self.check_experiment_access(fid, key)
2131        except service_error, e:
2132            self.log.info("vis call failed for %s: access denied" %  fid)
2133            raise e
2134
2135        self.state_lock.acquire()
2136        # Generate the visualization
2137        if key in self.state:
2138            if self.state[key].top is not None:
2139                try:
2140                    vis = self.genviz(
2141                            topdl.topology_to_vtopo(self.state[key].top))
2142                except service_error, e:
2143                    self.state_lock.release()
2144                    raise e
2145                rv =  { 'experiment' : {keytype: key },
2146                        'vis': vis,
2147                        'proof': proof.to_dict(), 
2148                        }
2149            else:
2150                state = self.state[key].status
2151        self.state_lock.release()
2152
2153        if rv: 
2154            self.log.info("vis call completed for %s %s " % \
2155                (key, fid))
2156            return rv
2157        else:
2158            if state:
2159                self.log.info("vis call completed for %s %s (not ready)" % \
2160                    (key, fid))
2161                raise service_error(service_error.partial, 
2162                        "Not ready: %s" % state)
2163            else:
2164                self.log.info("vis call completed for %s %s (no experiment)" % \
2165                    (key, fid))
2166                raise service_error(service_error.req, "No such experiment")
2167
2168   
2169    def save_federant_information(self, allocated, tbparams, eid, top):
2170        """
2171        Store the various data that have changed in the experiment state
2172        between when it was started and the beginning of resource allocation.
2173        This is basically the information about each local allocation.  This
2174        fills in the values of the placeholder allocation in the state.  It
2175        also collects the access proofs and returns them as dicts for a
2176        response message.
2177        """
2178        self.state_lock.acquire()
2179        exp = self.state[eid]
2180        exp.top = top.clone()
2181        # save federant information
2182        for k in allocated.keys():
2183            exp.add_allocation(tbparams[k])
2184            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2185                type="testbed", localname=[k], 
2186                service=[ s.to_topdl() for s in tbparams[k].services]))
2187
2188        # Access proofs for the response message
2189        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2190                    for p in tbparams[k].proof]
2191        exp.updated()
2192        if self.state_filename: 
2193            self.write_state()
2194        self.state_lock.release()
2195        return proofs
2196
2197    def clear_placeholder(self, eid, expid, tmpdir):
2198        """
2199        Clear the placeholder and remove any allocated temporary dir.
2200        """
2201
2202        self.state_lock.acquire()
2203        del self.state[eid]
2204        del self.state[expid]
2205        if self.state_filename: self.write_state()
2206        self.state_lock.release()
2207        if tmpdir and self.cleanup:
2208            self.remove_dirs(tmpdir)
2209
2210    # end of create_experiment sub-functions
2211
2212    def create_experiment(self, req, fid):
2213        """
2214        The external interface to experiment creation called from the
2215        dispatcher.
2216
2217        Creates a working directory, splits the incoming description using the
2218        splitter script and parses out the various subsections using the
2219        classes above.  Once each sub-experiment is created, use pooled threads
2220        to instantiate them and start it all up.
2221        """
2222
2223        self.log.info("Create experiment call started for %s" % fid)
2224        req = req.get('CreateRequestBody', None)
2225        if req:
2226            key = self.get_experiment_key(req)
2227        else:
2228            raise service_error(service_error.req,
2229                    "Bad request format (no CreateRequestBody)")
2230
2231        # Import information from the requester
2232        # import may partially succeed so always save credentials and warn
2233        if not self.auth.import_credentials(data_list=req.get('credential',[])):
2234            self.log.debug("Failed to import delegation credentials(!)")
2235        self.get_grouper_updates(fid)
2236        self.auth.update()
2237        self.auth.save()
2238
2239        try:
2240            # Make sure that the caller can talk to us
2241            proof = self.check_experiment_access(fid, key)
2242        except service_error, e:
2243            self.log.info("Create experiment call failed for %s: access denied"\
2244                    % fid)
2245            raise e
2246
2247
2248        # Install the testbed map entries supplied with the request into a copy
2249        # of the testbed map.
2250        tbmap = dict(self.tbmap)
2251        for m in req.get('testbedmap', []):
2252            if 'testbed' in m and 'uri' in m:
2253                tbmap[m['testbed']] = m['uri']
2254
2255        # a place to work
2256        try:
2257            tmpdir = tempfile.mkdtemp(prefix="split-")
2258            os.mkdir(tmpdir+"/keys")
2259        except EnvironmentError:
2260            raise service_error(service_error.internal, "Cannot create tmp dir")
2261
2262        tbparams = { }
2263
2264        eid, expid, expcert_file = \
2265                self.get_experiment_ids_and_start(key, tmpdir)
2266
2267        # This catches exceptions to clear the placeholder if necessary
2268        try: 
2269            if not (eid and expid):
2270                raise service_error(service_error.internal, 
2271                        "Cannot find local experiment info!?")
2272
2273            top = self.get_topology(req, tmpdir)
2274            self.confirm_software(top)
2275            # Assign the IPs
2276            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2277            # Find the testbeds to look up
2278            tb_hosts = { }
2279            testbeds = [ ]
2280            for e in top.elements:
2281                if isinstance(e, topdl.Computer):
2282                    tb = e.get_attribute('testbed') or 'default'
2283                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2284                    else: 
2285                        tb_hosts[tb] = [ e.name ]
2286                        testbeds.append(tb)
2287
2288            masters, pmasters = self.get_testbed_services(req, testbeds)
2289            allocated = { }         # Testbeds we can access
2290            topo ={ }               # Sub topologies
2291            connInfo = { }          # Connection information
2292
2293            self.get_access_to_testbeds(testbeds, fid, allocated, 
2294                    tbparams, masters, tbmap, expid, expcert_file)
2295
2296            # tbactive is the set of testbeds that have NATs in front of their
2297            # portals. They need to initiate connections.
2298            tbactive = set([k for k, v in tbparams.items() \
2299                    if v.get_attribute('nat_portals')])
2300
2301            self.split_topology(top, topo, testbeds)
2302
2303            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2304            # XXX: PNNL debug
2305            self.log.debug("Back from generate keys")
2306
2307            part = experiment_partition(self.auth, self.store_url, tbmap,
2308                    self.muxmax, self.direct_transit, tbactive)
2309            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2310                    connInfo, expid)
2311            # XXX: PNNL debug
2312            self.log.debug("Back from add portals")
2313
2314            auth_attrs = set()
2315            # Now get access to the dynamic testbeds (those added above)
2316            for tb in [ t for t in topo if t not in allocated]:
2317                # XXX: PNNL debug
2318                self.log.debug("dynamic testbeds %s" %tb)
2319                self.get_access(tb, tbparams, fid, masters, tbmap, 
2320                        expid, expcert_file)
2321                allocated[tb] = 1
2322                store_keys = topo[tb].get_attribute('store_keys')
2323                # Give the testbed access to keys it exports or imports
2324                if store_keys:
2325                    auth_attrs.update(set([
2326                        (tbparams[tb].allocID, sk) \
2327                                for sk in store_keys.split(" ")]))
2328
2329            # XXX: PNNL debug
2330            self.log.debug("done with dynamic testbeds %s" % auth_attrs)
2331            if auth_attrs:
2332                self.append_experiment_authorization(expid, auth_attrs)
2333
2334            # transit and disconnected testbeds may not have a connInfo entry.
2335            # Fill in the blanks.
2336            for t in allocated.keys():
2337                if not connInfo.has_key(t):
2338                    connInfo[t] = { }
2339
2340            self.wrangle_software(expid, top, topo, tbparams)
2341
2342            proofs = self.save_federant_information(allocated, tbparams, 
2343                    eid, top)
2344        except service_error, e:
2345            # If something goes wrong in the parse (usually an access error)
2346            # clear the placeholder state.  From here on out the code delays
2347            # exceptions.  Failing at this point returns a fault to the remote
2348            # caller.
2349
2350            self.log.info("Create experiment call failed for %s %s: %s" % 
2351                    (eid, fid, e))
2352            self.clear_placeholder(eid, expid, tmpdir)
2353            raise e
2354
2355        # Start the background swapper and return the starting state.  From
2356        # here on out, the state will stick around a while.
2357
2358        # Create a logger that logs to the experiment's state object as well as
2359        # to the main log file.
2360        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2361        alloc_collector = self.list_log(self.state[eid].log)
2362        h = logging.StreamHandler(alloc_collector)
2363        # XXX: there should be a global one of these rather than repeating the
2364        # code.
2365        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2366                    '%d %b %y %H:%M:%S'))
2367        alloc_log.addHandler(h)
2368
2369        # Start a thread to do the resource allocation
2370        t  = Thread(target=self.allocate_resources,
2371                args=(allocated, masters, eid, expid, tbparams, 
2372                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2373                    connInfo, tbmap, expcert_file),
2374                name=eid)
2375        t.start()
2376
2377        rv = {
2378                'experimentID': [
2379                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2380                ],
2381                'experimentStatus': 'starting',
2382                'proof': [ proof.to_dict() ] + proofs,
2383            }
2384        self.log.info("Create experiment call succeeded for %s %s" % \
2385                (eid, fid))
2386
2387        return rv
2388   
2389    def get_experiment_fedid(self, key):
2390        """
2391        find the fedid associated with the localname key in the state database.
2392        """
2393
2394        rv = None
2395        self.state_lock.acquire()
2396        if key in self.state:
2397            rv = self.state[key].fedid
2398        self.state_lock.release()
2399        return rv
2400
2401    def check_experiment_access(self, fid, key):
2402        """
2403        Confirm that the fid has access to the experiment.  Though a request
2404        may be made in terms of a local name, the access attribute is always
2405        the experiment's fedid.
2406        """
2407        if not isinstance(key, fedid):
2408            key = self.get_experiment_fedid(key)
2409
2410        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2411
2412        if access_ok:
2413            return proof
2414        else:
2415            raise service_error(service_error.access, "Access Denied",
2416                proof)
2417
2418
2419    def get_handler(self, path, fid):
2420        """
2421        Perhaps surprisingly named, this function handles HTTP GET requests to
2422        this server (SOAP requests are POSTs).
2423        """
2424        self.log.info("Get handler %s %s" % (path, fid))
2425        if len("%s" % fid) == 0:
2426            return (None, None)
2427        # XXX: log proofs?
2428        if self.auth.check_attribute(fid, path):
2429            return ("%s/%s" % (self.repodir, path), "application/binary")
2430        else:
2431            return (None, None)
2432
2433    def update_info(self, key, force=False):
2434        top = None
2435        self.state_lock.acquire()
2436        if key in self.state:
2437            if force or self.state[key].older_than(self.info_cache_limit):
2438                top = self.state[key].top
2439                if top is not None: top = top.clone()
2440                d1, info_params, cert, d2 = \
2441                        self.get_segment_info(self.state[key], need_lock=False)
2442        self.state_lock.release()
2443
2444        if top is None: return
2445
2446        try:
2447            tmpdir = tempfile.mkdtemp(prefix="info-")
2448        except EnvironmentError:
2449            raise service_error(service_error.internal, 
2450                    "Cannot create tmp dir")
2451        cert_file = self.make_temp_certfile(cert, tmpdir)
2452
2453        data = []
2454        try:
2455            for k, (uri, aid) in info_params.items():
2456                info=self.info_segment(log=self.log, testbed=uri,
2457                            cert_file=cert_file, cert_pwd=None,
2458                            trusted_certs=self.trusted_certs,
2459                            caller=self.call_InfoSegment)
2460                info(uri, aid)
2461                data.append(info)
2462        # Clean up the tmpdir no matter what
2463        finally:
2464            if tmpdir: self.remove_dirs(tmpdir)
2465
2466        self.annotate_topology(top, data)
2467        self.state_lock.acquire()
2468        if key in self.state:
2469            self.state[key].top = top
2470            self.state[key].updated()
2471            if self.state_filename: self.write_state()
2472        self.state_lock.release()
2473
2474   
2475    def get_info(self, req, fid):
2476        """
2477        Return all the stored info about this experiment
2478        """
2479        rv = None
2480
2481        self.log.info("Info call started for %s" %  fid)
2482        req = req.get('InfoRequestBody', None)
2483        if not req:
2484            raise service_error(service_error.req,
2485                    "Bad request format (no InfoRequestBody)")
2486        exp = req.get('experiment', None)
2487        legacy = req.get('legacy', False)
2488        fresh = req.get('fresh', False)
2489        if exp:
2490            if exp.has_key('fedid'):
2491                key = exp['fedid']
2492                keytype = "fedid"
2493            elif exp.has_key('localname'):
2494                key = exp['localname']
2495                keytype = "localname"
2496            else:
2497                raise service_error(service_error.req, "Unknown lookup type")
2498        else:
2499            raise service_error(service_error.req, "No request?")
2500
2501        try:
2502            proof = self.check_experiment_access(fid, key)
2503        except service_error, e:
2504            self.log.info("Info call failed for %s: access denied" %  fid)
2505
2506
2507        self.update_info(key, fresh)
2508
2509        self.state_lock.acquire()
2510        if self.state.has_key(key):
2511            rv = self.state[key].get_info()
2512            # Copy the topo if we need legacy annotations
2513            if legacy:
2514                top = self.state[key].top
2515                if top is not None: top = top.clone()
2516        self.state_lock.release()
2517        self.log.info("Gathered Info for %s %s" % (key, fid))
2518
2519        # If the legacy visualization and topology representations are
2520        # requested, calculate them and add them to the return.
2521        if legacy and rv is not None:
2522            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2523            if top is not None:
2524                vtopo = topdl.topology_to_vtopo(top)
2525                if vtopo is not None:
2526                    rv['vtopo'] = vtopo
2527                    try:
2528                        vis = self.genviz(vtopo)
2529                    except service_error, e:
2530                        self.log.debug('Problem generating visualization: %s' \
2531                                % e)
2532                        vis = None
2533                    if vis is not None:
2534                        rv['vis'] = vis
2535        if rv:
2536            self.log.info("Info succeded for %s %s" % (key, fid))
2537            rv['proof'] = proof.to_dict()
2538            return rv
2539        else: 
2540            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2541            raise service_error(service_error.req, "No such experiment")
2542
2543    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2544            results):
2545        """
2546        Call OperateSegment on multiple testbeds and gather the results.
2547        op_params contains the parameters needed to contact that testbed, cert
2548        is a certificate containing the fedid to use, op is the operation,
2549        testbeds is a dict mapping testbed name to targets in that testbed,
2550        params are the parameters to include a,d results is a growing list of
2551        the results of the calls.
2552        """
2553        try:
2554            tmpdir = tempfile.mkdtemp(prefix="info-")
2555        except EnvironmentError:
2556            raise service_error(service_error.internal, 
2557                    "Cannot create tmp dir")
2558        cert_file = self.make_temp_certfile(cert, tmpdir)
2559
2560        try:
2561            for tb, targets in testbeds.items():
2562                if tb in op_params:
2563                    uri, aid = op_params[tb]
2564                    operate=self.operation_segment(log=self.log, testbed=uri,
2565                                cert_file=cert_file, cert_pwd=None,
2566                                trusted_certs=self.trusted_certs,
2567                                caller=self.call_OperationSegment)
2568                    if operate(uri, aid, op, targets, params):
2569                        if operate.status is not None:
2570                            results.extend(operate.status)
2571                            continue
2572                # Something went wrong in a weird way.  Add statuses
2573                # that reflect that to results
2574                for t in targets:
2575                    results.append(operation_status(t, 
2576                        operation_status.federant,
2577                        'Unexpected error on %s' % tb))
2578        # Clean up the tmpdir no matter what
2579        finally:
2580            if tmpdir: self.remove_dirs(tmpdir)
2581
2582    def do_operation(self, req, fid):
2583        """
2584        Find the testbeds holding each target and ask them to carry out the
2585        operation.  Return the statuses.
2586        """
2587        # Map an element to the testbed containing it
2588        def element_to_tb(e):
2589            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2590            elif isinstance(e, topdl.Testbed): return e.name
2591            else: return None
2592        # If d is an operation_status object, make it a dict
2593        def make_dict(d):
2594            if isinstance(d, dict): return d
2595            elif isinstance(d, operation_status): return d.to_dict()
2596            else: return { }
2597
2598        def element_name(e):
2599            if isinstance(e, topdl.Computer): return e.name
2600            elif isinstance(e, topdl.Testbed): 
2601                if e.localname: return e.localname[0]
2602                else: return None
2603            else: return None
2604
2605        self.log.info("Operation call started for %s" %  fid)
2606        req = req.get('OperationRequestBody', None)
2607        if not req:
2608            raise service_error(service_error.req,
2609                    "Bad request format (no OperationRequestBody)")
2610        exp = req.get('experiment', None)
2611        op = req.get('operation', None)
2612        targets = set(req.get('target', []))
2613        params = req.get('parameter', None)
2614
2615        if exp:
2616            if 'fedid' in exp:
2617                key = exp['fedid']
2618                keytype = "fedid"
2619            elif 'localname' in exp:
2620                key = exp['localname']
2621                keytype = "localname"
2622            else:
2623                raise service_error(service_error.req, "Unknown lookup type")
2624        else:
2625            raise service_error(service_error.req, "No request?")
2626
2627        if op is None or not targets:
2628            raise service_error(service_error.req, "No request?")
2629
2630        try:
2631            proof = self.check_experiment_access(fid, key)
2632        except service_error, e:
2633            self.log.info("Operation call failed for %s: access denied" %  fid)
2634            raise e
2635
2636        self.state_lock.acquire()
2637        if key in self.state:
2638            d1, op_params, cert, d2 = \
2639                    self.get_segment_info(self.state[key], need_lock=False,
2640                            key='tb')
2641            top = self.state[key].top
2642            if top is not None:
2643                top = top.clone()
2644        self.state_lock.release()
2645
2646        if top is None:
2647            self.log.info("Operation call failed for %s: not active" %  fid)
2648            raise service_error(service_error.partial, "No topology yet", 
2649                    proof=proof)
2650
2651        testbeds = { }
2652        results = []
2653        for e in top.elements:
2654            ename = element_name(e)
2655            if ename in targets:
2656                tb = element_to_tb(e)
2657                targets.remove(ename)
2658                if tb is not None:
2659                    if tb in testbeds: testbeds[tb].append(ename)
2660                    else: testbeds[tb] = [ ename ]
2661                else:
2662                    results.append(operation_status(e.name, 
2663                        code=operation_status.no_target, 
2664                        description='Cannot map target to testbed'))
2665
2666        for t in targets:
2667            results.append(operation_status(t, operation_status.no_target))
2668
2669        self.operate_on_segments(op_params, cert, op, testbeds, params,
2670                results)
2671
2672        self.log.info("Operation call succeeded for %s" %  fid)
2673        return { 
2674                'experiment': exp, 
2675                'status': [make_dict(r) for r in results],
2676                'proof': proof.to_dict()
2677                }
2678
2679
2680    def get_multi_info(self, req, fid):
2681        """
2682        Return all the stored info that this fedid can access
2683        """
2684        rv = { 'info': [ ], 'proof': [ ] }
2685
2686        self.log.info("Multi Info call started for %s" %  fid)
2687        self.get_grouper_updates(fid)
2688        self.auth.update()
2689        self.auth.save()
2690        self.state_lock.acquire()
2691        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2692            try:
2693                proof = self.check_experiment_access(fid, key)
2694            except service_error, e:
2695                if e.code == service_error.access:
2696                    continue
2697                else:
2698                    self.log.info("Multi Info call failed for %s: %s" %  \
2699                            (e,fid))
2700                    self.state_lock.release()
2701                    raise e
2702
2703            if self.state.has_key(key):
2704                e = self.state[key].get_info()
2705                e['proof'] = proof.to_dict()
2706                rv['info'].append(e)
2707                rv['proof'].append(proof.to_dict())
2708        self.state_lock.release()
2709        self.log.info("Multi Info call succeeded for %s" %  fid)
2710        return rv
2711
2712    def check_termination_status(self, fed_exp, force):
2713        """
2714        Confirm that the experiment is sin a valid state to stop (or force it)
2715        return the state - invalid states for deletion and force settings cause
2716        exceptions.
2717        """
2718        self.state_lock.acquire()
2719        status = fed_exp.status
2720
2721        if status:
2722            if status in ('starting', 'terminating'):
2723                if not force:
2724                    self.state_lock.release()
2725                    raise service_error(service_error.partial, 
2726                            'Experiment still being created or destroyed')
2727                else:
2728                    self.log.warning('Experiment in %s state ' % status + \
2729                            'being terminated by force.')
2730            self.state_lock.release()
2731            return status
2732        else:
2733            # No status??? trouble
2734            self.state_lock.release()
2735            raise service_error(service_error.internal,
2736                    "Experiment has no status!?")
2737
2738    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2739        ids = []
2740        term_params = { }
2741        if need_lock: self.state_lock.acquire()
2742        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2743        expcert = fed_exp.identity
2744        repo = "%s" % fed_exp.fedid
2745
2746        # Collect the allocation/segment ids into a dict keyed by the fedid
2747        # of the allocation that contains a tuple of uri, aid
2748        for i, fed in enumerate(fed_exp.get_all_allocations()):
2749            uri = fed.uri
2750            aid = fed.allocID
2751            if key == 'aid': term_params[aid] = (uri, aid)
2752            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2753
2754        if need_lock: self.state_lock.release()
2755        return ids, term_params, expcert, repo
2756
2757
2758    def get_termination_info(self, fed_exp):
2759        self.state_lock.acquire()
2760        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2761        # Change the experiment state
2762        fed_exp.status = 'terminating'
2763        fed_exp.updated()
2764        if self.state_filename: self.write_state()
2765        self.state_lock.release()
2766
2767        return ids, term_params, expcert, repo
2768
2769
2770    def deallocate_resources(self, term_params, expcert, status, force, 
2771            dealloc_log):
2772        tmpdir = None
2773        # This try block makes sure the tempdir is cleared
2774        try:
2775            # If no expcert, try the deallocation as the experiment
2776            # controller instance.
2777            if expcert and self.auth_type != 'legacy': 
2778                try:
2779                    tmpdir = tempfile.mkdtemp(prefix="term-")
2780                except EnvironmentError:
2781                    raise service_error(service_error.internal, 
2782                            "Cannot create tmp dir")
2783                cert_file = self.make_temp_certfile(expcert, tmpdir)
2784                pw = None
2785            else: 
2786                cert_file = self.cert_file
2787                pw = self.cert_pwd
2788
2789            # Stop everyone.  NB, wait_for_all waits until a thread starts
2790            # and then completes, so we can't wait if nothing starts.  So,
2791            # no tbparams, no start.
2792            if len(term_params) > 0:
2793                tp = thread_pool(self.nthreads)
2794                for k, (uri, aid) in term_params.items():
2795                    # Create and start a thread to stop the segment
2796                    tp.wait_for_slot()
2797                    t  = pooled_thread(\
2798                            target=self.terminate_segment(log=dealloc_log,
2799                                testbed=uri,
2800                                cert_file=cert_file, 
2801                                cert_pwd=pw,
2802                                trusted_certs=self.trusted_certs,
2803                                caller=self.call_TerminateSegment),
2804                            args=(uri, aid), name=k,
2805                            pdata=tp, trace_file=self.trace_file)
2806                    t.start()
2807                # Wait for completions
2808                tp.wait_for_all_done()
2809
2810            # release the allocations (failed experiments have done this
2811            # already, and starting experiments may be in odd states, so we
2812            # ignore errors releasing those allocations
2813            try: 
2814                for k, (uri, aid)  in term_params.items():
2815                    self.release_access(None, aid, uri=uri,
2816                            cert_file=cert_file, cert_pwd=pw)
2817            except service_error, e:
2818                if status != 'failed' and not force:
2819                    raise e
2820
2821        # Clean up the tmpdir no matter what
2822        finally:
2823            if tmpdir: self.remove_dirs(tmpdir)
2824
2825    def terminate_experiment(self, req, fid):
2826        """
2827        Swap this experiment out on the federants and delete the shared
2828        information
2829        """
2830        self.log.info("Terminate experiment call started for %s" % fid)
2831        tbparams = { }
2832        req = req.get('TerminateRequestBody', None)
2833        if not req:
2834            raise service_error(service_error.req,
2835                    "Bad request format (no TerminateRequestBody)")
2836
2837        key = self.get_experiment_key(req, 'experiment')
2838        try:
2839            proof = self.check_experiment_access(fid, key)
2840        except service_error, e:
2841            self.log.info(
2842                    "Terminate experiment call failed for %s: access denied" \
2843                            % fid)
2844            raise e
2845        exp = req.get('experiment', False)
2846        force = req.get('force', False)
2847
2848        dealloc_list = [ ]
2849
2850
2851        # Create a logger that logs to the dealloc_list as well as to the main
2852        # log file.
2853        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2854        dealloc_log.info("Terminating %s " %key)
2855        h = logging.StreamHandler(self.list_log(dealloc_list))
2856        # XXX: there should be a global one of these rather than repeating the
2857        # code.
2858        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2859                    '%d %b %y %H:%M:%S'))
2860        dealloc_log.addHandler(h)
2861
2862        self.state_lock.acquire()
2863        fed_exp = self.state.get(key, None)
2864        self.state_lock.release()
2865        repo = None
2866
2867        if fed_exp:
2868            status = self.check_termination_status(fed_exp, force)
2869            # get_termination_info updates the experiment state
2870            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2871            self.deallocate_resources(term_params, expcert, status, force, 
2872                    dealloc_log)
2873
2874            # Remove the terminated experiment
2875            self.state_lock.acquire()
2876            for id in ids:
2877                self.clear_experiment_authorization(id, need_state_lock=False)
2878                if id in self.state: del self.state[id]
2879
2880            if self.state_filename: self.write_state()
2881            self.state_lock.release()
2882
2883            # Delete any synch points associated with this experiment.  All
2884            # synch points begin with the fedid of the experiment.
2885            fedid_keys = set(["fedid:%s" % f for f in ids \
2886                    if isinstance(f, fedid)])
2887            for k in self.synch_store.all_keys():
2888                try:
2889                    if len(k) > 45 and k[0:46] in fedid_keys:
2890                        self.synch_store.del_value(k)
2891                except synch_store.BadDeletionError:
2892                    pass
2893            self.write_store()
2894
2895            # Remove software and other cached stuff from the filesystem.
2896            if repo:
2897                self.remove_dirs("%s/%s" % (self.repodir, repo))
2898       
2899            self.log.info("Terminate experiment succeeded for %s %s" % \
2900                    (key, fid))
2901            return { 
2902                    'experiment': exp , 
2903                    'deallocationLog': string.join(dealloc_list, ''),
2904                    'proof': [proof.to_dict()],
2905                    }
2906        else:
2907            self.log.info("Terminate experiment failed for %s %s: no state" % \
2908                    (key, fid))
2909            raise service_error(service_error.req, "No saved state")
2910
2911
2912    def GetValue(self, req, fid):
2913        """
2914        Get a value from the synchronized store
2915        """
2916        req = req.get('GetValueRequestBody', None)
2917        if not req:
2918            raise service_error(service_error.req,
2919                    "Bad request format (no GetValueRequestBody)")
2920       
2921        name = req.get('name', None)
2922        wait = req.get('wait', False)
2923        rv = { 'name': name }
2924
2925        if not name:
2926            raise service_error(service_error.req, "No name?")
2927
2928        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2929
2930        if access_ok:
2931            self.log.debug("[GetValue] asking for %s " % name)
2932            try:
2933                v = self.synch_store.get_value(name, wait)
2934            except synch_store.RevokedKeyError:
2935                # No more synch on this key
2936                raise service_error(service_error.federant, 
2937                        "Synch key %s revoked" % name)
2938            if v is not None:
2939                rv['value'] = v
2940            rv['proof'] = proof.to_dict()
2941            self.log.debug("[GetValue] got %s from %s" % (v, name))
2942            return rv
2943        else:
2944            raise service_error(service_error.access, "Access Denied",
2945                    proof=proof)
2946       
2947
2948    def SetValue(self, req, fid):
2949        """
2950        Set a value in the synchronized store
2951        """
2952        req = req.get('SetValueRequestBody', None)
2953        if not req:
2954            raise service_error(service_error.req,
2955                    "Bad request format (no SetValueRequestBody)")
2956       
2957        name = req.get('name', None)
2958        v = req.get('value', '')
2959
2960        if not name:
2961            raise service_error(service_error.req, "No name?")
2962
2963        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2964
2965        if access_ok:
2966            try:
2967                self.synch_store.set_value(name, v)
2968                self.write_store()
2969                self.log.debug("[SetValue] set %s to %s" % (name, v))
2970            except synch_store.CollisionError:
2971                # Translate into a service_error
2972                raise service_error(service_error.req,
2973                        "Value already set: %s" %name)
2974            except synch_store.RevokedKeyError:
2975                # No more synch on this key
2976                raise service_error(service_error.federant, 
2977                        "Synch key %s revoked" % name)
2978                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2979        else:
2980            raise service_error(service_error.access, "Access Denied",
2981                    proof=proof)
Note: See TracBrowser for help on using the repository browser.