source: fedd/federation/experiment_control.py @ 7d3000e

axis_examplecompt_changesinfo-ops
Last change on this file since 7d3000e was 7de1537, checked in by Ted Faber <faber@…>, 14 years ago

Remove the ssh_key parameters to the controller. They do nothing.

  • Property mode set to 100644
File size: 83.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        dt = config.get("experiment_control", "direct_transit")
154        self.auth_type = config.get('experiment_control', 'auth_type') \
155                or 'legacy'
156        self.auth_dir = config.get('experiment_control', 'auth_dir')
157        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
158        else: self.direct_transit = [ ]
159        # NB for internal master/slave ops, not experiment setup
160        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
161
162        self.overrides = set([])
163        ovr = config.get('experiment_control', 'overrides')
164        if ovr:
165            for o in ovr.split(","):
166                o = o.strip()
167                if o.startswith('fedid:'): o = o[len('fedid:'):]
168                self.overrides.add(fedid(hexstr=o))
169
170        self.state = { }
171        self.state_lock = Lock()
172        self.tclsh = "/usr/local/bin/otclsh"
173        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
174                config.get("experiment_control", "tcl_splitter",
175                        "/usr/testbed/lib/ns2ir/parse.tcl")
176        mapdb_file = config.get("experiment_control", "mapdb")
177        self.trace_file = sys.stderr
178
179        self.def_expstart = \
180                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
181                "/tmp/federate";
182        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
183                "FEDDIR/hosts";
184        self.def_gwstart = \
185                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
186                "/tmp/bridge.log";
187        self.def_mgwstart = \
188                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
189                "/tmp/bridge.log";
190        self.def_gwimage = "FBSD61-TUNNEL2";
191        self.def_gwtype = "pc";
192        self.local_access = { }
193
194        if self.auth_type == 'legacy':
195            if auth:
196                self.auth = auth
197            else:
198                self.log.error( "[access]: No authorizer initialized, " +\
199                        "creating local one.")
200                auth = authorizer()
201            self.get_access = self.legacy_get_access
202        elif self.auth_type == 'abac':
203            self.auth = abac_authorizer(load=self.auth_dir)
204        else:
205            raise service_error(service_error.internal, 
206                    "Unknown auth_type: %s" % self.auth_type)
207
208        if mapdb_file:
209            self.read_mapdb(mapdb_file)
210        else:
211            self.log.warn("[experiment_control] No testbed map, using defaults")
212            self.tbmap = { 
213                    'deter':'https://users.isi.deterlab.net:23235',
214                    'emulab':'https://users.isi.deterlab.net:23236',
215                    'ucb':'https://users.isi.deterlab.net:23237',
216                    }
217
218        if accessdb_file:
219                self.read_accessdb(accessdb_file)
220        else:
221            raise service_error(service_error.internal,
222                    "No accessdb specified in config")
223
224        # Grab saved state.  OK to do this w/o locking because it's read only
225        # and only one thread should be in existence that can see self.state at
226        # this point.
227        if self.state_filename:
228            self.read_state()
229
230        if self.store_filename:
231            self.read_store()
232        else:
233            self.log.warning("No saved synch store")
234            self.synch_store = synch_store
235
236        # Dispatch tables
237        self.soap_services = {\
238                'New': soap_handler('New', self.new_experiment),
239                'Create': soap_handler('Create', self.create_experiment),
240                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
241                'Vis': soap_handler('Vis', self.get_vis),
242                'Info': soap_handler('Info', self.get_info),
243                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
244                'Terminate': soap_handler('Terminate', 
245                    self.terminate_experiment),
246                'GetValue': soap_handler('GetValue', self.GetValue),
247                'SetValue': soap_handler('SetValue', self.SetValue),
248        }
249
250        self.xmlrpc_services = {\
251                'New': xmlrpc_handler('New', self.new_experiment),
252                'Create': xmlrpc_handler('Create', self.create_experiment),
253                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
254                'Vis': xmlrpc_handler('Vis', self.get_vis),
255                'Info': xmlrpc_handler('Info', self.get_info),
256                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
257                'Terminate': xmlrpc_handler('Terminate',
258                    self.terminate_experiment),
259                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
260                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
261        }
262
263    # Call while holding self.state_lock
264    def write_state(self):
265        """
266        Write a new copy of experiment state after copying the existing state
267        to a backup.
268
269        State format is a simple pickling of the state dictionary.
270        """
271        if os.access(self.state_filename, os.W_OK):
272            copy_file(self.state_filename, \
273                    "%s.bak" % self.state_filename)
274        try:
275            f = open(self.state_filename, 'w')
276            pickle.dump(self.state, f)
277        except EnvironmentError, e:
278            self.log.error("Can't write file %s: %s" % \
279                    (self.state_filename, e))
280        except pickle.PicklingError, e:
281            self.log.error("Pickling problem: %s" % e)
282        except TypeError, e:
283            self.log.error("Pickling problem (TypeError): %s" % e)
284
285    @staticmethod
286    def get_alloc_ids(state):
287        """
288        Pull the fedids of the identifiers of each allocation from the
289        state.  Again, a dict dive that's best isolated.
290
291        Used by read_store and read state
292        """
293
294        return [ f['allocID']['fedid'] 
295                for f in state.get('federant',[]) \
296                    if f.has_key('allocID') and \
297                        f['allocID'].has_key('fedid')]
298
299    # Call while holding self.state_lock
300    def read_state(self):
301        """
302        Read a new copy of experiment state.  Old state is overwritten.
303
304        State format is a simple pickling of the state dictionary.
305        """
306       
307        def get_experiment_id(state):
308            """
309            Pull the fedid experimentID out of the saved state.  This is kind
310            of a gross walk through the dict.
311            """
312
313            if state.has_key('experimentID'):
314                for e in state['experimentID']:
315                    if e.has_key('fedid'):
316                        return e['fedid']
317                else:
318                    return None
319            else:
320                return None
321
322        try:
323            f = open(self.state_filename, "r")
324            self.state = pickle.load(f)
325            self.log.debug("[read_state]: Read state from %s" % \
326                    self.state_filename)
327        except EnvironmentError, e:
328            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
329                    % (self.state_filename, e))
330        except pickle.UnpicklingError, e:
331            self.log.warning(("[read_state]: No saved state: " + \
332                    "Unpickling failed: %s") % e)
333       
334        for s in self.state.values():
335            try:
336
337                eid = get_experiment_id(s)
338                if eid : 
339                    if self.auth_type == 'legacy':
340                        # XXX: legacy
341                        # Give the owner rights to the experiment
342                        self.auth.set_attribute(s['owner'], eid)
343                        # And holders of the eid as well
344                        self.auth.set_attribute(eid, eid)
345                        # allow overrides to control experiments as well
346                        for o in self.overrides:
347                            self.auth.set_attribute(o, eid)
348                        # Set permissions to allow reading of the software
349                        # repo, if any, as well.
350                        for a in self.get_alloc_ids(s):
351                            self.auth.set_attribute(a, 'repo/%s' % eid)
352                else:
353                    raise KeyError("No experiment id")
354            except KeyError, e:
355                self.log.warning("[read_state]: State ownership or identity " +\
356                        "misformatted in %s: %s" % (self.state_filename, e))
357
358
359    def read_accessdb(self, accessdb_file):
360        """
361        Read the mapping from fedids that can create experiments to their name
362        in the 3-level access namespace.  All will be asserted from this
363        testbed and can include the local username and porject that will be
364        asserted on their behalf by this fedd.  Each fedid is also added to the
365        authorization system with the "create" attribute.
366        """
367        self.accessdb = {}
368        # These are the regexps for parsing the db
369        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
370        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
371                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
372        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
373                "\s*->\s*(" + name_expr + ")\s*$")
374        lineno = 0
375
376        # Parse the mappings and store in self.authdb, a dict of
377        # fedid -> (proj, user)
378        try:
379            f = open(accessdb_file, "r")
380            for line in f:
381                lineno += 1
382                line = line.strip()
383                if len(line) == 0 or line.startswith('#'):
384                    continue
385                m = project_line.match(line)
386                if m:
387                    fid = fedid(hexstr=m.group(1))
388                    project, user = m.group(2,3)
389                    if not self.accessdb.has_key(fid):
390                        self.accessdb[fid] = []
391                    self.accessdb[fid].append((project, user))
392                    continue
393
394                m = user_line.match(line)
395                if m:
396                    fid = fedid(hexstr=m.group(1))
397                    project = None
398                    user = m.group(2)
399                    if not self.accessdb.has_key(fid):
400                        self.accessdb[fid] = []
401                    self.accessdb[fid].append((project, user))
402                    continue
403                self.log.warn("[experiment_control] Error parsing access " +\
404                        "db %s at line %d" %  (accessdb_file, lineno))
405        except EnvironmentError:
406            raise service_error(service_error.internal,
407                    ("Error opening/reading %s as experiment " +\
408                            "control accessdb") %  accessdb_file)
409        f.close()
410
411        # Initialize the authorization attributes
412        # XXX: legacy
413        if self.auth_type == 'legacy':
414            for fid in self.accessdb.keys():
415                self.auth.set_attribute(fid, 'create')
416                self.auth.set_attribute(fid, 'new')
417
418    def read_mapdb(self, file):
419        """
420        Read a simple colon separated list of mappings for the
421        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
422        """
423
424        self.tbmap = { }
425        lineno =0
426        try:
427            f = open(file, "r")
428            for line in f:
429                lineno += 1
430                line = line.strip()
431                if line.startswith('#') or len(line) == 0:
432                    continue
433                try:
434                    label, url = line.split(':', 1)
435                    self.tbmap[label] = url
436                except ValueError, e:
437                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
438                            "map db: %s %s" % (lineno, line, e))
439        except EnvironmentError, e:
440            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
441                    "open %s: %s" % (file, e))
442        f.close()
443
444    def read_store(self):
445        try:
446            self.synch_store = synch_store()
447            self.synch_store.load(self.store_filename)
448            self.log.debug("[read_store]: Read store from %s" % \
449                    self.store_filename)
450        except EnvironmentError, e:
451            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
452                    % (self.state_filename, e))
453            self.synch_store = synch_store()
454
455        # Set the initial permissions on data in the store.  XXX: This ad hoc
456        # authorization attribute initialization is getting out of hand.
457        # XXX: legacy
458        if self.auth_type == 'legacy':
459            for k in self.synch_store.all_keys():
460                try:
461                    if k.startswith('fedid:'):
462                        fid = fedid(hexstr=k[6:46])
463                        if self.state.has_key(fid):
464                            for a in self.get_alloc_ids(self.state[fid]):
465                                self.auth.set_attribute(a, k)
466                except ValueError, e:
467                    self.log.warn("Cannot deduce permissions for %s" % k)
468
469
470    def write_store(self):
471        """
472        Write a new copy of synch_store after writing current state
473        to a backup.  We use the internal synch_store pickle method to avoid
474        incinsistent data.
475
476        State format is a simple pickling of the store.
477        """
478        if os.access(self.store_filename, os.W_OK):
479            copy_file(self.store_filename, \
480                    "%s.bak" % self.store_filename)
481        try:
482            self.synch_store.save(self.store_filename)
483        except EnvironmentError, e:
484            self.log.error("Can't write file %s: %s" % \
485                    (self.store_filename, e))
486        except TypeError, e:
487            self.log.error("Pickling problem (TypeError): %s" % e)
488
489
490    def remove_dirs(self, dir):
491        """
492        Remove the directory tree and all files rooted at dir.  Log any errors,
493        but continue.
494        """
495        self.log.debug("[removedirs]: removing %s" % dir)
496        try:
497            for path, dirs, files in os.walk(dir, topdown=False):
498                for f in files:
499                    os.remove(os.path.join(path, f))
500                for d in dirs:
501                    os.rmdir(os.path.join(path, d))
502            os.rmdir(dir)
503        except EnvironmentError, e:
504            self.log.error("Error deleting directory tree in %s" % e);
505
506    @staticmethod
507    def make_temp_certfile(expcert, tmpdir):
508        """
509        make a protected copy of the access certificate so the experiment
510        controller can act as the experiment principal.  mkstemp is the most
511        secure way to do that. The directory should be created by
512        mkdtemp.  Return the filename.
513        """
514        if expcert and tmpdir:
515            try:
516                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
517                f = os.fdopen(certf, 'w')
518                print >> f, expcert
519                f.close()
520            except EnvironmentError, e:
521                raise service_error(service_error.internal, 
522                        "Cannot create temp cert file?")
523            return certfn
524        else:
525            return None
526
527       
528    def generate_ssh_keys(self, dest, type="rsa" ):
529        """
530        Generate a set of keys for the gateways to use to talk.
531
532        Keys are of type type and are stored in the required dest file.
533        """
534        valid_types = ("rsa", "dsa")
535        t = type.lower();
536        if t not in valid_types: raise ValueError
537        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
538
539        try:
540            trace = open("/dev/null", "w")
541        except EnvironmentError:
542            raise service_error(service_error.internal,
543                    "Cannot open /dev/null??");
544
545        # May raise CalledProcessError
546        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
547        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
548        if rv != 0:
549            raise service_error(service_error.internal, 
550                    "Cannot generate nonce ssh keys.  %s return code %d" \
551                            % (self.ssh_keygen, rv))
552
553    def gentopo(self, str):
554        """
555        Generate the topology data structure from the splitter's XML
556        representation of it.
557
558        The topology XML looks like:
559            <experiment>
560                <nodes>
561                    <node><vname></vname><ips>ip1:ip2</ips></node>
562                </nodes>
563                <lans>
564                    <lan>
565                        <vname></vname><vnode></vnode><ip></ip>
566                        <bandwidth></bandwidth><member>node:port</member>
567                    </lan>
568                </lans>
569        """
570        class topo_parse:
571            """
572            Parse the topology XML and create the dats structure.
573            """
574            def __init__(self):
575                # Typing of the subelements for data conversion
576                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
577                self.int_subelements = ( 'bandwidth',)
578                self.float_subelements = ( 'delay',)
579                # The final data structure
580                self.nodes = [ ]
581                self.lans =  [ ]
582                self.topo = { \
583                        'node': self.nodes,\
584                        'lan' : self.lans,\
585                    }
586                self.element = { }  # Current element being created
587                self.chars = ""     # Last text seen
588
589            def end_element(self, name):
590                # After each sub element the contents is added to the current
591                # element or to the appropriate list.
592                if name == 'node':
593                    self.nodes.append(self.element)
594                    self.element = { }
595                elif name == 'lan':
596                    self.lans.append(self.element)
597                    self.element = { }
598                elif name in self.str_subelements:
599                    self.element[name] = self.chars
600                    self.chars = ""
601                elif name in self.int_subelements:
602                    self.element[name] = int(self.chars)
603                    self.chars = ""
604                elif name in self.float_subelements:
605                    self.element[name] = float(self.chars)
606                    self.chars = ""
607
608            def found_chars(self, data):
609                self.chars += data.rstrip()
610
611
612        tp = topo_parse();
613        parser = xml.parsers.expat.ParserCreate()
614        parser.EndElementHandler = tp.end_element
615        parser.CharacterDataHandler = tp.found_chars
616
617        parser.Parse(str)
618
619        return tp.topo
620       
621
622    def genviz(self, topo):
623        """
624        Generate the visualization the virtual topology
625        """
626
627        neato = "/usr/local/bin/neato"
628        # These are used to parse neato output and to create the visualization
629        # file.
630        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
631        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
632                "%s</type></node>"
633
634        try:
635            # Node names
636            nodes = [ n['vname'] for n in topo['node'] ]
637            topo_lans = topo['lan']
638        except KeyError, e:
639            raise service_error(service_error.internal, "Bad topology: %s" %e)
640
641        lans = { }
642        links = { }
643
644        # Walk through the virtual topology, organizing the connections into
645        # 2-node connections (links) and more-than-2-node connections (lans).
646        # When a lan is created, it's added to the list of nodes (there's a
647        # node in the visualization for the lan).
648        for l in topo_lans:
649            if links.has_key(l['vname']):
650                if len(links[l['vname']]) < 2:
651                    links[l['vname']].append(l['vnode'])
652                else:
653                    nodes.append(l['vname'])
654                    lans[l['vname']] = links[l['vname']]
655                    del links[l['vname']]
656                    lans[l['vname']].append(l['vnode'])
657            elif lans.has_key(l['vname']):
658                lans[l['vname']].append(l['vnode'])
659            else:
660                links[l['vname']] = [ l['vnode'] ]
661
662
663        # Open up a temporary file for dot to turn into a visualization
664        try:
665            df, dotname = tempfile.mkstemp()
666            dotfile = os.fdopen(df, 'w')
667        except EnvironmentError:
668            raise service_error(service_error.internal,
669                    "Failed to open file in genviz")
670
671        try:
672            dnull = open('/dev/null', 'w')
673        except EnvironmentError:
674            service_error(service_error.internal,
675                    "Failed to open /dev/null in genviz")
676
677        # Generate a dot/neato input file from the links, nodes and lans
678        try:
679            print >>dotfile, "graph G {"
680            for n in nodes:
681                print >>dotfile, '\t"%s"' % n
682            for l in links.keys():
683                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
684            for l in lans.keys():
685                for n in lans[l]:
686                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
687            print >>dotfile, "}"
688            dotfile.close()
689        except TypeError:
690            raise service_error(service_error.internal,
691                    "Single endpoint link in vtopo")
692        except EnvironmentError:
693            raise service_error(service_error.internal, "Cannot write dot file")
694
695        # Use dot to create a visualization
696        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
697                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
698                close_fds=True)
699        dnull.close()
700
701        # Translate dot to vis format
702        vis_nodes = [ ]
703        vis = { 'node': vis_nodes }
704        for line in dot.stdout:
705            m = vis_re.match(line)
706            if m:
707                vn = m.group(1)
708                vis_node = {'name': vn, \
709                        'x': float(m.group(2)),\
710                        'y' : float(m.group(3)),\
711                    }
712                if vn in links.keys() or vn in lans.keys():
713                    vis_node['type'] = 'lan'
714                else:
715                    vis_node['type'] = 'node'
716                vis_nodes.append(vis_node)
717        rv = dot.wait()
718
719        os.remove(dotname)
720        if rv == 0 : return vis
721        else: return None
722
723
724    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
725            cert_pwd=None):
726        """
727        Release access to testbed through fedd
728        """
729
730        if not uri and tbmap:
731            uri = tbmap.get(tb, None)
732        if not uri:
733            raise service_error(service_error.server_config, 
734                    "Unknown testbed: %s" % tb)
735
736        if self.local_access.has_key(uri):
737            resp = self.local_access[uri].ReleaseAccess(\
738                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
739                    fedid(file=cert_file))
740            resp = { 'ReleaseAccessResponseBody': resp } 
741        else:
742            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
743                    cert_file, cert_pwd, self.trusted_certs)
744
745        # better error coding
746
747    def remote_ns2topdl(self, uri, desc):
748
749        req = {
750                'description' : { 'ns2description': desc },
751            }
752
753        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
754                self.trusted_certs)
755
756        if r.has_key('Ns2TopdlResponseBody'):
757            r = r['Ns2TopdlResponseBody']
758            ed = r.get('experimentdescription', None)
759            if ed.has_key('topdldescription'):
760                return topdl.Topology(**ed['topdldescription'])
761            else:
762                raise service_error(service_error.protocol, 
763                        "Bad splitter response (no output)")
764        else:
765            raise service_error(service_error.protocol, "Bad splitter response")
766
767    class start_segment:
768        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
769                cert_pwd=None, trusted_certs=None, caller=None,
770                log_collector=None):
771            self.log = log
772            self.debug = debug
773            self.cert_file = cert_file
774            self.cert_pwd = cert_pwd
775            self.trusted_certs = None
776            self.caller = caller
777            self.testbed = testbed
778            self.log_collector = log_collector
779            self.response = None
780            self.node = { }
781            self.proof = None
782
783        def make_map(self, resp):
784            for e in resp.get('embedding', []):
785                if 'toponame' in e and 'physname' in e:
786                    self.node[e['toponame']] = e['physname'][0]
787
788        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
789            req = {
790                    'allocID': { 'fedid' : aid }, 
791                    'segmentdescription': { 
792                        'topdldescription': topo.to_dict(),
793                    },
794                }
795
796            if connInfo:
797                req['connection'] = connInfo
798
799            import_svcs = [ s for m in masters.values() \
800                    for s in m if self.testbed in s.importers]
801
802            if import_svcs or self.testbed in masters:
803                req['service'] = []
804
805            for s in import_svcs:
806                for r in s.reqs:
807                    sr = copy.deepcopy(r)
808                    sr['visibility'] = 'import';
809                    req['service'].append(sr)
810
811            for s in masters.get(self.testbed, []):
812                for r in s.reqs:
813                    sr = copy.deepcopy(r)
814                    sr['visibility'] = 'export';
815                    req['service'].append(sr)
816
817            if attrs:
818                req['fedAttr'] = attrs
819
820            try:
821                self.log.debug("Calling StartSegment at %s " % uri)
822                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
823                        self.trusted_certs)
824                if r.has_key('StartSegmentResponseBody'):
825                    lval = r['StartSegmentResponseBody'].get('allocationLog',
826                            None)
827                    if lval and self.log_collector:
828                        for line in  lval.splitlines(True):
829                            self.log_collector.write(line)
830                    self.make_map(r['StartSegmentResponseBody'])
831                    if 'proof' in r: self.proof = r['proof']
832                    self.response = r
833                else:
834                    raise service_error(service_error.internal, 
835                            "Bad response!?: %s" %r)
836                return True
837            except service_error, e:
838                self.log.error("Start segment failed on %s: %s" % \
839                        (self.testbed, e))
840                return False
841
842
843
844    class terminate_segment:
845        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
846                cert_pwd=None, trusted_certs=None, caller=None):
847            self.log = log
848            self.debug = debug
849            self.cert_file = cert_file
850            self.cert_pwd = cert_pwd
851            self.trusted_certs = None
852            self.caller = caller
853            self.testbed = testbed
854
855        def __call__(self, uri, aid ):
856            req = {
857                    'allocID': aid , 
858                }
859            try:
860                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
861                        self.trusted_certs)
862                return True
863            except service_error, e:
864                self.log.error("Terminate segment failed on %s: %s" % \
865                        (self.testbed, e))
866                return False
867
868    def allocate_resources(self, allocated, masters, eid, expid, 
869            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
870            attrs=None, connInfo={}, tbmap=None, expcert=None):
871
872        started = { }           # Testbeds where a sub-experiment started
873                                # successfully
874
875        # XXX
876        fail_soft = False
877
878        if tbmap is None: tbmap = { }
879
880        log = alloc_log or self.log
881
882        tp = thread_pool(self.nthreads)
883        threads = [ ]
884        starters = [ ]
885
886        if expcert:
887            cert = expcert
888            pw = None
889        else:
890            cert = self.cert_file
891            pw = self.cert_pwd
892
893        for tb in allocated.keys():
894            # Create and start a thread to start the segment, and save it
895            # to get the return value later
896            tb_attrs = copy.copy(attrs)
897            tp.wait_for_slot()
898            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
899            base, suffix = split_testbed(tb)
900            if suffix:
901                tb_attrs.append({'attribute': 'experiment_name', 
902                    'value': "%s-%s" % (eid, suffix)})
903            else:
904                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
905            if not uri:
906                raise service_error(service_error.internal, 
907                        "Unknown testbed %s !?" % tb)
908
909            if tbparams[tb].has_key('allocID') and \
910                    tbparams[tb]['allocID'].has_key('fedid'):
911                aid = tbparams[tb]['allocID']['fedid']
912            else:
913                raise service_error(service_error.internal, 
914                        "No alloc id for testbed %s !?" % tb)
915
916            s = self.start_segment(log=log, debug=self.debug,
917                    testbed=tb, cert_file=cert,
918                    cert_pwd=pw, trusted_certs=self.trusted_certs,
919                    caller=self.call_StartSegment,
920                    log_collector=log_collector)
921            starters.append(s)
922            t  = pooled_thread(\
923                    target=s, name=tb,
924                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
925                    pdata=tp, trace_file=self.trace_file)
926            threads.append(t)
927            t.start()
928
929        # Wait until all finish (keep pinging the log, though)
930        mins = 0
931        revoked = False
932        while not tp.wait_for_all_done(60.0):
933            mins += 1
934            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
935                    % mins)
936            if not revoked and \
937                    len([ t.getName() for t in threads if t.rv == False]) > 0:
938                # a testbed has failed.  Revoke this experiment's
939                # synchronizarion values so that sub experiments will not
940                # deadlock waiting for synchronization that will never happen
941                self.log.info("A subexperiment has failed to swap in, " + \
942                        "revoking synch keys")
943                var_key = "fedid:%s" % expid
944                for k in self.synch_store.all_keys():
945                    if len(k) > 45 and k[0:46] == var_key:
946                        self.synch_store.revoke_key(k)
947                revoked = True
948
949        failed = [ t.getName() for t in threads if not t.rv ]
950        succeeded = [tb for tb in allocated.keys() if tb not in failed]
951
952        # If one failed clean up, unless fail_soft is set
953        if failed:
954            if not fail_soft:
955                tp.clear()
956                for tb in succeeded:
957                    # Create and start a thread to stop the segment
958                    tp.wait_for_slot()
959                    uri = tbparams[tb]['uri']
960                    t  = pooled_thread(\
961                            target=self.terminate_segment(log=log,
962                                testbed=tb,
963                                cert_file=cert, 
964                                cert_pwd=pw,
965                                trusted_certs=self.trusted_certs,
966                                caller=self.call_TerminateSegment),
967                            args=(uri, tbparams[tb]['federant']['allocID']),
968                            name=tb,
969                            pdata=tp, trace_file=self.trace_file)
970                    t.start()
971                # Wait until all finish (if any are being stopped)
972                if succeeded:
973                    tp.wait_for_all_done()
974
975                # release the allocations
976                for tb in tbparams.keys():
977                    try:
978                        self.release_access(tb, tbparams[tb]['allocID'], 
979                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
980                                cert_file=cert, cert_pwd=pw)
981                    except service_error, e:
982                        self.log.warn("Error releasing access: %s" % e.desc)
983                # Remove the placeholder
984                self.state_lock.acquire()
985                self.state[eid]['experimentStatus'] = 'failed'
986                if self.state_filename: self.write_state()
987                self.state_lock.release()
988                # Remove the repo dir
989                self.remove_dirs("%s/%s" %(self.repodir, expid))
990                # Walk up tmpdir, deleting as we go
991                if self.cleanup:
992                    self.remove_dirs(tmpdir)
993                else:
994                    log.debug("[start_experiment]: not removing %s" % tmpdir)
995
996
997                log.error("Swap in failed on %s" % ",".join(failed))
998                return
999        else:
1000            # Walk through the successes and gather the virtual to physical
1001            # mapping.
1002            embedding = [ ]
1003            proofs = { }
1004            for s in starters:
1005                for k, v in s.node.items():
1006                    embedding.append({
1007                        'toponame': k, 
1008                        'physname': [ v],
1009                        'testbed': s.testbed
1010                        })
1011                if s.proof:
1012                    proofs[s.testbed] = s.proof
1013            log.info("[start_segment]: Experiment %s active" % eid)
1014
1015
1016        # Walk up tmpdir, deleting as we go
1017        if self.cleanup:
1018            self.remove_dirs(tmpdir)
1019        else:
1020            log.debug("[start_experiment]: not removing %s" % tmpdir)
1021
1022        # Insert the experiment into our state and update the disk copy.
1023        self.state_lock.acquire()
1024        self.state[expid]['experimentStatus'] = 'active'
1025        self.state[eid] = self.state[expid]
1026        self.state[eid]['experimentdescription']['topdldescription'] = \
1027                top.to_dict()
1028        self.state[eid]['embedding'] = embedding
1029        # Append startup proofs
1030        for f in self.state[eid]['federant']:
1031            if 'name' in f and 'localname' in f['name']:
1032                if f['name']['localname'] in proofs:
1033                    f['proof'].append(proofs[f['name']['localname']])
1034
1035        if self.state_filename: self.write_state()
1036        self.state_lock.release()
1037        return
1038
1039
1040    def add_kit(self, e, kit):
1041        """
1042        Add a Software object created from the list of (install, location)
1043        tuples passed as kit  to the software attribute of an object e.  We
1044        do this enough to break out the code, but it's kind of a hack to
1045        avoid changing the old tuple rep.
1046        """
1047
1048        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1049
1050        if isinstance(e.software, list): e.software.extend(s)
1051        else: e.software = s
1052
1053    def append_experiment_authorization(self, expid, attrs, 
1054            need_state_lock=True):
1055        """
1056        Append the authorization information to system state
1057        """
1058
1059        for p, a in attrs:
1060            self.auth.set_attribute(p, a)
1061        self.auth.save()
1062
1063        if need_state_lock: self.state_lock.acquire()
1064        self.state[expid]['auth'].update(attrs)
1065        if self.state_filename: self.write_state()
1066        if need_state_lock: self.state_lock.release()
1067
1068    def clear_experiment_authorization(self, expid, need_state_lock=True):
1069        """
1070        Attrs is a set of attribute principal pairs that need to be removed
1071        from the authenticator.  Remove them and save the authenticator.
1072        """
1073
1074        if need_state_lock: self.state_lock.acquire()
1075        if expid in self.state and 'auth' in self.state[expid]:
1076            for p, a in self.state[expid]['auth']:
1077                self.auth.unset_attribute(p, a)
1078            self.state[expid]['auth'] = set()
1079        if self.state_filename: self.write_state()
1080        if need_state_lock: self.state_lock.release()
1081        self.auth.save()
1082
1083
1084    def create_experiment_state(self, fid, req, expid, expcert,
1085            state='starting'):
1086        """
1087        Create the initial entry in the experiment's state.  The expid and
1088        expcert are the experiment's fedid and certifacte that represents that
1089        ID, which are installed in the experiment state.  If the request
1090        includes a suggested local name that is used if possible.  If the local
1091        name is already taken by an experiment owned by this user that has
1092        failed, it is overwritten.  Otherwise new letters are added until a
1093        valid localname is found.  The generated local name is returned.
1094        """
1095
1096        if req.has_key('experimentID') and \
1097                req['experimentID'].has_key('localname'):
1098            overwrite = False
1099            eid = req['experimentID']['localname']
1100            # If there's an old failed experiment here with the same local name
1101            # and accessible by this user, we'll overwrite it, otherwise we'll
1102            # fall through and do the collision avoidance.
1103            old_expid = self.get_experiment_fedid(eid)
1104            if old_expid and self.check_experiment_access(fid, old_expid):
1105                self.state_lock.acquire()
1106                status = self.state[eid].get('experimentStatus', None)
1107                if status and status == 'failed':
1108                    # remove the old access attributes
1109                    self.clear_experiment_authorization(eid,
1110                            need_state_lock=False)
1111                    overwrite = True
1112                    del self.state[eid]
1113                    del self.state[old_expid]
1114                self.state_lock.release()
1115            self.state_lock.acquire()
1116            while (self.state.has_key(eid) and not overwrite):
1117                eid += random.choice(string.ascii_letters)
1118            # Initial state
1119            self.state[eid] = {
1120                    'experimentID' : \
1121                            [ { 'localname' : eid }, {'fedid': expid } ],
1122                    'experimentStatus': state,
1123                    'experimentAccess': { 'X509' : expcert },
1124                    'owner': fid,
1125                    'log' : [],
1126                    'auth': set(),
1127                }
1128            self.state[expid] = self.state[eid]
1129            if self.state_filename: self.write_state()
1130            self.state_lock.release()
1131        else:
1132            eid = self.exp_stem
1133            for i in range(0,5):
1134                eid += random.choice(string.ascii_letters)
1135            self.state_lock.acquire()
1136            while (self.state.has_key(eid)):
1137                eid = self.exp_stem
1138                for i in range(0,5):
1139                    eid += random.choice(string.ascii_letters)
1140            # Initial state
1141            self.state[eid] = {
1142                    'experimentID' : \
1143                            [ { 'localname' : eid }, {'fedid': expid } ],
1144                    'experimentStatus': state,
1145                    'experimentAccess': { 'X509' : expcert },
1146                    'owner': fid,
1147                    'log' : [],
1148                    'auth': set(),
1149                }
1150            self.state[expid] = self.state[eid]
1151            if self.state_filename: self.write_state()
1152            self.state_lock.release()
1153
1154        # Let users touch the state.  Authorize this fid and the expid itself
1155        # to touch the experiment, as well as allowing th eoverrides.
1156        self.append_experiment_authorization(eid, 
1157                set([(fid, expid), (expid,expid)] + \
1158                        [ (o, expid) for o in self.overrides]))
1159
1160        return eid
1161
1162
1163    def allocate_ips_to_topo(self, top):
1164        """
1165        Add an ip4_address attribute to all the hosts in the topology, based on
1166        the shared substrates on which they sit.  An /etc/hosts file is also
1167        created and returned as a list of hostfiles entries.  We also return
1168        the allocator, because we may need to allocate IPs to portals
1169        (specifically DRAGON portals).
1170        """
1171        subs = sorted(top.substrates, 
1172                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1173                reverse=True)
1174        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1175        ifs = { }
1176        hosts = [ ]
1177
1178        for idx, s in enumerate(subs):
1179            net_size = len(s.interfaces)+2
1180
1181            a = ips.allocate(net_size)
1182            if a :
1183                base, num = a
1184                if num < net_size: 
1185                    raise service_error(service_error.internal,
1186                            "Allocator returned wrong number of IPs??")
1187            else:
1188                raise service_error(service_error.req, 
1189                        "Cannot allocate IP addresses")
1190            mask = ips.min_alloc
1191            while mask < net_size:
1192                mask *= 2
1193
1194            netmask = ((2**32-1) ^ (mask-1))
1195
1196            base += 1
1197            for i in s.interfaces:
1198                i.attribute.append(
1199                        topdl.Attribute('ip4_address', 
1200                            "%s" % ip_addr(base)))
1201                i.attribute.append(
1202                        topdl.Attribute('ip4_netmask', 
1203                            "%s" % ip_addr(int(netmask))))
1204
1205                hname = i.element.name
1206                if ifs.has_key(hname):
1207                    hosts.append("%s\t%s-%s %s-%d" % \
1208                            (ip_addr(base), hname, s.name, hname,
1209                                ifs[hname]))
1210                else:
1211                    ifs[hname] = 0
1212                    hosts.append("%s\t%s-%s %s-%d %s" % \
1213                            (ip_addr(base), hname, s.name, hname,
1214                                ifs[hname], hname))
1215
1216                ifs[hname] += 1
1217                base += 1
1218        return hosts, ips
1219
1220    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1221            tbparam, masters, tbmap, expid=None, expcert=None):
1222        for tb in testbeds:
1223            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1224                    expcert)
1225            allocated[tb] = 1
1226
1227    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1228            expcert=None):
1229        """
1230        Get access to testbed through fedd and set the parameters for that tb
1231        """
1232        def get_export_project(svcs):
1233            """
1234            Look through for the list of federated_service for this testbed
1235            objects for a project_export service, and extract the project
1236            parameter.
1237            """
1238
1239            pe = [s for s in svcs if s.name=='project_export']
1240            if len(pe) == 1:
1241                return pe[0].params.get('project', None)
1242            elif len(pe) == 0:
1243                return None
1244            else:
1245                raise service_error(service_error.req,
1246                        "More than one project export is not supported")
1247
1248        def add_services(svcs, type, slist):
1249            """
1250            Add the given services to slist.  type is import or export.
1251            """
1252            for i, s in enumerate(svcs):
1253                idx = '%s%d' % (type, i)
1254                sr = {'id': idx, 'name': s.name, 'visibility': type }
1255                if s.params:
1256                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1257                            for k, v in s.params.items()]
1258                slist.append(sr)
1259
1260        uri = tbmap.get(testbed_base(tb), None)
1261        if not uri:
1262            raise service_error(service_error.server_config, 
1263                    "Unknown testbed: %s" % tb)
1264
1265        export_svcs = masters.get(tb,[])
1266        import_svcs = [ s for m in masters.values() \
1267                for s in m \
1268                    if tb in s.importers ]
1269
1270        export_project = get_export_project(export_svcs)
1271        # Compose the credential list so that IDs come before attributes
1272        creds = set()
1273        keys = set()
1274        certs = self.auth.get_creds_for_principal(fid)
1275        if expid:
1276            certs.update(self.auth.get_creds_for_principal(expid))
1277        for c in certs:
1278            keys.add(c.issuer_cert())
1279            creds.add(c.attribute_cert())
1280        creds = list(keys) + list(creds)
1281
1282        if expcert: cert, pw = expcert, None
1283        else: cert, pw = self.cert_file, self.cert_pw
1284
1285        # Request credentials
1286        req = {
1287                'abac_credential': creds,
1288            }
1289        # Make the service request from the services we're importing and
1290        # exporting.  Keep track of the export request ids so we can
1291        # collect the resulting info from the access response.
1292        e_keys = { }
1293        if import_svcs or export_svcs:
1294            slist = []
1295            add_services(import_svcs, 'import', slist)
1296            add_services(export_svcs, 'export', slist)
1297            req['service'] = slist
1298
1299        if self.local_access.has_key(uri):
1300            # Local access call
1301            req = { 'RequestAccessRequestBody' : req }
1302            r = self.local_access[uri].RequestAccess(req, 
1303                    fedid(file=self.cert_file))
1304            r = { 'RequestAccessResponseBody' : r }
1305        else:
1306            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1307
1308        if r.has_key('RequestAccessResponseBody'):
1309            # Through to here we have a valid response, not a fault.
1310            # Access denied is a fault, so something better or worse than
1311            # access denied has happened.
1312            r = r['RequestAccessResponseBody']
1313            self.log.debug("[get_access] Access granted")
1314        else:
1315            raise service_error(service_error.protocol,
1316                        "Bad proxy response")
1317        if 'proof' not in r:
1318            raise service_error(service_error.protocol,
1319                        "Bad access response (no access proof)")
1320       
1321        tbparam[tb] = { 
1322                "allocID" : r['allocID'],
1323                "uri": uri,
1324                "proof": [ r['proof'] ],
1325                }
1326
1327        # Collect the responses corresponding to the services this testbed
1328        # exports.  These will be the service requests that we will include in
1329        # the start segment requests (with appropriate visibility values) to
1330        # import and export the segments.
1331        for s in r.get('service', []):
1332            id = s.get('id', None)
1333            if id and id in e_keys:
1334                e_keys[id].reqs.append(s)
1335
1336        # Add attributes to parameter space.  We don't allow attributes to
1337        # overlay any parameters already installed.
1338        for a in r.get('fedAttr', []):
1339            try:
1340                if a['attribute'] and \
1341                        isinstance(a['attribute'], basestring)\
1342                        and not tbparam[tb].has_key(a['attribute'].lower()):
1343                    tbparam[tb][a['attribute'].lower()] = a['value']
1344            except KeyError:
1345                self.log.error("Bad attribute in response: %s" % a)
1346
1347
1348    def split_topology(self, top, topo, testbeds):
1349        """
1350        Create the sub-topologies that are needed for experiment instantiation.
1351        """
1352        for tb in testbeds:
1353            topo[tb] = top.clone()
1354            # copy in for loop allows deletions from the original
1355            for e in [ e for e in topo[tb].elements]:
1356                etb = e.get_attribute('testbed')
1357                # NB: elements without a testbed attribute won't appear in any
1358                # sub topologies. 
1359                if not etb or etb != tb:
1360                    for i in e.interface:
1361                        for s in i.subs:
1362                            try:
1363                                s.interfaces.remove(i)
1364                            except ValueError:
1365                                raise service_error(service_error.internal,
1366                                        "Can't remove interface??")
1367                    topo[tb].elements.remove(e)
1368            topo[tb].make_indices()
1369
1370    def confirm_software(self, top):
1371        """
1372        Make sure that the software to be loaded in the topo is all available
1373        before we begin making access requests, etc.  This is a subset of
1374        wrangle_software.
1375        """
1376        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1377        pkgs.update([x.location for e in top.elements for x in e.software])
1378
1379        for pkg in pkgs:
1380            loc = pkg
1381
1382            scheme, host, path = urlparse(loc)[0:3]
1383            dest = os.path.basename(path)
1384            if not scheme:
1385                if not loc.startswith('/'):
1386                    loc = "/%s" % loc
1387                loc = "file://%s" %loc
1388            # NB: if scheme was found, loc == pkg
1389            try:
1390                u = urlopen(loc)
1391                u.close()
1392            except Exception, e:
1393                raise service_error(service_error.req, 
1394                        "Cannot open %s: %s" % (loc, e))
1395        return True
1396
1397    def wrangle_software(self, expid, top, topo, tbparams):
1398        """
1399        Copy software out to the repository directory, allocate permissions and
1400        rewrite the segment topologies to look for the software in local
1401        places.
1402        """
1403
1404        # Copy the rpms and tarfiles to a distribution directory from
1405        # which the federants can retrieve them
1406        linkpath = "%s/software" %  expid
1407        softdir ="%s/%s" % ( self.repodir, linkpath)
1408        softmap = { }
1409
1410        # self.fedkit and self.gateway kit are lists of tuples of
1411        # (install_location, download_location) this extracts the download
1412        # locations.
1413        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1414        pkgs.update([x.location for e in top.elements for x in e.software])
1415        try:
1416            os.makedirs(softdir)
1417        except EnvironmentError, e:
1418            raise service_error(
1419                    "Cannot create software directory: %s" % e)
1420        # The actual copying.  Everything's converted into a url for copying.
1421        auth_attrs = set()
1422        for pkg in pkgs:
1423            loc = pkg
1424
1425            scheme, host, path = urlparse(loc)[0:3]
1426            dest = os.path.basename(path)
1427            if not scheme:
1428                if not loc.startswith('/'):
1429                    loc = "/%s" % loc
1430                loc = "file://%s" %loc
1431            # NB: if scheme was found, loc == pkg
1432            try:
1433                u = urlopen(loc)
1434            except Exception, e:
1435                raise service_error(service_error.req, 
1436                        "Cannot open %s: %s" % (loc, e))
1437            try:
1438                f = open("%s/%s" % (softdir, dest) , "w")
1439                self.log.debug("Writing %s/%s" % (softdir,dest) )
1440                data = u.read(4096)
1441                while data:
1442                    f.write(data)
1443                    data = u.read(4096)
1444                f.close()
1445                u.close()
1446            except Exception, e:
1447                raise service_error(service_error.internal,
1448                        "Could not copy %s: %s" % (loc, e))
1449            path = re.sub("/tmp", "", linkpath)
1450            # XXX
1451            softmap[pkg] = \
1452                    "%s/%s/%s" %\
1453                    ( self.repo_url, path, dest)
1454
1455            # Allow the individual segments to access the software by assigning
1456            # an attribute to each testbed allocation that encodes the data to
1457            # be released.  This expression collects the data for each run of
1458            # the loop.
1459            auth_attrs.update([
1460                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1461                        for tb in tbparams.keys()])
1462
1463        self.append_experiment_authorization(expid, auth_attrs)
1464
1465        # Convert the software locations in the segments into the local
1466        # copies on this host
1467        for soft in [ s for tb in topo.values() \
1468                for e in tb.elements \
1469                    if getattr(e, 'software', False) \
1470                        for s in e.software ]:
1471            if softmap.has_key(soft.location):
1472                soft.location = softmap[soft.location]
1473
1474
1475    def new_experiment(self, req, fid):
1476        """
1477        The external interface to empty initial experiment creation called from
1478        the dispatcher.
1479
1480        Creates a working directory, splits the incoming description using the
1481        splitter script and parses out the avrious subsections using the
1482        lcasses above.  Once each sub-experiment is created, use pooled threads
1483        to instantiate them and start it all up.
1484        """
1485        req = req.get('NewRequestBody', None)
1486        if not req:
1487            raise service_error(service_error.req,
1488                    "Bad request format (no NewRequestBody)")
1489
1490        if self.auth.import_credentials(data_list=req.get('credential', [])):
1491            self.auth.save()
1492
1493        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1494                with_proof=True)
1495
1496        if not access_ok:
1497            raise service_error(service_error.access, "New access denied",
1498                    proof=[proof])
1499
1500        try:
1501            tmpdir = tempfile.mkdtemp(prefix="split-")
1502        except EnvironmentError:
1503            raise service_error(service_error.internal, "Cannot create tmp dir")
1504
1505        try:
1506            access_user = self.accessdb[fid]
1507        except KeyError:
1508            raise service_error(service_error.internal,
1509                    "Access map and authorizer out of sync in " + \
1510                            "new_experiment for fedid %s"  % fid)
1511
1512        # Generate an ID for the experiment (slice) and a certificate that the
1513        # allocator can use to prove they own it.  We'll ship it back through
1514        # the encrypted connection.  If the requester supplied one, use it.
1515        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1516            expcert = req['experimentAccess']['X509']
1517            expid = fedid(certstr=expcert)
1518            self.state_lock.acquire()
1519            if expid in self.state:
1520                self.state_lock.release()
1521                raise service_error(service_error.req, 
1522                        'fedid %s identifies an existing experiment' % expid)
1523            self.state_lock.release()
1524        else:
1525            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1526
1527        #now we're done with the tmpdir, and it should be empty
1528        if self.cleanup:
1529            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1530            os.rmdir(tmpdir)
1531        else:
1532            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1533
1534        eid = self.create_experiment_state(fid, req, expid, expcert, 
1535                state='empty')
1536
1537        rv = {
1538                'experimentID': [
1539                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1540                ],
1541                'experimentStatus': 'empty',
1542                'experimentAccess': { 'X509' : expcert },
1543                'proof': proof.to_dict(),
1544            }
1545
1546        return rv
1547
1548    # create_experiment sub-functions
1549
1550    @staticmethod
1551    def get_experiment_key(req, field='experimentID'):
1552        """
1553        Parse the experiment identifiers out of the request (the request body
1554        tag has been removed).  Specifically this pulls either the fedid or the
1555        localname out of the experimentID field.  A fedid is preferred.  If
1556        neither is present or the request does not contain the fields,
1557        service_errors are raised.
1558        """
1559        # Get the experiment access
1560        exp = req.get(field, None)
1561        if exp:
1562            if exp.has_key('fedid'):
1563                key = exp['fedid']
1564            elif exp.has_key('localname'):
1565                key = exp['localname']
1566            else:
1567                raise service_error(service_error.req, "Unknown lookup type")
1568        else:
1569            raise service_error(service_error.req, "No request?")
1570
1571        return key
1572
1573    def get_experiment_ids_and_start(self, key, tmpdir):
1574        """
1575        Get the experiment name, id and access certificate from the state, and
1576        set the experiment state to 'starting'.  returns a triple (fedid,
1577        localname, access_cert_file). The access_cert_file is a copy of the
1578        contents of the access certificate, created in the tempdir with
1579        restricted permissions.  If things are confused, raise an exception.
1580        """
1581
1582        expid = eid = None
1583        self.state_lock.acquire()
1584        if self.state.has_key(key):
1585            self.state[key]['experimentStatus'] = "starting"
1586            for e in self.state[key].get('experimentID',[]):
1587                if not expid and e.has_key('fedid'):
1588                    expid = e['fedid']
1589                elif not eid and e.has_key('localname'):
1590                    eid = e['localname']
1591            if 'experimentAccess' in self.state[key] and \
1592                    'X509' in self.state[key]['experimentAccess']:
1593                expcert = self.state[key]['experimentAccess']['X509']
1594            else:
1595                expcert = None
1596        self.state_lock.release()
1597
1598        # make a protected copy of the access certificate so the experiment
1599        # controller can act as the experiment principal.
1600        if expcert:
1601            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1602            if not expcert_file:
1603                raise service_error(service_error.internal, 
1604                        "Cannot create temp cert file?")
1605        else:
1606            expcert_file = None
1607
1608        return (eid, expid, expcert_file)
1609
1610    def get_topology(self, req, tmpdir):
1611        """
1612        Get the ns2 content and put it into a file for parsing.  Call the local
1613        or remote parser and return the topdl.Topology.  Errors result in
1614        exceptions.  req is the request and tmpdir is a work directory.
1615        """
1616
1617        # The tcl parser needs to read a file so put the content into that file
1618        descr=req.get('experimentdescription', None)
1619        if descr:
1620            if 'ns2description' in descr:
1621                file_content=descr['ns2description']
1622            elif 'topdldescription' in descr:
1623                return topdl.Topology(**descr['topdldescription'])
1624            else:
1625                raise service_error(service_error.req, 
1626                        'Unknown experiment description type')
1627        else:
1628            raise service_error(service_error.req, "No experiment description")
1629
1630
1631        if self.splitter_url:
1632            self.log.debug("Calling remote topdl translator at %s" % \
1633                    self.splitter_url)
1634            top = self.remote_ns2topdl(self.splitter_url, file_content)
1635        else:
1636            tclfile = os.path.join(tmpdir, "experiment.tcl")
1637            if file_content:
1638                try:
1639                    f = open(tclfile, 'w')
1640                    f.write(file_content)
1641                    f.close()
1642                except EnvironmentError:
1643                    raise service_error(service_error.internal,
1644                            "Cannot write temp experiment description")
1645            else:
1646                raise service_error(service_error.req, 
1647                        "Only ns2descriptions supported")
1648            pid = "dummy"
1649            gid = "dummy"
1650            eid = "dummy"
1651
1652            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1653                str(self.muxmax), '-m', 'dummy']
1654
1655            tclcmd.extend([pid, gid, eid, tclfile])
1656
1657            self.log.debug("running local splitter %s", " ".join(tclcmd))
1658            # This is just fantastic.  As a side effect the parser copies
1659            # tb_compat.tcl into the current directory, so that directory
1660            # must be writable by the fedd user.  Doing this in the
1661            # temporary subdir ensures this is the case.
1662            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1663                    cwd=tmpdir)
1664            split_data = tclparser.stdout
1665
1666            top = topdl.topology_from_xml(file=split_data, top="experiment")
1667            os.remove(tclfile)
1668
1669        return top
1670
1671    def get_testbed_services(self, req, testbeds):
1672        """
1673        Parse the services section of the request into into two dicts mapping
1674        testbed to lists of federated_service objects.  The first lists all
1675        exporters of services, and the second all exporters of services that
1676        need control portals int the experiment.
1677        """
1678        masters = { }
1679        pmasters = { }
1680        for s in req.get('service', []):
1681            # If this is a service request with the importall field
1682            # set, fill it out.
1683
1684            if s.get('importall', False):
1685                s['import'] = [ tb for tb in testbeds \
1686                        if tb not in s.get('export',[])]
1687                del s['importall']
1688
1689            # Add the service to masters
1690            for tb in s.get('export', []):
1691                if s.get('name', None):
1692
1693                    params = { }
1694                    for a in s.get('fedAttr', []):
1695                        params[a.get('attribute', '')] = a.get('value','')
1696
1697                    fser = federated_service(name=s['name'],
1698                            exporter=tb, importers=s.get('import',[]),
1699                            params=params)
1700                    if fser.name == 'hide_hosts' \
1701                            and 'hosts' not in fser.params:
1702                        fser.params['hosts'] = \
1703                                ",".join(tb_hosts.get(fser.exporter, []))
1704                    if tb in masters: masters[tb].append(fser)
1705                    else: masters[tb] = [fser]
1706
1707                    if fser.portal:
1708                        if tb not in pmasters: pmasters[tb] = [ fser ]
1709                        else: pmasters[tb].append(fser)
1710                else:
1711                    self.log.error('Testbed service does not have name " + \
1712                            "and importers')
1713        return masters, pmasters
1714
1715    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1716        """
1717        Create the ssh keys necessary for interconnecting the potral nodes and
1718        the global hosts file for letting each segment know about the IP
1719        addresses in play.  Save these into the repo.  Add attributes to the
1720        autorizer allowing access controllers to download them and return a set
1721        of attributes that inform the segments where to find this stuff.  Mau
1722        raise service_errors in if there are problems.
1723        """
1724        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1725        gw_secretkey_base = "fed.%s" % self.ssh_type
1726        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1727        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1728
1729        try:
1730            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1731        except ValueError:
1732            raise service_error(service_error.server_config, 
1733                    "Bad key type (%s)" % self.ssh_type)
1734
1735
1736        # Copy configuration files into the remote file store
1737        # The config urlpath
1738        configpath = "/%s/config" % expid
1739        # The config file system location
1740        configdir ="%s%s" % ( self.repodir, configpath)
1741        try:
1742            os.makedirs(configdir)
1743        except EnvironmentError, e:
1744            raise service_error(service_error.internal,
1745                    "Cannot create config directory: %s" % e)
1746        try:
1747            f = open("%s/hosts" % configdir, "w")
1748            print >> f, string.join(hosts, '\n')
1749            f.close()
1750        except EnvironmentError, e:
1751            raise service_error(service_error.internal, 
1752                    "Cannot write hosts file: %s" % e)
1753        try:
1754            copy_file("%s" % gw_pubkey, "%s/%s" % \
1755                    (configdir, gw_pubkey_base))
1756            copy_file("%s" % gw_secretkey, "%s/%s" % \
1757                    (configdir, gw_secretkey_base))
1758        except EnvironmentError, e:
1759            raise service_error(service_error.internal, 
1760                    "Cannot copy keyfiles: %s" % e)
1761
1762        # Allow the individual testbeds to access the configuration files,
1763        # again by setting an attribute for the relevant pathnames on each
1764        # allocation principal.  Yeah, that's a long list comprehension.
1765        self.append_experiment_authorization(expid, set([
1766            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1767                    for tb in tbparams.keys() \
1768                        for f in ("hosts", gw_secretkey_base, gw_pubkey_base)]))
1769
1770        attrs = [ 
1771                {
1772                    'attribute': 'ssh_pubkey', 
1773                    'value': '%s/%s/config/%s' % \
1774                            (self.repo_url, expid, gw_pubkey_base)
1775                },
1776                {
1777                    'attribute': 'ssh_secretkey', 
1778                    'value': '%s/%s/config/%s' % \
1779                            (self.repo_url, expid, gw_secretkey_base)
1780                },
1781                {
1782                    'attribute': 'hosts', 
1783                    'value': '%s/%s/config/hosts' % \
1784                            (self.repo_url, expid)
1785                },
1786            ]
1787        return attrs
1788
1789
1790    def get_vtopo(self, req, fid):
1791        """
1792        Return the stored virtual topology for this experiment
1793        """
1794        rv = None
1795        state = None
1796
1797        req = req.get('VtopoRequestBody', None)
1798        if not req:
1799            raise service_error(service_error.req,
1800                    "Bad request format (no VtopoRequestBody)")
1801        exp = req.get('experiment', None)
1802        if exp:
1803            if exp.has_key('fedid'):
1804                key = exp['fedid']
1805                keytype = "fedid"
1806            elif exp.has_key('localname'):
1807                key = exp['localname']
1808                keytype = "localname"
1809            else:
1810                raise service_error(service_error.req, "Unknown lookup type")
1811        else:
1812            raise service_error(service_error.req, "No request?")
1813
1814        proof = self.check_experiment_access(fid, key)
1815
1816        self.state_lock.acquire()
1817        if self.state.has_key(key):
1818            if self.state[key].has_key('vtopo'):
1819                rv = { 'experiment' : {keytype: key },
1820                        'vtopo': self.state[key]['vtopo'],
1821                        'proof': proof.to_dict(), 
1822                    }
1823            else:
1824                state = self.state[key]['experimentStatus']
1825        self.state_lock.release()
1826
1827        if rv: return rv
1828        else: 
1829            if state:
1830                raise service_error(service_error.partial, 
1831                        "Not ready: %s" % state)
1832            else:
1833                raise service_error(service_error.req, "No such experiment")
1834
1835    def get_vis(self, req, fid):
1836        """
1837        Return the stored visualization for this experiment
1838        """
1839        rv = None
1840        state = None
1841
1842        req = req.get('VisRequestBody', None)
1843        if not req:
1844            raise service_error(service_error.req,
1845                    "Bad request format (no VisRequestBody)")
1846        exp = req.get('experiment', None)
1847        if exp:
1848            if exp.has_key('fedid'):
1849                key = exp['fedid']
1850                keytype = "fedid"
1851            elif exp.has_key('localname'):
1852                key = exp['localname']
1853                keytype = "localname"
1854            else:
1855                raise service_error(service_error.req, "Unknown lookup type")
1856        else:
1857            raise service_error(service_error.req, "No request?")
1858
1859        proof = self.check_experiment_access(fid, key)
1860
1861        self.state_lock.acquire()
1862        if self.state.has_key(key):
1863            if self.state[key].has_key('vis'):
1864                rv =  { 'experiment' : {keytype: key },
1865                        'vis': self.state[key]['vis'],
1866                        'proof': proof.to_dict(), 
1867                        }
1868            else:
1869                state = self.state[key]['experimentStatus']
1870        self.state_lock.release()
1871
1872        if rv: return rv
1873        else:
1874            if state:
1875                raise service_error(service_error.partial, 
1876                        "Not ready: %s" % state)
1877            else:
1878                raise service_error(service_error.req, "No such experiment")
1879
1880   
1881    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1882            top):
1883        """
1884        Store the various data that have changed in the experiment state
1885        between when it was started and the beginning of resource allocation.
1886        This is basically the information about each local allocation.  This
1887        fills in the values of the placeholder allocation in the state.  It
1888        also collects the access proofs and returns them as dicts for a
1889        response message.
1890        """
1891        # save federant information
1892        for k in allocated.keys():
1893            tbparams[k]['federant'] = {
1894                    'name': [ { 'localname' : eid} ],
1895                    'allocID' : tbparams[k]['allocID'],
1896                    'uri': tbparams[k]['uri'],
1897                    'proof': tbparams[k]['proof'],
1898                }
1899
1900        self.state_lock.acquire()
1901        self.state[eid]['vtopo'] = vtopo
1902        self.state[eid]['vis'] = vis
1903        self.state[eid]['experimentdescription'] = \
1904                { 'topdldescription': top.to_dict() }
1905        self.state[eid]['federant'] = \
1906                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1907                    if tbparams[tb].has_key('federant') ]
1908        # Access proofs for the response message
1909        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
1910                    for p in tbparams[k]['federant']['proof']]
1911        if self.state_filename: 
1912            self.write_state()
1913        self.state_lock.release()
1914        return proofs
1915
1916    def clear_placeholder(self, eid, expid, tmpdir):
1917        """
1918        Clear the placeholder and remove any allocated temporary dir.
1919        """
1920
1921        self.state_lock.acquire()
1922        del self.state[eid]
1923        del self.state[expid]
1924        if self.state_filename: self.write_state()
1925        self.state_lock.release()
1926        if tmpdir and self.cleanup:
1927            self.remove_dirs(tmpdir)
1928
1929    # end of create_experiment sub-functions
1930
1931    def create_experiment(self, req, fid):
1932        """
1933        The external interface to experiment creation called from the
1934        dispatcher.
1935
1936        Creates a working directory, splits the incoming description using the
1937        splitter script and parses out the various subsections using the
1938        classes above.  Once each sub-experiment is created, use pooled threads
1939        to instantiate them and start it all up.
1940        """
1941
1942        req = req.get('CreateRequestBody', None)
1943        if req:
1944            key = self.get_experiment_key(req)
1945        else:
1946            raise service_error(service_error.req,
1947                    "Bad request format (no CreateRequestBody)")
1948
1949        # Import information from the requester
1950        if self.auth.import_credentials(data_list=req.get('credential', [])):
1951            self.auth.save()
1952
1953        # Make sure that the caller can talk to us
1954        proof = self.check_experiment_access(fid, key)
1955
1956        # Install the testbed map entries supplied with the request into a copy
1957        # of the testbed map.
1958        tbmap = dict(self.tbmap)
1959        for m in req.get('testbedmap', []):
1960            if 'testbed' in m and 'uri' in m:
1961                tbmap[m['testbed']] = m['uri']
1962
1963        # a place to work
1964        try:
1965            tmpdir = tempfile.mkdtemp(prefix="split-")
1966            os.mkdir(tmpdir+"/keys")
1967        except EnvironmentError:
1968            raise service_error(service_error.internal, "Cannot create tmp dir")
1969
1970        tbparams = { }
1971
1972        eid, expid, expcert_file = \
1973                self.get_experiment_ids_and_start(key, tmpdir)
1974
1975        # This catches exceptions to clear the placeholder if necessary
1976        try: 
1977            if not (eid and expid):
1978                raise service_error(service_error.internal, 
1979                        "Cannot find local experiment info!?")
1980
1981            top = self.get_topology(req, tmpdir)
1982            self.confirm_software(top)
1983            # Assign the IPs
1984            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1985            # Find the testbeds to look up
1986            tb_hosts = { }
1987            testbeds = [ ]
1988            for e in top.elements:
1989                if isinstance(e, topdl.Computer):
1990                    tb = e.get_attribute('testbed') or 'default'
1991                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
1992                    else: 
1993                        tb_hosts[tb] = [ e.name ]
1994                        testbeds.append(tb)
1995
1996            masters, pmasters = self.get_testbed_services(req, testbeds)
1997            allocated = { }         # Testbeds we can access
1998            topo ={ }               # Sub topologies
1999            connInfo = { }          # Connection information
2000
2001            self.split_topology(top, topo, testbeds)
2002
2003            self.get_access_to_testbeds(testbeds, fid, allocated, 
2004                    tbparams, masters, tbmap, expid, expcert_file)
2005
2006            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2007
2008            part = experiment_partition(self.auth, self.store_url, tbmap,
2009                    self.muxmax, self.direct_transit)
2010            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2011                    connInfo, expid)
2012
2013            auth_attrs = set()
2014            # Now get access to the dynamic testbeds (those added above)
2015            for tb in [ t for t in topo if t not in allocated]:
2016                self.get_access(tb, tbparams, fid, masters, tbmap, 
2017                        expid, expcert_file)
2018                allocated[tb] = 1
2019                store_keys = topo[tb].get_attribute('store_keys')
2020                # Give the testbed access to keys it exports or imports
2021                if store_keys:
2022                    auth_attrs.update(set([
2023                        (tbparams[tb]['allocID']['fedid'], sk) \
2024                                for sk in store_keys.split(" ")]))
2025
2026            if auth_attrs:
2027                self.append_experiment_authorization(expid, auth_attrs)
2028
2029            # transit and disconnected testbeds may not have a connInfo entry.
2030            # Fill in the blanks.
2031            for t in allocated.keys():
2032                if not connInfo.has_key(t):
2033                    connInfo[t] = { }
2034
2035            self.wrangle_software(expid, top, topo, tbparams)
2036
2037            vtopo = topdl.topology_to_vtopo(top)
2038            vis = self.genviz(vtopo)
2039            proofs = self.save_federant_information(allocated, tbparams, 
2040                    eid, vtopo, vis, top)
2041        except service_error, e:
2042            # If something goes wrong in the parse (usually an access error)
2043            # clear the placeholder state.  From here on out the code delays
2044            # exceptions.  Failing at this point returns a fault to the remote
2045            # caller.
2046            self.clear_placeholder(eid, expid, tmpdir)
2047            raise e
2048
2049        # Start the background swapper and return the starting state.  From
2050        # here on out, the state will stick around a while.
2051
2052        # Create a logger that logs to the experiment's state object as well as
2053        # to the main log file.
2054        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2055        alloc_collector = self.list_log(self.state[eid]['log'])
2056        h = logging.StreamHandler(alloc_collector)
2057        # XXX: there should be a global one of these rather than repeating the
2058        # code.
2059        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2060                    '%d %b %y %H:%M:%S'))
2061        alloc_log.addHandler(h)
2062
2063        # Start a thread to do the resource allocation
2064        t  = Thread(target=self.allocate_resources,
2065                args=(allocated, masters, eid, expid, tbparams, 
2066                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2067                    connInfo, tbmap, expcert_file),
2068                name=eid)
2069        t.start()
2070
2071        rv = {
2072                'experimentID': [
2073                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2074                ],
2075                'experimentStatus': 'starting',
2076                'proof': [ proof.to_dict() ] + proofs,
2077            }
2078
2079        return rv
2080   
2081    def get_experiment_fedid(self, key):
2082        """
2083        find the fedid associated with the localname key in the state database.
2084        """
2085
2086        rv = None
2087        self.state_lock.acquire()
2088        if self.state.has_key(key):
2089            if isinstance(self.state[key], dict):
2090                try:
2091                    kl = [ f['fedid'] for f in \
2092                            self.state[key]['experimentID']\
2093                                if f.has_key('fedid') ]
2094                except KeyError:
2095                    self.state_lock.release()
2096                    raise service_error(service_error.internal, 
2097                            "No fedid for experiment %s when getting "+\
2098                                    "fedid(!?)" % key)
2099                if len(kl) == 1:
2100                    rv = kl[0]
2101                else:
2102                    self.state_lock.release()
2103                    raise service_error(service_error.internal, 
2104                            "multiple fedids for experiment %s when " +\
2105                                    "getting fedid(!?)" % key)
2106            else:
2107                self.state_lock.release()
2108                raise service_error(service_error.internal, 
2109                        "Unexpected state for %s" % key)
2110        self.state_lock.release()
2111        return rv
2112
2113    def check_experiment_access(self, fid, key):
2114        """
2115        Confirm that the fid has access to the experiment.  Though a request
2116        may be made in terms of a local name, the access attribute is always
2117        the experiment's fedid.
2118        """
2119        if not isinstance(key, fedid):
2120            key = self.get_experiment_fedid(key)
2121
2122        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2123
2124        if access_ok:
2125            return proof
2126        else:
2127            raise service_error(service_error.access, "Access Denied",
2128                proof)
2129
2130
2131    def get_handler(self, path, fid):
2132        """
2133        Perhaps surprisingly named, this function handles HTTP GET requests to
2134        this server (SOAP requests are POSTs).
2135        """
2136        self.log.info("Get handler %s %s" % (path, fid))
2137        # XXX: log proofs?
2138        if self.auth.check_attribute(fid, path):
2139            return ("%s/%s" % (self.repodir, path), "application/binary")
2140        else:
2141            return (None, None)
2142
2143    def clean_info_response(self, rv, proof):
2144        """
2145        Remove the information in the experiment's state object that is not in
2146        the info response.
2147        """
2148        # Remove the owner info (should always be there, but...)
2149        if rv.has_key('owner'): del rv['owner']
2150        if 'auth' in rv: del rv['auth']
2151
2152        # Convert the log into the allocationLog parameter and remove the
2153        # log entry (with defensive programming)
2154        if rv.has_key('log'):
2155            rv['allocationLog'] = "".join(rv['log'])
2156            del rv['log']
2157        else:
2158            rv['allocationLog'] = ""
2159
2160        if rv['experimentStatus'] != 'active':
2161            if rv.has_key('federant'): del rv['federant']
2162        else:
2163            # remove the allocationID and uri info from each federant
2164            for f in rv.get('federant', []):
2165                if f.has_key('allocID'): del f['allocID']
2166                if f.has_key('uri'): del f['uri']
2167        rv['proof'] = proof.to_dict()
2168
2169        return rv
2170
2171    def get_info(self, req, fid):
2172        """
2173        Return all the stored info about this experiment
2174        """
2175        rv = None
2176
2177        req = req.get('InfoRequestBody', None)
2178        if not req:
2179            raise service_error(service_error.req,
2180                    "Bad request format (no InfoRequestBody)")
2181        exp = req.get('experiment', None)
2182        if exp:
2183            if exp.has_key('fedid'):
2184                key = exp['fedid']
2185                keytype = "fedid"
2186            elif exp.has_key('localname'):
2187                key = exp['localname']
2188                keytype = "localname"
2189            else:
2190                raise service_error(service_error.req, "Unknown lookup type")
2191        else:
2192            raise service_error(service_error.req, "No request?")
2193
2194        proof = self.check_experiment_access(fid, key)
2195
2196        # The state may be massaged by the service function that called
2197        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2198        # state.
2199        self.state_lock.acquire()
2200        if self.state.has_key(key):
2201            rv = copy.deepcopy(self.state[key])
2202        self.state_lock.release()
2203
2204        if rv:
2205            return self.clean_info_response(rv, proof)
2206        else:
2207            raise service_error(service_error.req, "No such experiment")
2208
2209    def get_multi_info(self, req, fid):
2210        """
2211        Return all the stored info that this fedid can access
2212        """
2213        rv = { 'info': [ ], 'proof': [ ] }
2214
2215        self.state_lock.acquire()
2216        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2217            try:
2218                proof = self.check_experiment_access(fid, key)
2219            except service_error, e:
2220                if e.code == service_error.access:
2221                    continue
2222                else:
2223                    self.state_lock.release()
2224                    raise e
2225
2226            if self.state.has_key(key):
2227                e = copy.deepcopy(self.state[key])
2228                e = self.clean_info_response(e, proof)
2229                rv['info'].append(e)
2230                rv['proof'].append(proof.to_dict())
2231        self.state_lock.release()
2232        return rv
2233
2234    def check_termination_status(self, fed_exp, force):
2235        """
2236        Confirm that the experiment is sin a valid state to stop (or force it)
2237        return the state - invalid states for deletion and force settings cause
2238        exceptions.
2239        """
2240        self.state_lock.acquire()
2241        status = fed_exp.get('experimentStatus', None)
2242
2243        if status:
2244            if status in ('starting', 'terminating'):
2245                if not force:
2246                    self.state_lock.release()
2247                    raise service_error(service_error.partial, 
2248                            'Experiment still being created or destroyed')
2249                else:
2250                    self.log.warning('Experiment in %s state ' % status + \
2251                            'being terminated by force.')
2252            self.state_lock.release()
2253            return status
2254        else:
2255            # No status??? trouble
2256            self.state_lock.release()
2257            raise service_error(service_error.internal,
2258                    "Experiment has no status!?")
2259
2260
2261    def get_termination_info(self, fed_exp):
2262        ids = []
2263        term_params = { }
2264        self.state_lock.acquire()
2265        #  experimentID is a list of dicts that are self-describing
2266        #  identifiers.  This finds all the fedids and localnames - the
2267        #  keys of self.state - and puts them into ids, which is used to delete
2268        #  the state after everything is swapped out.
2269        for id in fed_exp.get('experimentID', []):
2270            if 'fedid' in id: 
2271                ids.append(id['fedid'])
2272                repo = "%s" % id['fedid']
2273            if 'localname' in id: ids.append(id['localname'])
2274
2275        # Get the experimentAccess - the principal for this experiment.  It
2276        # is this principal to which credentials have been delegated, and
2277        # as which the experiment controller must act.
2278        if 'experimentAccess' in fed_exp and \
2279                'X509' in fed_exp['experimentAccess']:
2280            expcert = fed_exp['experimentAccess']['X509']
2281        else:
2282            expcert = None
2283
2284        # Collect the allocation/segment ids into a dict keyed by the fedid
2285        # of the allocation (or a monotonically increasing integer) that
2286        # contains a tuple of uri, aid (which is a dict...)
2287        for i, fed in enumerate(fed_exp.get('federant', [])):
2288            try:
2289                uri = fed['uri']
2290                aid = fed['allocID']
2291                k = fed['allocID'].get('fedid', i)
2292            except KeyError, e:
2293                continue
2294            term_params[k] = (uri, aid)
2295        # Change the experiment state
2296        fed_exp['experimentStatus'] = 'terminating'
2297        if self.state_filename: self.write_state()
2298        self.state_lock.release()
2299
2300        return ids, term_params, expcert, repo
2301
2302
2303    def deallocate_resources(self, term_params, expcert, status, force, 
2304            dealloc_log):
2305        tmpdir = None
2306        # This try block makes sure the tempdir is cleared
2307        try:
2308            # If no expcert, try the deallocation as the experiment
2309            # controller instance.
2310            if expcert and self.auth_type != 'legacy': 
2311                try:
2312                    tmpdir = tempfile.mkdtemp(prefix="term-")
2313                except EnvironmentError:
2314                    raise service_error(service_error.internal, 
2315                            "Cannot create tmp dir")
2316                cert_file = self.make_temp_certfile(expcert, tmpdir)
2317                pw = None
2318            else: 
2319                cert_file = self.cert_file
2320                pw = self.cert_pwd
2321
2322            # Stop everyone.  NB, wait_for_all waits until a thread starts
2323            # and then completes, so we can't wait if nothing starts.  So,
2324            # no tbparams, no start.
2325            if len(term_params) > 0:
2326                tp = thread_pool(self.nthreads)
2327                for k, (uri, aid) in term_params.items():
2328                    # Create and start a thread to stop the segment
2329                    tp.wait_for_slot()
2330                    t  = pooled_thread(\
2331                            target=self.terminate_segment(log=dealloc_log,
2332                                testbed=uri,
2333                                cert_file=cert_file, 
2334                                cert_pwd=pw,
2335                                trusted_certs=self.trusted_certs,
2336                                caller=self.call_TerminateSegment),
2337                            args=(uri, aid), name=k,
2338                            pdata=tp, trace_file=self.trace_file)
2339                    t.start()
2340                # Wait for completions
2341                tp.wait_for_all_done()
2342
2343            # release the allocations (failed experiments have done this
2344            # already, and starting experiments may be in odd states, so we
2345            # ignore errors releasing those allocations
2346            try: 
2347                for k, (uri, aid)  in term_params.items():
2348                    self.release_access(None, aid, uri=uri,
2349                            cert_file=cert_file, cert_pwd=pw)
2350            except service_error, e:
2351                if status != 'failed' and not force:
2352                    raise e
2353
2354        # Clean up the tmpdir no matter what
2355        finally:
2356            if tmpdir: self.remove_dirs(tmpdir)
2357
2358    def terminate_experiment(self, req, fid):
2359        """
2360        Swap this experiment out on the federants and delete the shared
2361        information
2362        """
2363        tbparams = { }
2364        req = req.get('TerminateRequestBody', None)
2365        if not req:
2366            raise service_error(service_error.req,
2367                    "Bad request format (no TerminateRequestBody)")
2368
2369        key = self.get_experiment_key(req, 'experiment')
2370        proof = self.check_experiment_access(fid, key)
2371        exp = req.get('experiment', False)
2372        force = req.get('force', False)
2373
2374        dealloc_list = [ ]
2375
2376
2377        # Create a logger that logs to the dealloc_list as well as to the main
2378        # log file.
2379        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2380        h = logging.StreamHandler(self.list_log(dealloc_list))
2381        # XXX: there should be a global one of these rather than repeating the
2382        # code.
2383        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2384                    '%d %b %y %H:%M:%S'))
2385        dealloc_log.addHandler(h)
2386
2387        self.state_lock.acquire()
2388        fed_exp = self.state.get(key, None)
2389        self.state_lock.release()
2390        repo = None
2391
2392        if fed_exp:
2393            status = self.check_termination_status(fed_exp, force)
2394            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2395            self.deallocate_resources(term_params, expcert, status, force, 
2396                    dealloc_log)
2397
2398            # Remove the terminated experiment
2399            self.state_lock.acquire()
2400            for id in ids:
2401                self.clear_experiment_authorization(id, need_state_lock=False)
2402                if id in self.state: del self.state[id]
2403
2404            if self.state_filename: self.write_state()
2405            self.state_lock.release()
2406
2407            # Delete any synch points associated with this experiment.  All
2408            # synch points begin with the fedid of the experiment.
2409            fedid_keys = set(["fedid:%s" % f for f in ids \
2410                    if isinstance(f, fedid)])
2411            for k in self.synch_store.all_keys():
2412                try:
2413                    if len(k) > 45 and k[0:46] in fedid_keys:
2414                        self.synch_store.del_value(k)
2415                except synch_store.BadDeletionError:
2416                    pass
2417            self.write_store()
2418
2419            # Remove software and other cached stuff from the filesystem.
2420            if repo:
2421                self.remove_dirs("%s/%s" % (self.repodir, repo))
2422       
2423            return { 
2424                    'experiment': exp , 
2425                    'deallocationLog': string.join(dealloc_list, ''),
2426                    'proof': [proof.to_dict()],
2427                    }
2428        else:
2429            raise service_error(service_error.req, "No saved state")
2430
2431
2432    def GetValue(self, req, fid):
2433        """
2434        Get a value from the synchronized store
2435        """
2436        req = req.get('GetValueRequestBody', None)
2437        if not req:
2438            raise service_error(service_error.req,
2439                    "Bad request format (no GetValueRequestBody)")
2440       
2441        name = req.get('name', None)
2442        wait = req.get('wait', False)
2443        rv = { 'name': name }
2444
2445        if not name:
2446            raise service_error(service_error.req, "No name?")
2447
2448        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2449
2450        if access_ok:
2451            self.log.debug("[GetValue] asking for %s " % name)
2452            try:
2453                v = self.synch_store.get_value(name, wait)
2454            except synch_store.RevokedKeyError:
2455                # No more synch on this key
2456                raise service_error(service_error.federant, 
2457                        "Synch key %s revoked" % name)
2458            if v is not None:
2459                rv['value'] = v
2460            rv['proof'] = proof.to_dict()
2461            self.log.debug("[GetValue] got %s from %s" % (v, name))
2462            return rv
2463        else:
2464            raise service_error(service_error.access, "Access Denied",
2465                    proof=proof)
2466       
2467
2468    def SetValue(self, req, fid):
2469        """
2470        Set a value in the synchronized store
2471        """
2472        req = req.get('SetValueRequestBody', None)
2473        if not req:
2474            raise service_error(service_error.req,
2475                    "Bad request format (no SetValueRequestBody)")
2476       
2477        name = req.get('name', None)
2478        v = req.get('value', '')
2479
2480        if not name:
2481            raise service_error(service_error.req, "No name?")
2482
2483        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2484
2485        if access_ok:
2486            try:
2487                self.synch_store.set_value(name, v)
2488                self.write_store()
2489                self.log.debug("[SetValue] set %s to %s" % (name, v))
2490            except synch_store.CollisionError:
2491                # Translate into a service_error
2492                raise service_error(service_error.req,
2493                        "Value already set: %s" %name)
2494            except synch_store.RevokedKeyError:
2495                # No more synch on this key
2496                raise service_error(service_error.federant, 
2497                        "Synch key %s revoked" % name)
2498                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2499        else:
2500            raise service_error(service_error.access, "Access Denied",
2501                    proof=proof)
Note: See TracBrowser for help on using the repository browser.