source: fedd/federation/experiment_control.py @ 6031c9d

compt_changesinfo-ops
Last change on this file since 6031c9d was 6031c9d, checked in by Ted Faber <faber@…>, 13 years ago

logging that closes #2

  • Property mode set to 100644
File size: 84.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        dt = config.get("experiment_control", "direct_transit")
154        self.auth_type = config.get('experiment_control', 'auth_type') \
155                or 'legacy'
156        self.auth_dir = config.get('experiment_control', 'auth_dir')
157        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
158        else: self.direct_transit = [ ]
159        # NB for internal master/slave ops, not experiment setup
160        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
161
162        self.overrides = set([])
163        ovr = config.get('experiment_control', 'overrides')
164        if ovr:
165            for o in ovr.split(","):
166                o = o.strip()
167                if o.startswith('fedid:'): o = o[len('fedid:'):]
168                self.overrides.add(fedid(hexstr=o))
169
170        self.state = { }
171        self.state_lock = Lock()
172        self.tclsh = "/usr/local/bin/otclsh"
173        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
174                config.get("experiment_control", "tcl_splitter",
175                        "/usr/testbed/lib/ns2ir/parse.tcl")
176        mapdb_file = config.get("experiment_control", "mapdb")
177        self.trace_file = sys.stderr
178
179        self.def_expstart = \
180                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
181                "/tmp/federate";
182        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
183                "FEDDIR/hosts";
184        self.def_gwstart = \
185                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
186                "/tmp/bridge.log";
187        self.def_mgwstart = \
188                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
189                "/tmp/bridge.log";
190        self.def_gwimage = "FBSD61-TUNNEL2";
191        self.def_gwtype = "pc";
192        self.local_access = { }
193
194        if self.auth_type == 'legacy':
195            if auth:
196                self.auth = auth
197            else:
198                self.log.error( "[access]: No authorizer initialized, " +\
199                        "creating local one.")
200                auth = authorizer()
201            self.get_access = self.legacy_get_access
202        elif self.auth_type == 'abac':
203            self.auth = abac_authorizer(load=self.auth_dir)
204        else:
205            raise service_error(service_error.internal, 
206                    "Unknown auth_type: %s" % self.auth_type)
207
208        if mapdb_file:
209            self.read_mapdb(mapdb_file)
210        else:
211            self.log.warn("[experiment_control] No testbed map, using defaults")
212            self.tbmap = { 
213                    'deter':'https://users.isi.deterlab.net:23235',
214                    'emulab':'https://users.isi.deterlab.net:23236',
215                    'ucb':'https://users.isi.deterlab.net:23237',
216                    }
217
218        if accessdb_file:
219                self.read_accessdb(accessdb_file)
220        else:
221            raise service_error(service_error.internal,
222                    "No accessdb specified in config")
223
224        # Grab saved state.  OK to do this w/o locking because it's read only
225        # and only one thread should be in existence that can see self.state at
226        # this point.
227        if self.state_filename:
228            self.read_state()
229
230        if self.store_filename:
231            self.read_store()
232        else:
233            self.log.warning("No saved synch store")
234            self.synch_store = synch_store
235
236        # Dispatch tables
237        self.soap_services = {\
238                'New': soap_handler('New', self.new_experiment),
239                'Create': soap_handler('Create', self.create_experiment),
240                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
241                'Vis': soap_handler('Vis', self.get_vis),
242                'Info': soap_handler('Info', self.get_info),
243                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
244                'Terminate': soap_handler('Terminate', 
245                    self.terminate_experiment),
246                'GetValue': soap_handler('GetValue', self.GetValue),
247                'SetValue': soap_handler('SetValue', self.SetValue),
248        }
249
250        self.xmlrpc_services = {\
251                'New': xmlrpc_handler('New', self.new_experiment),
252                'Create': xmlrpc_handler('Create', self.create_experiment),
253                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
254                'Vis': xmlrpc_handler('Vis', self.get_vis),
255                'Info': xmlrpc_handler('Info', self.get_info),
256                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
257                'Terminate': xmlrpc_handler('Terminate',
258                    self.terminate_experiment),
259                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
260                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
261        }
262
263    # Call while holding self.state_lock
264    def write_state(self):
265        """
266        Write a new copy of experiment state after copying the existing state
267        to a backup.
268
269        State format is a simple pickling of the state dictionary.
270        """
271        if os.access(self.state_filename, os.W_OK):
272            copy_file(self.state_filename, \
273                    "%s.bak" % self.state_filename)
274        try:
275            f = open(self.state_filename, 'w')
276            pickle.dump(self.state, f)
277        except EnvironmentError, e:
278            self.log.error("Can't write file %s: %s" % \
279                    (self.state_filename, e))
280        except pickle.PicklingError, e:
281            self.log.error("Pickling problem: %s" % e)
282        except TypeError, e:
283            self.log.error("Pickling problem (TypeError): %s" % e)
284
285    @staticmethod
286    def get_alloc_ids(state):
287        """
288        Pull the fedids of the identifiers of each allocation from the
289        state.  Again, a dict dive that's best isolated.
290
291        Used by read_store and read state
292        """
293
294        return [ f['allocID']['fedid'] 
295                for f in state.get('federant',[]) \
296                    if f.has_key('allocID') and \
297                        f['allocID'].has_key('fedid')]
298
299    # Call while holding self.state_lock
300    def read_state(self):
301        """
302        Read a new copy of experiment state.  Old state is overwritten.
303
304        State format is a simple pickling of the state dictionary.
305        """
306       
307        def get_experiment_id(state):
308            """
309            Pull the fedid experimentID out of the saved state.  This is kind
310            of a gross walk through the dict.
311            """
312
313            if state.has_key('experimentID'):
314                for e in state['experimentID']:
315                    if e.has_key('fedid'):
316                        return e['fedid']
317                else:
318                    return None
319            else:
320                return None
321
322        try:
323            f = open(self.state_filename, "r")
324            self.state = pickle.load(f)
325            self.log.debug("[read_state]: Read state from %s" % \
326                    self.state_filename)
327        except EnvironmentError, e:
328            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
329                    % (self.state_filename, e))
330        except pickle.UnpicklingError, e:
331            self.log.warning(("[read_state]: No saved state: " + \
332                    "Unpickling failed: %s") % e)
333       
334        for s in self.state.values():
335            try:
336
337                eid = get_experiment_id(s)
338                if eid : 
339                    if self.auth_type == 'legacy':
340                        # XXX: legacy
341                        # Give the owner rights to the experiment
342                        self.auth.set_attribute(s['owner'], eid)
343                        # And holders of the eid as well
344                        self.auth.set_attribute(eid, eid)
345                        # allow overrides to control experiments as well
346                        for o in self.overrides:
347                            self.auth.set_attribute(o, eid)
348                        # Set permissions to allow reading of the software
349                        # repo, if any, as well.
350                        for a in self.get_alloc_ids(s):
351                            self.auth.set_attribute(a, 'repo/%s' % eid)
352                else:
353                    raise KeyError("No experiment id")
354            except KeyError, e:
355                self.log.warning("[read_state]: State ownership or identity " +\
356                        "misformatted in %s: %s" % (self.state_filename, e))
357
358
359    def read_accessdb(self, accessdb_file):
360        """
361        Read the mapping from fedids that can create experiments to their name
362        in the 3-level access namespace.  All will be asserted from this
363        testbed and can include the local username and porject that will be
364        asserted on their behalf by this fedd.  Each fedid is also added to the
365        authorization system with the "create" attribute.
366        """
367        self.accessdb = {}
368        # These are the regexps for parsing the db
369        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
370        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
371                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
372        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
373                "\s*->\s*(" + name_expr + ")\s*$")
374        lineno = 0
375
376        # Parse the mappings and store in self.authdb, a dict of
377        # fedid -> (proj, user)
378        try:
379            f = open(accessdb_file, "r")
380            for line in f:
381                lineno += 1
382                line = line.strip()
383                if len(line) == 0 or line.startswith('#'):
384                    continue
385                m = project_line.match(line)
386                if m:
387                    fid = fedid(hexstr=m.group(1))
388                    project, user = m.group(2,3)
389                    if not self.accessdb.has_key(fid):
390                        self.accessdb[fid] = []
391                    self.accessdb[fid].append((project, user))
392                    continue
393
394                m = user_line.match(line)
395                if m:
396                    fid = fedid(hexstr=m.group(1))
397                    project = None
398                    user = m.group(2)
399                    if not self.accessdb.has_key(fid):
400                        self.accessdb[fid] = []
401                    self.accessdb[fid].append((project, user))
402                    continue
403                self.log.warn("[experiment_control] Error parsing access " +\
404                        "db %s at line %d" %  (accessdb_file, lineno))
405        except EnvironmentError:
406            raise service_error(service_error.internal,
407                    ("Error opening/reading %s as experiment " +\
408                            "control accessdb") %  accessdb_file)
409        f.close()
410
411        # Initialize the authorization attributes
412        # XXX: legacy
413        if self.auth_type == 'legacy':
414            for fid in self.accessdb.keys():
415                self.auth.set_attribute(fid, 'create')
416                self.auth.set_attribute(fid, 'new')
417
418    def read_mapdb(self, file):
419        """
420        Read a simple colon separated list of mappings for the
421        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
422        """
423
424        self.tbmap = { }
425        lineno =0
426        try:
427            f = open(file, "r")
428            for line in f:
429                lineno += 1
430                line = line.strip()
431                if line.startswith('#') or len(line) == 0:
432                    continue
433                try:
434                    label, url = line.split(':', 1)
435                    self.tbmap[label] = url
436                except ValueError, e:
437                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
438                            "map db: %s %s" % (lineno, line, e))
439        except EnvironmentError, e:
440            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
441                    "open %s: %s" % (file, e))
442        else:
443            f.close()
444
445    def read_store(self):
446        try:
447            self.synch_store = synch_store()
448            self.synch_store.load(self.store_filename)
449            self.log.debug("[read_store]: Read store from %s" % \
450                    self.store_filename)
451        except EnvironmentError, e:
452            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
453                    % (self.state_filename, e))
454            self.synch_store = synch_store()
455
456        # Set the initial permissions on data in the store.  XXX: This ad hoc
457        # authorization attribute initialization is getting out of hand.
458        # XXX: legacy
459        if self.auth_type == 'legacy':
460            for k in self.synch_store.all_keys():
461                try:
462                    if k.startswith('fedid:'):
463                        fid = fedid(hexstr=k[6:46])
464                        if self.state.has_key(fid):
465                            for a in self.get_alloc_ids(self.state[fid]):
466                                self.auth.set_attribute(a, k)
467                except ValueError, e:
468                    self.log.warn("Cannot deduce permissions for %s" % k)
469
470
471    def write_store(self):
472        """
473        Write a new copy of synch_store after writing current state
474        to a backup.  We use the internal synch_store pickle method to avoid
475        incinsistent data.
476
477        State format is a simple pickling of the store.
478        """
479        if os.access(self.store_filename, os.W_OK):
480            copy_file(self.store_filename, \
481                    "%s.bak" % self.store_filename)
482        try:
483            self.synch_store.save(self.store_filename)
484        except EnvironmentError, e:
485            self.log.error("Can't write file %s: %s" % \
486                    (self.store_filename, e))
487        except TypeError, e:
488            self.log.error("Pickling problem (TypeError): %s" % e)
489
490
491    def remove_dirs(self, dir):
492        """
493        Remove the directory tree and all files rooted at dir.  Log any errors,
494        but continue.
495        """
496        self.log.debug("[removedirs]: removing %s" % dir)
497        try:
498            for path, dirs, files in os.walk(dir, topdown=False):
499                for f in files:
500                    os.remove(os.path.join(path, f))
501                for d in dirs:
502                    os.rmdir(os.path.join(path, d))
503            os.rmdir(dir)
504        except EnvironmentError, e:
505            self.log.error("Error deleting directory tree in %s" % e);
506
507    @staticmethod
508    def make_temp_certfile(expcert, tmpdir):
509        """
510        make a protected copy of the access certificate so the experiment
511        controller can act as the experiment principal.  mkstemp is the most
512        secure way to do that. The directory should be created by
513        mkdtemp.  Return the filename.
514        """
515        if expcert and tmpdir:
516            try:
517                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
518                f = os.fdopen(certf, 'w')
519                print >> f, expcert
520                f.close()
521            except EnvironmentError, e:
522                raise service_error(service_error.internal, 
523                        "Cannot create temp cert file?")
524            return certfn
525        else:
526            return None
527
528       
529    def generate_ssh_keys(self, dest, type="rsa" ):
530        """
531        Generate a set of keys for the gateways to use to talk.
532
533        Keys are of type type and are stored in the required dest file.
534        """
535        valid_types = ("rsa", "dsa")
536        t = type.lower();
537        if t not in valid_types: raise ValueError
538        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
539
540        try:
541            trace = open("/dev/null", "w")
542        except EnvironmentError:
543            raise service_error(service_error.internal,
544                    "Cannot open /dev/null??");
545
546        # May raise CalledProcessError
547        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
548        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
549        if rv != 0:
550            raise service_error(service_error.internal, 
551                    "Cannot generate nonce ssh keys.  %s return code %d" \
552                            % (self.ssh_keygen, rv))
553
554    def gentopo(self, str):
555        """
556        Generate the topology data structure from the splitter's XML
557        representation of it.
558
559        The topology XML looks like:
560            <experiment>
561                <nodes>
562                    <node><vname></vname><ips>ip1:ip2</ips></node>
563                </nodes>
564                <lans>
565                    <lan>
566                        <vname></vname><vnode></vnode><ip></ip>
567                        <bandwidth></bandwidth><member>node:port</member>
568                    </lan>
569                </lans>
570        """
571        class topo_parse:
572            """
573            Parse the topology XML and create the dats structure.
574            """
575            def __init__(self):
576                # Typing of the subelements for data conversion
577                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
578                self.int_subelements = ( 'bandwidth',)
579                self.float_subelements = ( 'delay',)
580                # The final data structure
581                self.nodes = [ ]
582                self.lans =  [ ]
583                self.topo = { \
584                        'node': self.nodes,\
585                        'lan' : self.lans,\
586                    }
587                self.element = { }  # Current element being created
588                self.chars = ""     # Last text seen
589
590            def end_element(self, name):
591                # After each sub element the contents is added to the current
592                # element or to the appropriate list.
593                if name == 'node':
594                    self.nodes.append(self.element)
595                    self.element = { }
596                elif name == 'lan':
597                    self.lans.append(self.element)
598                    self.element = { }
599                elif name in self.str_subelements:
600                    self.element[name] = self.chars
601                    self.chars = ""
602                elif name in self.int_subelements:
603                    self.element[name] = int(self.chars)
604                    self.chars = ""
605                elif name in self.float_subelements:
606                    self.element[name] = float(self.chars)
607                    self.chars = ""
608
609            def found_chars(self, data):
610                self.chars += data.rstrip()
611
612
613        tp = topo_parse();
614        parser = xml.parsers.expat.ParserCreate()
615        parser.EndElementHandler = tp.end_element
616        parser.CharacterDataHandler = tp.found_chars
617
618        parser.Parse(str)
619
620        return tp.topo
621       
622
623    def genviz(self, topo):
624        """
625        Generate the visualization the virtual topology
626        """
627
628        neato = "/usr/local/bin/neato"
629        # These are used to parse neato output and to create the visualization
630        # file.
631        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
632        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
633                "%s</type></node>"
634
635        try:
636            # Node names
637            nodes = [ n['vname'] for n in topo['node'] ]
638            topo_lans = topo['lan']
639        except KeyError, e:
640            raise service_error(service_error.internal, "Bad topology: %s" %e)
641
642        lans = { }
643        links = { }
644
645        # Walk through the virtual topology, organizing the connections into
646        # 2-node connections (links) and more-than-2-node connections (lans).
647        # When a lan is created, it's added to the list of nodes (there's a
648        # node in the visualization for the lan).
649        for l in topo_lans:
650            if links.has_key(l['vname']):
651                if len(links[l['vname']]) < 2:
652                    links[l['vname']].append(l['vnode'])
653                else:
654                    nodes.append(l['vname'])
655                    lans[l['vname']] = links[l['vname']]
656                    del links[l['vname']]
657                    lans[l['vname']].append(l['vnode'])
658            elif lans.has_key(l['vname']):
659                lans[l['vname']].append(l['vnode'])
660            else:
661                links[l['vname']] = [ l['vnode'] ]
662
663
664        # Open up a temporary file for dot to turn into a visualization
665        try:
666            df, dotname = tempfile.mkstemp()
667            dotfile = os.fdopen(df, 'w')
668        except EnvironmentError:
669            raise service_error(service_error.internal,
670                    "Failed to open file in genviz")
671
672        try:
673            dnull = open('/dev/null', 'w')
674        except EnvironmentError:
675            service_error(service_error.internal,
676                    "Failed to open /dev/null in genviz")
677
678        # Generate a dot/neato input file from the links, nodes and lans
679        try:
680            print >>dotfile, "graph G {"
681            for n in nodes:
682                print >>dotfile, '\t"%s"' % n
683            for l in links.keys():
684                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
685            for l in lans.keys():
686                for n in lans[l]:
687                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
688            print >>dotfile, "}"
689            dotfile.close()
690        except TypeError:
691            raise service_error(service_error.internal,
692                    "Single endpoint link in vtopo")
693        except EnvironmentError:
694            raise service_error(service_error.internal, "Cannot write dot file")
695
696        # Use dot to create a visualization
697        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
698                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
699                close_fds=True)
700        dnull.close()
701
702        # Translate dot to vis format
703        vis_nodes = [ ]
704        vis = { 'node': vis_nodes }
705        for line in dot.stdout:
706            m = vis_re.match(line)
707            if m:
708                vn = m.group(1)
709                vis_node = {'name': vn, \
710                        'x': float(m.group(2)),\
711                        'y' : float(m.group(3)),\
712                    }
713                if vn in links.keys() or vn in lans.keys():
714                    vis_node['type'] = 'lan'
715                else:
716                    vis_node['type'] = 'node'
717                vis_nodes.append(vis_node)
718        rv = dot.wait()
719
720        os.remove(dotname)
721        if rv == 0 : return vis
722        else: return None
723
724
725    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
726            cert_pwd=None):
727        """
728        Release access to testbed through fedd
729        """
730
731        if not uri and tbmap:
732            uri = tbmap.get(tb, None)
733        if not uri:
734            raise service_error(service_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        if self.local_access.has_key(uri):
738            resp = self.local_access[uri].ReleaseAccess(\
739                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
740                    fedid(file=cert_file))
741            resp = { 'ReleaseAccessResponseBody': resp } 
742        else:
743            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
744                    cert_file, cert_pwd, self.trusted_certs)
745
746        # better error coding
747
748    def remote_ns2topdl(self, uri, desc):
749
750        req = {
751                'description' : { 'ns2description': desc },
752            }
753
754        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
755                self.trusted_certs)
756
757        if r.has_key('Ns2TopdlResponseBody'):
758            r = r['Ns2TopdlResponseBody']
759            ed = r.get('experimentdescription', None)
760            if ed.has_key('topdldescription'):
761                return topdl.Topology(**ed['topdldescription'])
762            else:
763                raise service_error(service_error.protocol, 
764                        "Bad splitter response (no output)")
765        else:
766            raise service_error(service_error.protocol, "Bad splitter response")
767
768    class start_segment:
769        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
770                cert_pwd=None, trusted_certs=None, caller=None,
771                log_collector=None):
772            self.log = log
773            self.debug = debug
774            self.cert_file = cert_file
775            self.cert_pwd = cert_pwd
776            self.trusted_certs = None
777            self.caller = caller
778            self.testbed = testbed
779            self.log_collector = log_collector
780            self.response = None
781            self.node = { }
782            self.proof = None
783
784        def make_map(self, resp):
785            for e in resp.get('embedding', []):
786                if 'toponame' in e and 'physname' in e:
787                    self.node[e['toponame']] = e['physname'][0]
788
789        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
790            req = {
791                    'allocID': { 'fedid' : aid }, 
792                    'segmentdescription': { 
793                        'topdldescription': topo.to_dict(),
794                    },
795                }
796
797            if connInfo:
798                req['connection'] = connInfo
799
800            import_svcs = [ s for m in masters.values() \
801                    for s in m if self.testbed in s.importers]
802
803            if import_svcs or self.testbed in masters:
804                req['service'] = []
805
806            for s in import_svcs:
807                for r in s.reqs:
808                    sr = copy.deepcopy(r)
809                    sr['visibility'] = 'import';
810                    req['service'].append(sr)
811
812            for s in masters.get(self.testbed, []):
813                for r in s.reqs:
814                    sr = copy.deepcopy(r)
815                    sr['visibility'] = 'export';
816                    req['service'].append(sr)
817
818            if attrs:
819                req['fedAttr'] = attrs
820
821            try:
822                self.log.debug("Calling StartSegment at %s " % uri)
823                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
824                        self.trusted_certs)
825                if r.has_key('StartSegmentResponseBody'):
826                    lval = r['StartSegmentResponseBody'].get('allocationLog',
827                            None)
828                    if lval and self.log_collector:
829                        for line in  lval.splitlines(True):
830                            self.log_collector.write(line)
831                    self.make_map(r['StartSegmentResponseBody'])
832                    if 'proof' in r: self.proof = r['proof']
833                    self.response = r
834                else:
835                    raise service_error(service_error.internal, 
836                            "Bad response!?: %s" %r)
837                return True
838            except service_error, e:
839                self.log.error("Start segment failed on %s: %s" % \
840                        (self.testbed, e))
841                return False
842
843
844
845    class terminate_segment:
846        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
847                cert_pwd=None, trusted_certs=None, caller=None):
848            self.log = log
849            self.debug = debug
850            self.cert_file = cert_file
851            self.cert_pwd = cert_pwd
852            self.trusted_certs = None
853            self.caller = caller
854            self.testbed = testbed
855
856        def __call__(self, uri, aid ):
857            req = {
858                    'allocID': aid , 
859                }
860            try:
861                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
862                        self.trusted_certs)
863                return True
864            except service_error, e:
865                self.log.error("Terminate segment failed on %s: %s" % \
866                        (self.testbed, e))
867                return False
868
869    def allocate_resources(self, allocated, masters, eid, expid, 
870            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
871            attrs=None, connInfo={}, tbmap=None, expcert=None):
872
873        started = { }           # Testbeds where a sub-experiment started
874                                # successfully
875
876        # XXX
877        fail_soft = False
878
879        if tbmap is None: tbmap = { }
880
881        log = alloc_log or self.log
882
883        tp = thread_pool(self.nthreads)
884        threads = [ ]
885        starters = [ ]
886
887        if expcert:
888            cert = expcert
889            pw = None
890        else:
891            cert = self.cert_file
892            pw = self.cert_pwd
893
894        for tb in allocated.keys():
895            # Create and start a thread to start the segment, and save it
896            # to get the return value later
897            tb_attrs = copy.copy(attrs)
898            tp.wait_for_slot()
899            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
900            base, suffix = split_testbed(tb)
901            if suffix:
902                tb_attrs.append({'attribute': 'experiment_name', 
903                    'value': "%s-%s" % (eid, suffix)})
904            else:
905                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
906            if not uri:
907                raise service_error(service_error.internal, 
908                        "Unknown testbed %s !?" % tb)
909
910            if tbparams[tb].has_key('allocID') and \
911                    tbparams[tb]['allocID'].has_key('fedid'):
912                aid = tbparams[tb]['allocID']['fedid']
913            else:
914                raise service_error(service_error.internal, 
915                        "No alloc id for testbed %s !?" % tb)
916
917            s = self.start_segment(log=log, debug=self.debug,
918                    testbed=tb, cert_file=cert,
919                    cert_pwd=pw, trusted_certs=self.trusted_certs,
920                    caller=self.call_StartSegment,
921                    log_collector=log_collector)
922            starters.append(s)
923            t  = pooled_thread(\
924                    target=s, name=tb,
925                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
926                    pdata=tp, trace_file=self.trace_file)
927            threads.append(t)
928            t.start()
929
930        # Wait until all finish (keep pinging the log, though)
931        mins = 0
932        revoked = False
933        while not tp.wait_for_all_done(60.0):
934            mins += 1
935            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
936                    % mins)
937            if not revoked and \
938                    len([ t.getName() for t in threads if t.rv == False]) > 0:
939                # a testbed has failed.  Revoke this experiment's
940                # synchronizarion values so that sub experiments will not
941                # deadlock waiting for synchronization that will never happen
942                self.log.info("A subexperiment has failed to swap in, " + \
943                        "revoking synch keys")
944                var_key = "fedid:%s" % expid
945                for k in self.synch_store.all_keys():
946                    if len(k) > 45 and k[0:46] == var_key:
947                        self.synch_store.revoke_key(k)
948                revoked = True
949
950        failed = [ t.getName() for t in threads if not t.rv ]
951        succeeded = [tb for tb in allocated.keys() if tb not in failed]
952
953        # If one failed clean up, unless fail_soft is set
954        if failed:
955            if not fail_soft:
956                tp.clear()
957                for tb in succeeded:
958                    # Create and start a thread to stop the segment
959                    tp.wait_for_slot()
960                    uri = tbparams[tb]['uri']
961                    t  = pooled_thread(\
962                            target=self.terminate_segment(log=log,
963                                testbed=tb,
964                                cert_file=cert, 
965                                cert_pwd=pw,
966                                trusted_certs=self.trusted_certs,
967                                caller=self.call_TerminateSegment),
968                            args=(uri, tbparams[tb]['federant']['allocID']),
969                            name=tb,
970                            pdata=tp, trace_file=self.trace_file)
971                    t.start()
972                # Wait until all finish (if any are being stopped)
973                if succeeded:
974                    tp.wait_for_all_done()
975
976                # release the allocations
977                for tb in tbparams.keys():
978                    try:
979                        self.release_access(tb, tbparams[tb]['allocID'], 
980                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
981                                cert_file=cert, cert_pwd=pw)
982                    except service_error, e:
983                        self.log.warn("Error releasing access: %s" % e.desc)
984                # Remove the placeholder
985                self.state_lock.acquire()
986                self.state[eid]['experimentStatus'] = 'failed'
987                if self.state_filename: self.write_state()
988                self.state_lock.release()
989                # Remove the repo dir
990                self.remove_dirs("%s/%s" %(self.repodir, expid))
991                # Walk up tmpdir, deleting as we go
992                if self.cleanup:
993                    self.remove_dirs(tmpdir)
994                else:
995                    log.debug("[start_experiment]: not removing %s" % tmpdir)
996
997
998                log.error("Swap in failed on %s" % ",".join(failed))
999                return
1000        else:
1001            # Walk through the successes and gather the virtual to physical
1002            # mapping.
1003            embedding = [ ]
1004            proofs = { }
1005            for s in starters:
1006                for k, v in s.node.items():
1007                    embedding.append({
1008                        'toponame': k, 
1009                        'physname': [ v],
1010                        'testbed': s.testbed
1011                        })
1012                if s.proof:
1013                    proofs[s.testbed] = s.proof
1014            log.info("[start_segment]: Experiment %s active" % eid)
1015
1016
1017        # Walk up tmpdir, deleting as we go
1018        if self.cleanup:
1019            self.remove_dirs(tmpdir)
1020        else:
1021            log.debug("[start_experiment]: not removing %s" % tmpdir)
1022
1023        # Insert the experiment into our state and update the disk copy.
1024        self.state_lock.acquire()
1025        self.state[expid]['experimentStatus'] = 'active'
1026        self.state[eid] = self.state[expid]
1027        self.state[eid]['experimentdescription']['topdldescription'] = \
1028                top.to_dict()
1029        self.state[eid]['embedding'] = embedding
1030        # Append startup proofs
1031        for f in self.state[eid]['federant']:
1032            if 'name' in f and 'localname' in f['name']:
1033                if f['name']['localname'] in proofs:
1034                    f['proof'].append(proofs[f['name']['localname']])
1035
1036        if self.state_filename: self.write_state()
1037        self.state_lock.release()
1038        return
1039
1040
1041    def add_kit(self, e, kit):
1042        """
1043        Add a Software object created from the list of (install, location)
1044        tuples passed as kit  to the software attribute of an object e.  We
1045        do this enough to break out the code, but it's kind of a hack to
1046        avoid changing the old tuple rep.
1047        """
1048
1049        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1050
1051        if isinstance(e.software, list): e.software.extend(s)
1052        else: e.software = s
1053
1054    def append_experiment_authorization(self, expid, attrs, 
1055            need_state_lock=True):
1056        """
1057        Append the authorization information to system state
1058        """
1059
1060        for p, a in attrs:
1061            self.auth.set_attribute(p, a)
1062        self.auth.save()
1063
1064        if need_state_lock: self.state_lock.acquire()
1065        self.state[expid]['auth'].update(attrs)
1066        if self.state_filename: self.write_state()
1067        if need_state_lock: self.state_lock.release()
1068
1069    def clear_experiment_authorization(self, expid, need_state_lock=True):
1070        """
1071        Attrs is a set of attribute principal pairs that need to be removed
1072        from the authenticator.  Remove them and save the authenticator.
1073        """
1074
1075        if need_state_lock: self.state_lock.acquire()
1076        if expid in self.state and 'auth' in self.state[expid]:
1077            for p, a in self.state[expid]['auth']:
1078                self.auth.unset_attribute(p, a)
1079            self.state[expid]['auth'] = set()
1080        if self.state_filename: self.write_state()
1081        if need_state_lock: self.state_lock.release()
1082        self.auth.save()
1083
1084
1085    def create_experiment_state(self, fid, req, expid, expcert,
1086            state='starting'):
1087        """
1088        Create the initial entry in the experiment's state.  The expid and
1089        expcert are the experiment's fedid and certifacte that represents that
1090        ID, which are installed in the experiment state.  If the request
1091        includes a suggested local name that is used if possible.  If the local
1092        name is already taken by an experiment owned by this user that has
1093        failed, it is overwritten.  Otherwise new letters are added until a
1094        valid localname is found.  The generated local name is returned.
1095        """
1096
1097        if req.has_key('experimentID') and \
1098                req['experimentID'].has_key('localname'):
1099            overwrite = False
1100            eid = req['experimentID']['localname']
1101            # If there's an old failed experiment here with the same local name
1102            # and accessible by this user, we'll overwrite it, otherwise we'll
1103            # fall through and do the collision avoidance.
1104            old_expid = self.get_experiment_fedid(eid)
1105            if old_expid:
1106                users_experiment = True
1107                try:
1108                    self.check_experiment_access(fid, old_expid)
1109                except service_error, e:
1110                    if e.code == service_error.access: users_experiment = False
1111                    else: raise e
1112                if users_experiment:
1113                    self.state_lock.acquire()
1114                    status = self.state[eid].get('experimentStatus', None)
1115                    if status and status == 'failed':
1116                        # remove the old access attributes
1117                        self.clear_experiment_authorization(eid,
1118                                need_state_lock=False)
1119                        overwrite = True
1120                        del self.state[eid]
1121                        del self.state[old_expid]
1122                    self.state_lock.release()
1123                else:
1124                    self.log.info('Experiment %s exists, ' % eid + \
1125                            'but this user cannot access it')
1126            self.state_lock.acquire()
1127            while (self.state.has_key(eid) and not overwrite):
1128                eid += random.choice(string.ascii_letters)
1129            # Initial state
1130            self.state[eid] = {
1131                    'experimentID' : \
1132                            [ { 'localname' : eid }, {'fedid': expid } ],
1133                    'experimentStatus': state,
1134                    'experimentAccess': { 'X509' : expcert },
1135                    'owner': fid,
1136                    'log' : [],
1137                    'auth': set(),
1138                }
1139            self.state[expid] = self.state[eid]
1140            if self.state_filename: self.write_state()
1141            self.state_lock.release()
1142        else:
1143            eid = self.exp_stem
1144            for i in range(0,5):
1145                eid += random.choice(string.ascii_letters)
1146            self.state_lock.acquire()
1147            while (self.state.has_key(eid)):
1148                eid = self.exp_stem
1149                for i in range(0,5):
1150                    eid += random.choice(string.ascii_letters)
1151            # Initial state
1152            self.state[eid] = {
1153                    'experimentID' : \
1154                            [ { 'localname' : eid }, {'fedid': expid } ],
1155                    'experimentStatus': state,
1156                    'experimentAccess': { 'X509' : expcert },
1157                    'owner': fid,
1158                    'log' : [],
1159                    'auth': set(),
1160                }
1161            self.state[expid] = self.state[eid]
1162            if self.state_filename: self.write_state()
1163            self.state_lock.release()
1164
1165        # Let users touch the state.  Authorize this fid and the expid itself
1166        # to touch the experiment, as well as allowing th eoverrides.
1167        self.append_experiment_authorization(eid, 
1168                set([(fid, expid), (expid,expid)] + \
1169                        [ (o, expid) for o in self.overrides]))
1170
1171        return eid
1172
1173
1174    def allocate_ips_to_topo(self, top):
1175        """
1176        Add an ip4_address attribute to all the hosts in the topology, based on
1177        the shared substrates on which they sit.  An /etc/hosts file is also
1178        created and returned as a list of hostfiles entries.  We also return
1179        the allocator, because we may need to allocate IPs to portals
1180        (specifically DRAGON portals).
1181        """
1182        subs = sorted(top.substrates, 
1183                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1184                reverse=True)
1185        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1186        ifs = { }
1187        hosts = [ ]
1188
1189        for idx, s in enumerate(subs):
1190            net_size = len(s.interfaces)+2
1191
1192            a = ips.allocate(net_size)
1193            if a :
1194                base, num = a
1195                if num < net_size: 
1196                    raise service_error(service_error.internal,
1197                            "Allocator returned wrong number of IPs??")
1198            else:
1199                raise service_error(service_error.req, 
1200                        "Cannot allocate IP addresses")
1201            mask = ips.min_alloc
1202            while mask < net_size:
1203                mask *= 2
1204
1205            netmask = ((2**32-1) ^ (mask-1))
1206
1207            base += 1
1208            for i in s.interfaces:
1209                i.attribute.append(
1210                        topdl.Attribute('ip4_address', 
1211                            "%s" % ip_addr(base)))
1212                i.attribute.append(
1213                        topdl.Attribute('ip4_netmask', 
1214                            "%s" % ip_addr(int(netmask))))
1215
1216                hname = i.element.name
1217                if ifs.has_key(hname):
1218                    hosts.append("%s\t%s-%s %s-%d" % \
1219                            (ip_addr(base), hname, s.name, hname,
1220                                ifs[hname]))
1221                else:
1222                    ifs[hname] = 0
1223                    hosts.append("%s\t%s-%s %s-%d %s" % \
1224                            (ip_addr(base), hname, s.name, hname,
1225                                ifs[hname], hname))
1226
1227                ifs[hname] += 1
1228                base += 1
1229        return hosts, ips
1230
1231    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1232            tbparam, masters, tbmap, expid=None, expcert=None):
1233        for tb in testbeds:
1234            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1235                    expcert)
1236            allocated[tb] = 1
1237
1238    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1239            expcert=None):
1240        """
1241        Get access to testbed through fedd and set the parameters for that tb
1242        """
1243        def get_export_project(svcs):
1244            """
1245            Look through for the list of federated_service for this testbed
1246            objects for a project_export service, and extract the project
1247            parameter.
1248            """
1249
1250            pe = [s for s in svcs if s.name=='project_export']
1251            if len(pe) == 1:
1252                return pe[0].params.get('project', None)
1253            elif len(pe) == 0:
1254                return None
1255            else:
1256                raise service_error(service_error.req,
1257                        "More than one project export is not supported")
1258
1259        def add_services(svcs, type, slist, keys):
1260            """
1261            Add the given services to slist.  type is import or export.  Also
1262            add a mapping entry from the assigned id to the original service
1263            record.
1264            """
1265            for i, s in enumerate(svcs):
1266                idx = '%s%d' % (type, i)
1267                keys[idx] = s
1268                sr = {'id': idx, 'name': s.name, 'visibility': type }
1269                if s.params:
1270                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1271                            for k, v in s.params.items()]
1272                slist.append(sr)
1273
1274        uri = tbmap.get(testbed_base(tb), None)
1275        if not uri:
1276            raise service_error(service_error.server_config, 
1277                    "Unknown testbed: %s" % tb)
1278
1279        export_svcs = masters.get(tb,[])
1280        import_svcs = [ s for m in masters.values() \
1281                for s in m \
1282                    if tb in s.importers ]
1283
1284        export_project = get_export_project(export_svcs)
1285        # Compose the credential list so that IDs come before attributes
1286        creds = set()
1287        keys = set()
1288        certs = self.auth.get_creds_for_principal(fid)
1289        if expid:
1290            certs.update(self.auth.get_creds_for_principal(expid))
1291        for c in certs:
1292            keys.add(c.issuer_cert())
1293            creds.add(c.attribute_cert())
1294        creds = list(keys) + list(creds)
1295
1296        if expcert: cert, pw = expcert, None
1297        else: cert, pw = self.cert_file, self.cert_pw
1298
1299        # Request credentials
1300        req = {
1301                'abac_credential': creds,
1302            }
1303        # Make the service request from the services we're importing and
1304        # exporting.  Keep track of the export request ids so we can
1305        # collect the resulting info from the access response.
1306        e_keys = { }
1307        if import_svcs or export_svcs:
1308            slist = []
1309            add_services(import_svcs, 'import', slist, e_keys)
1310            add_services(export_svcs, 'export', slist, e_keys)
1311            req['service'] = slist
1312
1313        if self.local_access.has_key(uri):
1314            # Local access call
1315            req = { 'RequestAccessRequestBody' : req }
1316            r = self.local_access[uri].RequestAccess(req, 
1317                    fedid(file=self.cert_file))
1318            r = { 'RequestAccessResponseBody' : r }
1319        else:
1320            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1321
1322        if r.has_key('RequestAccessResponseBody'):
1323            # Through to here we have a valid response, not a fault.
1324            # Access denied is a fault, so something better or worse than
1325            # access denied has happened.
1326            r = r['RequestAccessResponseBody']
1327            self.log.debug("[get_access] Access granted")
1328        else:
1329            raise service_error(service_error.protocol,
1330                        "Bad proxy response")
1331        if 'proof' not in r:
1332            raise service_error(service_error.protocol,
1333                        "Bad access response (no access proof)")
1334       
1335        tbparam[tb] = { 
1336                "allocID" : r['allocID'],
1337                "uri": uri,
1338                "proof": [ r['proof'] ],
1339                }
1340
1341        # Collect the responses corresponding to the services this testbed
1342        # exports.  These will be the service requests that we will include in
1343        # the start segment requests (with appropriate visibility values) to
1344        # import and export the segments.
1345        for s in r.get('service', []):
1346            id = s.get('id', None)
1347            if id and id in e_keys:
1348                e_keys[id].reqs.append(s)
1349
1350        # Add attributes to parameter space.  We don't allow attributes to
1351        # overlay any parameters already installed.
1352        for a in r.get('fedAttr', []):
1353            try:
1354                if a['attribute'] and \
1355                        isinstance(a['attribute'], basestring)\
1356                        and not tbparam[tb].has_key(a['attribute'].lower()):
1357                    tbparam[tb][a['attribute'].lower()] = a['value']
1358            except KeyError:
1359                self.log.error("Bad attribute in response: %s" % a)
1360
1361
1362    def split_topology(self, top, topo, testbeds):
1363        """
1364        Create the sub-topologies that are needed for experiment instantiation.
1365        """
1366        for tb in testbeds:
1367            topo[tb] = top.clone()
1368            # copy in for loop allows deletions from the original
1369            for e in [ e for e in topo[tb].elements]:
1370                etb = e.get_attribute('testbed')
1371                # NB: elements without a testbed attribute won't appear in any
1372                # sub topologies. 
1373                if not etb or etb != tb:
1374                    for i in e.interface:
1375                        for s in i.subs:
1376                            try:
1377                                s.interfaces.remove(i)
1378                            except ValueError:
1379                                raise service_error(service_error.internal,
1380                                        "Can't remove interface??")
1381                    topo[tb].elements.remove(e)
1382            topo[tb].make_indices()
1383
1384    def confirm_software(self, top):
1385        """
1386        Make sure that the software to be loaded in the topo is all available
1387        before we begin making access requests, etc.  This is a subset of
1388        wrangle_software.
1389        """
1390        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1391        pkgs.update([x.location for e in top.elements for x in e.software])
1392
1393        for pkg in pkgs:
1394            loc = pkg
1395
1396            scheme, host, path = urlparse(loc)[0:3]
1397            dest = os.path.basename(path)
1398            if not scheme:
1399                if not loc.startswith('/'):
1400                    loc = "/%s" % loc
1401                loc = "file://%s" %loc
1402            # NB: if scheme was found, loc == pkg
1403            try:
1404                u = urlopen(loc)
1405                u.close()
1406            except Exception, e:
1407                raise service_error(service_error.req, 
1408                        "Cannot open %s: %s" % (loc, e))
1409        return True
1410
1411    def wrangle_software(self, expid, top, topo, tbparams):
1412        """
1413        Copy software out to the repository directory, allocate permissions and
1414        rewrite the segment topologies to look for the software in local
1415        places.
1416        """
1417
1418        # Copy the rpms and tarfiles to a distribution directory from
1419        # which the federants can retrieve them
1420        linkpath = "%s/software" %  expid
1421        softdir ="%s/%s" % ( self.repodir, linkpath)
1422        softmap = { }
1423
1424        # self.fedkit and self.gateway kit are lists of tuples of
1425        # (install_location, download_location) this extracts the download
1426        # locations.
1427        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1428        pkgs.update([x.location for e in top.elements for x in e.software])
1429        try:
1430            os.makedirs(softdir)
1431        except EnvironmentError, e:
1432            raise service_error(
1433                    "Cannot create software directory: %s" % e)
1434        # The actual copying.  Everything's converted into a url for copying.
1435        auth_attrs = set()
1436        for pkg in pkgs:
1437            loc = pkg
1438
1439            scheme, host, path = urlparse(loc)[0:3]
1440            dest = os.path.basename(path)
1441            if not scheme:
1442                if not loc.startswith('/'):
1443                    loc = "/%s" % loc
1444                loc = "file://%s" %loc
1445            # NB: if scheme was found, loc == pkg
1446            try:
1447                u = urlopen(loc)
1448            except Exception, e:
1449                raise service_error(service_error.req, 
1450                        "Cannot open %s: %s" % (loc, e))
1451            try:
1452                f = open("%s/%s" % (softdir, dest) , "w")
1453                self.log.debug("Writing %s/%s" % (softdir,dest) )
1454                data = u.read(4096)
1455                while data:
1456                    f.write(data)
1457                    data = u.read(4096)
1458                f.close()
1459                u.close()
1460            except Exception, e:
1461                raise service_error(service_error.internal,
1462                        "Could not copy %s: %s" % (loc, e))
1463            path = re.sub("/tmp", "", linkpath)
1464            # XXX
1465            softmap[pkg] = \
1466                    "%s/%s/%s" %\
1467                    ( self.repo_url, path, dest)
1468
1469            # Allow the individual segments to access the software by assigning
1470            # an attribute to each testbed allocation that encodes the data to
1471            # be released.  This expression collects the data for each run of
1472            # the loop.
1473            auth_attrs.update([
1474                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1475                        for tb in tbparams.keys()])
1476
1477        self.append_experiment_authorization(expid, auth_attrs)
1478
1479        # Convert the software locations in the segments into the local
1480        # copies on this host
1481        for soft in [ s for tb in topo.values() \
1482                for e in tb.elements \
1483                    if getattr(e, 'software', False) \
1484                        for s in e.software ]:
1485            if softmap.has_key(soft.location):
1486                soft.location = softmap[soft.location]
1487
1488
1489    def new_experiment(self, req, fid):
1490        """
1491        The external interface to empty initial experiment creation called from
1492        the dispatcher.
1493
1494        Creates a working directory, splits the incoming description using the
1495        splitter script and parses out the avrious subsections using the
1496        lcasses above.  Once each sub-experiment is created, use pooled threads
1497        to instantiate them and start it all up.
1498        """
1499        req = req.get('NewRequestBody', None)
1500        if not req:
1501            raise service_error(service_error.req,
1502                    "Bad request format (no NewRequestBody)")
1503
1504        if self.auth.import_credentials(data_list=req.get('credential', [])):
1505            self.auth.save()
1506
1507        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1508                with_proof=True)
1509
1510        if not access_ok:
1511            raise service_error(service_error.access, "New access denied",
1512                    proof=[proof])
1513
1514        try:
1515            tmpdir = tempfile.mkdtemp(prefix="split-")
1516        except EnvironmentError:
1517            raise service_error(service_error.internal, "Cannot create tmp dir")
1518
1519        try:
1520            access_user = self.accessdb[fid]
1521        except KeyError:
1522            raise service_error(service_error.internal,
1523                    "Access map and authorizer out of sync in " + \
1524                            "new_experiment for fedid %s"  % fid)
1525
1526        # Generate an ID for the experiment (slice) and a certificate that the
1527        # allocator can use to prove they own it.  We'll ship it back through
1528        # the encrypted connection.  If the requester supplied one, use it.
1529        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1530            expcert = req['experimentAccess']['X509']
1531            expid = fedid(certstr=expcert)
1532            self.state_lock.acquire()
1533            if expid in self.state:
1534                self.state_lock.release()
1535                raise service_error(service_error.req, 
1536                        'fedid %s identifies an existing experiment' % expid)
1537            self.state_lock.release()
1538        else:
1539            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1540
1541        #now we're done with the tmpdir, and it should be empty
1542        if self.cleanup:
1543            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1544            os.rmdir(tmpdir)
1545        else:
1546            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1547
1548        eid = self.create_experiment_state(fid, req, expid, expcert, 
1549                state='empty')
1550
1551        rv = {
1552                'experimentID': [
1553                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1554                ],
1555                'experimentStatus': 'empty',
1556                'experimentAccess': { 'X509' : expcert },
1557                'proof': proof.to_dict(),
1558            }
1559
1560        return rv
1561
1562    # create_experiment sub-functions
1563
1564    @staticmethod
1565    def get_experiment_key(req, field='experimentID'):
1566        """
1567        Parse the experiment identifiers out of the request (the request body
1568        tag has been removed).  Specifically this pulls either the fedid or the
1569        localname out of the experimentID field.  A fedid is preferred.  If
1570        neither is present or the request does not contain the fields,
1571        service_errors are raised.
1572        """
1573        # Get the experiment access
1574        exp = req.get(field, None)
1575        if exp:
1576            if exp.has_key('fedid'):
1577                key = exp['fedid']
1578            elif exp.has_key('localname'):
1579                key = exp['localname']
1580            else:
1581                raise service_error(service_error.req, "Unknown lookup type")
1582        else:
1583            raise service_error(service_error.req, "No request?")
1584
1585        return key
1586
1587    def get_experiment_ids_and_start(self, key, tmpdir):
1588        """
1589        Get the experiment name, id and access certificate from the state, and
1590        set the experiment state to 'starting'.  returns a triple (fedid,
1591        localname, access_cert_file). The access_cert_file is a copy of the
1592        contents of the access certificate, created in the tempdir with
1593        restricted permissions.  If things are confused, raise an exception.
1594        """
1595
1596        expid = eid = None
1597        self.state_lock.acquire()
1598        if self.state.has_key(key):
1599            self.state[key]['experimentStatus'] = "starting"
1600            for e in self.state[key].get('experimentID',[]):
1601                if not expid and e.has_key('fedid'):
1602                    expid = e['fedid']
1603                elif not eid and e.has_key('localname'):
1604                    eid = e['localname']
1605            if 'experimentAccess' in self.state[key] and \
1606                    'X509' in self.state[key]['experimentAccess']:
1607                expcert = self.state[key]['experimentAccess']['X509']
1608            else:
1609                expcert = None
1610        self.state_lock.release()
1611
1612        # make a protected copy of the access certificate so the experiment
1613        # controller can act as the experiment principal.
1614        if expcert:
1615            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1616            if not expcert_file:
1617                raise service_error(service_error.internal, 
1618                        "Cannot create temp cert file?")
1619        else:
1620            expcert_file = None
1621
1622        return (eid, expid, expcert_file)
1623
1624    def get_topology(self, req, tmpdir):
1625        """
1626        Get the ns2 content and put it into a file for parsing.  Call the local
1627        or remote parser and return the topdl.Topology.  Errors result in
1628        exceptions.  req is the request and tmpdir is a work directory.
1629        """
1630
1631        # The tcl parser needs to read a file so put the content into that file
1632        descr=req.get('experimentdescription', None)
1633        if descr:
1634            if 'ns2description' in descr:
1635                file_content=descr['ns2description']
1636            elif 'topdldescription' in descr:
1637                return topdl.Topology(**descr['topdldescription'])
1638            else:
1639                raise service_error(service_error.req, 
1640                        'Unknown experiment description type')
1641        else:
1642            raise service_error(service_error.req, "No experiment description")
1643
1644
1645        if self.splitter_url:
1646            self.log.debug("Calling remote topdl translator at %s" % \
1647                    self.splitter_url)
1648            top = self.remote_ns2topdl(self.splitter_url, file_content)
1649        else:
1650            tclfile = os.path.join(tmpdir, "experiment.tcl")
1651            if file_content:
1652                try:
1653                    f = open(tclfile, 'w')
1654                    f.write(file_content)
1655                    f.close()
1656                except EnvironmentError:
1657                    raise service_error(service_error.internal,
1658                            "Cannot write temp experiment description")
1659            else:
1660                raise service_error(service_error.req, 
1661                        "Only ns2descriptions supported")
1662            pid = "dummy"
1663            gid = "dummy"
1664            eid = "dummy"
1665
1666            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1667                str(self.muxmax), '-m', 'dummy']
1668
1669            tclcmd.extend([pid, gid, eid, tclfile])
1670
1671            self.log.debug("running local splitter %s", " ".join(tclcmd))
1672            # This is just fantastic.  As a side effect the parser copies
1673            # tb_compat.tcl into the current directory, so that directory
1674            # must be writable by the fedd user.  Doing this in the
1675            # temporary subdir ensures this is the case.
1676            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1677                    cwd=tmpdir)
1678            split_data = tclparser.stdout
1679
1680            top = topdl.topology_from_xml(file=split_data, top="experiment")
1681            os.remove(tclfile)
1682
1683        return top
1684
1685    def get_testbed_services(self, req, testbeds):
1686        """
1687        Parse the services section of the request into into two dicts mapping
1688        testbed to lists of federated_service objects.  The first lists all
1689        exporters of services, and the second all exporters of services that
1690        need control portals int the experiment.
1691        """
1692        masters = { }
1693        pmasters = { }
1694        for s in req.get('service', []):
1695            # If this is a service request with the importall field
1696            # set, fill it out.
1697
1698            if s.get('importall', False):
1699                s['import'] = [ tb for tb in testbeds \
1700                        if tb not in s.get('export',[])]
1701                del s['importall']
1702
1703            # Add the service to masters
1704            for tb in s.get('export', []):
1705                if s.get('name', None):
1706
1707                    params = { }
1708                    for a in s.get('fedAttr', []):
1709                        params[a.get('attribute', '')] = a.get('value','')
1710
1711                    fser = federated_service(name=s['name'],
1712                            exporter=tb, importers=s.get('import',[]),
1713                            params=params)
1714                    if fser.name == 'hide_hosts' \
1715                            and 'hosts' not in fser.params:
1716                        fser.params['hosts'] = \
1717                                ",".join(tb_hosts.get(fser.exporter, []))
1718                    if tb in masters: masters[tb].append(fser)
1719                    else: masters[tb] = [fser]
1720
1721                    if fser.portal:
1722                        if tb not in pmasters: pmasters[tb] = [ fser ]
1723                        else: pmasters[tb].append(fser)
1724                else:
1725                    self.log.error('Testbed service does not have name " + \
1726                            "and importers')
1727        return masters, pmasters
1728
1729    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1730        """
1731        Create the ssh keys necessary for interconnecting the potral nodes and
1732        the global hosts file for letting each segment know about the IP
1733        addresses in play.  Save these into the repo.  Add attributes to the
1734        autorizer allowing access controllers to download them and return a set
1735        of attributes that inform the segments where to find this stuff.  Mau
1736        raise service_errors in if there are problems.
1737        """
1738        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1739        gw_secretkey_base = "fed.%s" % self.ssh_type
1740        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1741        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1742
1743        try:
1744            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1745        except ValueError:
1746            raise service_error(service_error.server_config, 
1747                    "Bad key type (%s)" % self.ssh_type)
1748
1749
1750        # Copy configuration files into the remote file store
1751        # The config urlpath
1752        configpath = "/%s/config" % expid
1753        # The config file system location
1754        configdir ="%s%s" % ( self.repodir, configpath)
1755        try:
1756            os.makedirs(configdir)
1757        except EnvironmentError, e:
1758            raise service_error(service_error.internal,
1759                    "Cannot create config directory: %s" % e)
1760        try:
1761            f = open("%s/hosts" % configdir, "w")
1762            print >> f, string.join(hosts, '\n')
1763            f.close()
1764        except EnvironmentError, e:
1765            raise service_error(service_error.internal, 
1766                    "Cannot write hosts file: %s" % e)
1767        try:
1768            copy_file("%s" % gw_pubkey, "%s/%s" % \
1769                    (configdir, gw_pubkey_base))
1770            copy_file("%s" % gw_secretkey, "%s/%s" % \
1771                    (configdir, gw_secretkey_base))
1772        except EnvironmentError, e:
1773            raise service_error(service_error.internal, 
1774                    "Cannot copy keyfiles: %s" % e)
1775
1776        # Allow the individual testbeds to access the configuration files,
1777        # again by setting an attribute for the relevant pathnames on each
1778        # allocation principal.  Yeah, that's a long list comprehension.
1779        self.append_experiment_authorization(expid, set([
1780            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1781                    for tb in tbparams.keys() \
1782                        for f in ("hosts", gw_secretkey_base, gw_pubkey_base)]))
1783
1784        attrs = [ 
1785                {
1786                    'attribute': 'ssh_pubkey', 
1787                    'value': '%s/%s/config/%s' % \
1788                            (self.repo_url, expid, gw_pubkey_base)
1789                },
1790                {
1791                    'attribute': 'ssh_secretkey', 
1792                    'value': '%s/%s/config/%s' % \
1793                            (self.repo_url, expid, gw_secretkey_base)
1794                },
1795                {
1796                    'attribute': 'hosts', 
1797                    'value': '%s/%s/config/hosts' % \
1798                            (self.repo_url, expid)
1799                },
1800            ]
1801        return attrs
1802
1803
1804    def get_vtopo(self, req, fid):
1805        """
1806        Return the stored virtual topology for this experiment
1807        """
1808        rv = None
1809        state = None
1810
1811        req = req.get('VtopoRequestBody', None)
1812        if not req:
1813            raise service_error(service_error.req,
1814                    "Bad request format (no VtopoRequestBody)")
1815        exp = req.get('experiment', None)
1816        if exp:
1817            if exp.has_key('fedid'):
1818                key = exp['fedid']
1819                keytype = "fedid"
1820            elif exp.has_key('localname'):
1821                key = exp['localname']
1822                keytype = "localname"
1823            else:
1824                raise service_error(service_error.req, "Unknown lookup type")
1825        else:
1826            raise service_error(service_error.req, "No request?")
1827
1828        proof = self.check_experiment_access(fid, key)
1829
1830        self.state_lock.acquire()
1831        if self.state.has_key(key):
1832            if self.state[key].has_key('vtopo'):
1833                rv = { 'experiment' : {keytype: key },
1834                        'vtopo': self.state[key]['vtopo'],
1835                        'proof': proof.to_dict(), 
1836                    }
1837            else:
1838                state = self.state[key]['experimentStatus']
1839        self.state_lock.release()
1840
1841        if rv: return rv
1842        else: 
1843            if state:
1844                raise service_error(service_error.partial, 
1845                        "Not ready: %s" % state)
1846            else:
1847                raise service_error(service_error.req, "No such experiment")
1848
1849    def get_vis(self, req, fid):
1850        """
1851        Return the stored visualization for this experiment
1852        """
1853        rv = None
1854        state = None
1855
1856        req = req.get('VisRequestBody', None)
1857        if not req:
1858            raise service_error(service_error.req,
1859                    "Bad request format (no VisRequestBody)")
1860        exp = req.get('experiment', None)
1861        if exp:
1862            if exp.has_key('fedid'):
1863                key = exp['fedid']
1864                keytype = "fedid"
1865            elif exp.has_key('localname'):
1866                key = exp['localname']
1867                keytype = "localname"
1868            else:
1869                raise service_error(service_error.req, "Unknown lookup type")
1870        else:
1871            raise service_error(service_error.req, "No request?")
1872
1873        proof = self.check_experiment_access(fid, key)
1874
1875        self.state_lock.acquire()
1876        if self.state.has_key(key):
1877            if self.state[key].has_key('vis'):
1878                rv =  { 'experiment' : {keytype: key },
1879                        'vis': self.state[key]['vis'],
1880                        'proof': proof.to_dict(), 
1881                        }
1882            else:
1883                state = self.state[key]['experimentStatus']
1884        self.state_lock.release()
1885
1886        if rv: return rv
1887        else:
1888            if state:
1889                raise service_error(service_error.partial, 
1890                        "Not ready: %s" % state)
1891            else:
1892                raise service_error(service_error.req, "No such experiment")
1893
1894   
1895    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1896            top):
1897        """
1898        Store the various data that have changed in the experiment state
1899        between when it was started and the beginning of resource allocation.
1900        This is basically the information about each local allocation.  This
1901        fills in the values of the placeholder allocation in the state.  It
1902        also collects the access proofs and returns them as dicts for a
1903        response message.
1904        """
1905        # save federant information
1906        for k in allocated.keys():
1907            tbparams[k]['federant'] = {
1908                    'name': [ { 'localname' : eid} ],
1909                    'allocID' : tbparams[k]['allocID'],
1910                    'uri': tbparams[k]['uri'],
1911                    'proof': tbparams[k]['proof'],
1912                }
1913
1914        self.state_lock.acquire()
1915        self.state[eid]['vtopo'] = vtopo
1916        self.state[eid]['vis'] = vis
1917        self.state[eid]['experimentdescription'] = \
1918                { 'topdldescription': top.to_dict() }
1919        self.state[eid]['federant'] = \
1920                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1921                    if tbparams[tb].has_key('federant') ]
1922        # Access proofs for the response message
1923        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
1924                    for p in tbparams[k]['federant']['proof']]
1925        if self.state_filename: 
1926            self.write_state()
1927        self.state_lock.release()
1928        return proofs
1929
1930    def clear_placeholder(self, eid, expid, tmpdir):
1931        """
1932        Clear the placeholder and remove any allocated temporary dir.
1933        """
1934
1935        self.state_lock.acquire()
1936        del self.state[eid]
1937        del self.state[expid]
1938        if self.state_filename: self.write_state()
1939        self.state_lock.release()
1940        if tmpdir and self.cleanup:
1941            self.remove_dirs(tmpdir)
1942
1943    # end of create_experiment sub-functions
1944
1945    def create_experiment(self, req, fid):
1946        """
1947        The external interface to experiment creation called from the
1948        dispatcher.
1949
1950        Creates a working directory, splits the incoming description using the
1951        splitter script and parses out the various subsections using the
1952        classes above.  Once each sub-experiment is created, use pooled threads
1953        to instantiate them and start it all up.
1954        """
1955
1956        req = req.get('CreateRequestBody', None)
1957        if req:
1958            key = self.get_experiment_key(req)
1959        else:
1960            raise service_error(service_error.req,
1961                    "Bad request format (no CreateRequestBody)")
1962
1963        # Import information from the requester
1964        if self.auth.import_credentials(data_list=req.get('credential', [])):
1965            self.auth.save()
1966
1967        # Make sure that the caller can talk to us
1968        proof = self.check_experiment_access(fid, key)
1969
1970        # Install the testbed map entries supplied with the request into a copy
1971        # of the testbed map.
1972        tbmap = dict(self.tbmap)
1973        for m in req.get('testbedmap', []):
1974            if 'testbed' in m and 'uri' in m:
1975                tbmap[m['testbed']] = m['uri']
1976
1977        # a place to work
1978        try:
1979            tmpdir = tempfile.mkdtemp(prefix="split-")
1980            os.mkdir(tmpdir+"/keys")
1981        except EnvironmentError:
1982            raise service_error(service_error.internal, "Cannot create tmp dir")
1983
1984        tbparams = { }
1985
1986        eid, expid, expcert_file = \
1987                self.get_experiment_ids_and_start(key, tmpdir)
1988
1989        # This catches exceptions to clear the placeholder if necessary
1990        try: 
1991            if not (eid and expid):
1992                raise service_error(service_error.internal, 
1993                        "Cannot find local experiment info!?")
1994
1995            top = self.get_topology(req, tmpdir)
1996            self.confirm_software(top)
1997            # Assign the IPs
1998            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1999            # Find the testbeds to look up
2000            tb_hosts = { }
2001            testbeds = [ ]
2002            for e in top.elements:
2003                if isinstance(e, topdl.Computer):
2004                    tb = e.get_attribute('testbed') or 'default'
2005                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2006                    else: 
2007                        tb_hosts[tb] = [ e.name ]
2008                        testbeds.append(tb)
2009
2010            masters, pmasters = self.get_testbed_services(req, testbeds)
2011            allocated = { }         # Testbeds we can access
2012            topo ={ }               # Sub topologies
2013            connInfo = { }          # Connection information
2014
2015            self.split_topology(top, topo, testbeds)
2016
2017            self.get_access_to_testbeds(testbeds, fid, allocated, 
2018                    tbparams, masters, tbmap, expid, expcert_file)
2019
2020            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2021
2022            part = experiment_partition(self.auth, self.store_url, tbmap,
2023                    self.muxmax, self.direct_transit)
2024            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2025                    connInfo, expid)
2026
2027            auth_attrs = set()
2028            # Now get access to the dynamic testbeds (those added above)
2029            for tb in [ t for t in topo if t not in allocated]:
2030                self.get_access(tb, tbparams, fid, masters, tbmap, 
2031                        expid, expcert_file)
2032                allocated[tb] = 1
2033                store_keys = topo[tb].get_attribute('store_keys')
2034                # Give the testbed access to keys it exports or imports
2035                if store_keys:
2036                    auth_attrs.update(set([
2037                        (tbparams[tb]['allocID']['fedid'], sk) \
2038                                for sk in store_keys.split(" ")]))
2039
2040            if auth_attrs:
2041                self.append_experiment_authorization(expid, auth_attrs)
2042
2043            # transit and disconnected testbeds may not have a connInfo entry.
2044            # Fill in the blanks.
2045            for t in allocated.keys():
2046                if not connInfo.has_key(t):
2047                    connInfo[t] = { }
2048
2049            self.wrangle_software(expid, top, topo, tbparams)
2050
2051            vtopo = topdl.topology_to_vtopo(top)
2052            vis = self.genviz(vtopo)
2053            proofs = self.save_federant_information(allocated, tbparams, 
2054                    eid, vtopo, vis, top)
2055        except service_error, e:
2056            # If something goes wrong in the parse (usually an access error)
2057            # clear the placeholder state.  From here on out the code delays
2058            # exceptions.  Failing at this point returns a fault to the remote
2059            # caller.
2060            self.clear_placeholder(eid, expid, tmpdir)
2061            raise e
2062
2063        # Start the background swapper and return the starting state.  From
2064        # here on out, the state will stick around a while.
2065
2066        # Create a logger that logs to the experiment's state object as well as
2067        # to the main log file.
2068        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2069        alloc_collector = self.list_log(self.state[eid]['log'])
2070        h = logging.StreamHandler(alloc_collector)
2071        # XXX: there should be a global one of these rather than repeating the
2072        # code.
2073        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2074                    '%d %b %y %H:%M:%S'))
2075        alloc_log.addHandler(h)
2076
2077        # Start a thread to do the resource allocation
2078        t  = Thread(target=self.allocate_resources,
2079                args=(allocated, masters, eid, expid, tbparams, 
2080                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2081                    connInfo, tbmap, expcert_file),
2082                name=eid)
2083        t.start()
2084
2085        rv = {
2086                'experimentID': [
2087                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2088                ],
2089                'experimentStatus': 'starting',
2090                'proof': [ proof.to_dict() ] + proofs,
2091            }
2092
2093        return rv
2094   
2095    def get_experiment_fedid(self, key):
2096        """
2097        find the fedid associated with the localname key in the state database.
2098        """
2099
2100        rv = None
2101        self.state_lock.acquire()
2102        if self.state.has_key(key):
2103            if isinstance(self.state[key], dict):
2104                try:
2105                    kl = [ f['fedid'] for f in \
2106                            self.state[key]['experimentID']\
2107                                if f.has_key('fedid') ]
2108                except KeyError:
2109                    self.state_lock.release()
2110                    raise service_error(service_error.internal, 
2111                            "No fedid for experiment %s when getting "+\
2112                                    "fedid(!?)" % key)
2113                if len(kl) == 1:
2114                    rv = kl[0]
2115                else:
2116                    self.state_lock.release()
2117                    raise service_error(service_error.internal, 
2118                            "multiple fedids for experiment %s when " +\
2119                                    "getting fedid(!?)" % key)
2120            else:
2121                self.state_lock.release()
2122                raise service_error(service_error.internal, 
2123                        "Unexpected state for %s" % key)
2124        self.state_lock.release()
2125        return rv
2126
2127    def check_experiment_access(self, fid, key):
2128        """
2129        Confirm that the fid has access to the experiment.  Though a request
2130        may be made in terms of a local name, the access attribute is always
2131        the experiment's fedid.
2132        """
2133        if not isinstance(key, fedid):
2134            key = self.get_experiment_fedid(key)
2135
2136        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2137
2138        if access_ok:
2139            return proof
2140        else:
2141            raise service_error(service_error.access, "Access Denied",
2142                proof)
2143
2144
2145    def get_handler(self, path, fid):
2146        """
2147        Perhaps surprisingly named, this function handles HTTP GET requests to
2148        this server (SOAP requests are POSTs).
2149        """
2150        self.log.info("Get handler %s %s" % (path, fid))
2151        # XXX: log proofs?
2152        if self.auth.check_attribute(fid, path):
2153            return ("%s/%s" % (self.repodir, path), "application/binary")
2154        else:
2155            return (None, None)
2156
2157    def clean_info_response(self, rv, proof):
2158        """
2159        Remove the information in the experiment's state object that is not in
2160        the info response.
2161        """
2162        # Remove the owner info (should always be there, but...)
2163        if rv.has_key('owner'): del rv['owner']
2164        if 'auth' in rv: del rv['auth']
2165
2166        # Convert the log into the allocationLog parameter and remove the
2167        # log entry (with defensive programming)
2168        if rv.has_key('log'):
2169            rv['allocationLog'] = "".join(rv['log'])
2170            del rv['log']
2171        else:
2172            rv['allocationLog'] = ""
2173
2174        if rv['experimentStatus'] != 'active':
2175            if rv.has_key('federant'): del rv['federant']
2176        else:
2177            # remove the allocationID and uri info from each federant
2178            for f in rv.get('federant', []):
2179                if f.has_key('allocID'): del f['allocID']
2180                if f.has_key('uri'): del f['uri']
2181        rv['proof'] = proof.to_dict()
2182
2183        return rv
2184
2185    def get_info(self, req, fid):
2186        """
2187        Return all the stored info about this experiment
2188        """
2189        rv = None
2190
2191        req = req.get('InfoRequestBody', None)
2192        if not req:
2193            raise service_error(service_error.req,
2194                    "Bad request format (no InfoRequestBody)")
2195        exp = req.get('experiment', None)
2196        if exp:
2197            if exp.has_key('fedid'):
2198                key = exp['fedid']
2199                keytype = "fedid"
2200            elif exp.has_key('localname'):
2201                key = exp['localname']
2202                keytype = "localname"
2203            else:
2204                raise service_error(service_error.req, "Unknown lookup type")
2205        else:
2206            raise service_error(service_error.req, "No request?")
2207
2208        proof = self.check_experiment_access(fid, key)
2209
2210        # The state may be massaged by the service function that called
2211        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2212        # state.
2213        self.state_lock.acquire()
2214        if self.state.has_key(key):
2215            rv = copy.deepcopy(self.state[key])
2216        self.state_lock.release()
2217
2218        if rv:
2219            return self.clean_info_response(rv, proof)
2220        else:
2221            raise service_error(service_error.req, "No such experiment")
2222
2223    def get_multi_info(self, req, fid):
2224        """
2225        Return all the stored info that this fedid can access
2226        """
2227        rv = { 'info': [ ], 'proof': [ ] }
2228
2229        self.state_lock.acquire()
2230        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2231            try:
2232                proof = self.check_experiment_access(fid, key)
2233            except service_error, e:
2234                if e.code == service_error.access:
2235                    continue
2236                else:
2237                    self.state_lock.release()
2238                    raise e
2239
2240            if self.state.has_key(key):
2241                e = copy.deepcopy(self.state[key])
2242                e = self.clean_info_response(e, proof)
2243                rv['info'].append(e)
2244                rv['proof'].append(proof.to_dict())
2245        self.state_lock.release()
2246        return rv
2247
2248    def check_termination_status(self, fed_exp, force):
2249        """
2250        Confirm that the experiment is sin a valid state to stop (or force it)
2251        return the state - invalid states for deletion and force settings cause
2252        exceptions.
2253        """
2254        self.state_lock.acquire()
2255        status = fed_exp.get('experimentStatus', None)
2256
2257        if status:
2258            if status in ('starting', 'terminating'):
2259                if not force:
2260                    self.state_lock.release()
2261                    raise service_error(service_error.partial, 
2262                            'Experiment still being created or destroyed')
2263                else:
2264                    self.log.warning('Experiment in %s state ' % status + \
2265                            'being terminated by force.')
2266            self.state_lock.release()
2267            return status
2268        else:
2269            # No status??? trouble
2270            self.state_lock.release()
2271            raise service_error(service_error.internal,
2272                    "Experiment has no status!?")
2273
2274
2275    def get_termination_info(self, fed_exp):
2276        ids = []
2277        term_params = { }
2278        self.state_lock.acquire()
2279        #  experimentID is a list of dicts that are self-describing
2280        #  identifiers.  This finds all the fedids and localnames - the
2281        #  keys of self.state - and puts them into ids, which is used to delete
2282        #  the state after everything is swapped out.
2283        for id in fed_exp.get('experimentID', []):
2284            if 'fedid' in id: 
2285                ids.append(id['fedid'])
2286                repo = "%s" % id['fedid']
2287            if 'localname' in id: ids.append(id['localname'])
2288
2289        # Get the experimentAccess - the principal for this experiment.  It
2290        # is this principal to which credentials have been delegated, and
2291        # as which the experiment controller must act.
2292        if 'experimentAccess' in fed_exp and \
2293                'X509' in fed_exp['experimentAccess']:
2294            expcert = fed_exp['experimentAccess']['X509']
2295        else:
2296            expcert = None
2297
2298        # Collect the allocation/segment ids into a dict keyed by the fedid
2299        # of the allocation (or a monotonically increasing integer) that
2300        # contains a tuple of uri, aid (which is a dict...)
2301        for i, fed in enumerate(fed_exp.get('federant', [])):
2302            try:
2303                uri = fed['uri']
2304                aid = fed['allocID']
2305                k = fed['allocID'].get('fedid', i)
2306            except KeyError, e:
2307                continue
2308            term_params[k] = (uri, aid)
2309        # Change the experiment state
2310        fed_exp['experimentStatus'] = 'terminating'
2311        if self.state_filename: self.write_state()
2312        self.state_lock.release()
2313
2314        return ids, term_params, expcert, repo
2315
2316
2317    def deallocate_resources(self, term_params, expcert, status, force, 
2318            dealloc_log):
2319        tmpdir = None
2320        # This try block makes sure the tempdir is cleared
2321        try:
2322            # If no expcert, try the deallocation as the experiment
2323            # controller instance.
2324            if expcert and self.auth_type != 'legacy': 
2325                try:
2326                    tmpdir = tempfile.mkdtemp(prefix="term-")
2327                except EnvironmentError:
2328                    raise service_error(service_error.internal, 
2329                            "Cannot create tmp dir")
2330                cert_file = self.make_temp_certfile(expcert, tmpdir)
2331                pw = None
2332            else: 
2333                cert_file = self.cert_file
2334                pw = self.cert_pwd
2335
2336            # Stop everyone.  NB, wait_for_all waits until a thread starts
2337            # and then completes, so we can't wait if nothing starts.  So,
2338            # no tbparams, no start.
2339            if len(term_params) > 0:
2340                tp = thread_pool(self.nthreads)
2341                for k, (uri, aid) in term_params.items():
2342                    # Create and start a thread to stop the segment
2343                    tp.wait_for_slot()
2344                    t  = pooled_thread(\
2345                            target=self.terminate_segment(log=dealloc_log,
2346                                testbed=uri,
2347                                cert_file=cert_file, 
2348                                cert_pwd=pw,
2349                                trusted_certs=self.trusted_certs,
2350                                caller=self.call_TerminateSegment),
2351                            args=(uri, aid), name=k,
2352                            pdata=tp, trace_file=self.trace_file)
2353                    t.start()
2354                # Wait for completions
2355                tp.wait_for_all_done()
2356
2357            # release the allocations (failed experiments have done this
2358            # already, and starting experiments may be in odd states, so we
2359            # ignore errors releasing those allocations
2360            try: 
2361                for k, (uri, aid)  in term_params.items():
2362                    self.release_access(None, aid, uri=uri,
2363                            cert_file=cert_file, cert_pwd=pw)
2364            except service_error, e:
2365                if status != 'failed' and not force:
2366                    raise e
2367
2368        # Clean up the tmpdir no matter what
2369        finally:
2370            if tmpdir: self.remove_dirs(tmpdir)
2371
2372    def terminate_experiment(self, req, fid):
2373        """
2374        Swap this experiment out on the federants and delete the shared
2375        information
2376        """
2377        tbparams = { }
2378        req = req.get('TerminateRequestBody', None)
2379        if not req:
2380            raise service_error(service_error.req,
2381                    "Bad request format (no TerminateRequestBody)")
2382
2383        key = self.get_experiment_key(req, 'experiment')
2384        proof = self.check_experiment_access(fid, key)
2385        exp = req.get('experiment', False)
2386        force = req.get('force', False)
2387
2388        dealloc_list = [ ]
2389
2390
2391        # Create a logger that logs to the dealloc_list as well as to the main
2392        # log file.
2393        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2394        h = logging.StreamHandler(self.list_log(dealloc_list))
2395        # XXX: there should be a global one of these rather than repeating the
2396        # code.
2397        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2398                    '%d %b %y %H:%M:%S'))
2399        dealloc_log.addHandler(h)
2400
2401        self.state_lock.acquire()
2402        fed_exp = self.state.get(key, None)
2403        self.state_lock.release()
2404        repo = None
2405
2406        if fed_exp:
2407            status = self.check_termination_status(fed_exp, force)
2408            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2409            self.deallocate_resources(term_params, expcert, status, force, 
2410                    dealloc_log)
2411
2412            # Remove the terminated experiment
2413            self.state_lock.acquire()
2414            for id in ids:
2415                self.clear_experiment_authorization(id, need_state_lock=False)
2416                if id in self.state: del self.state[id]
2417
2418            if self.state_filename: self.write_state()
2419            self.state_lock.release()
2420
2421            # Delete any synch points associated with this experiment.  All
2422            # synch points begin with the fedid of the experiment.
2423            fedid_keys = set(["fedid:%s" % f for f in ids \
2424                    if isinstance(f, fedid)])
2425            for k in self.synch_store.all_keys():
2426                try:
2427                    if len(k) > 45 and k[0:46] in fedid_keys:
2428                        self.synch_store.del_value(k)
2429                except synch_store.BadDeletionError:
2430                    pass
2431            self.write_store()
2432
2433            # Remove software and other cached stuff from the filesystem.
2434            if repo:
2435                self.remove_dirs("%s/%s" % (self.repodir, repo))
2436       
2437            return { 
2438                    'experiment': exp , 
2439                    'deallocationLog': string.join(dealloc_list, ''),
2440                    'proof': [proof.to_dict()],
2441                    }
2442        else:
2443            raise service_error(service_error.req, "No saved state")
2444
2445
2446    def GetValue(self, req, fid):
2447        """
2448        Get a value from the synchronized store
2449        """
2450        req = req.get('GetValueRequestBody', None)
2451        if not req:
2452            raise service_error(service_error.req,
2453                    "Bad request format (no GetValueRequestBody)")
2454       
2455        name = req.get('name', None)
2456        wait = req.get('wait', False)
2457        rv = { 'name': name }
2458
2459        if not name:
2460            raise service_error(service_error.req, "No name?")
2461
2462        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2463
2464        if access_ok:
2465            self.log.debug("[GetValue] asking for %s " % name)
2466            try:
2467                v = self.synch_store.get_value(name, wait)
2468            except synch_store.RevokedKeyError:
2469                # No more synch on this key
2470                raise service_error(service_error.federant, 
2471                        "Synch key %s revoked" % name)
2472            if v is not None:
2473                rv['value'] = v
2474            rv['proof'] = proof.to_dict()
2475            self.log.debug("[GetValue] got %s from %s" % (v, name))
2476            return rv
2477        else:
2478            raise service_error(service_error.access, "Access Denied",
2479                    proof=proof)
2480       
2481
2482    def SetValue(self, req, fid):
2483        """
2484        Set a value in the synchronized store
2485        """
2486        req = req.get('SetValueRequestBody', None)
2487        if not req:
2488            raise service_error(service_error.req,
2489                    "Bad request format (no SetValueRequestBody)")
2490       
2491        name = req.get('name', None)
2492        v = req.get('value', '')
2493
2494        if not name:
2495            raise service_error(service_error.req, "No name?")
2496
2497        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2498
2499        if access_ok:
2500            try:
2501                self.synch_store.set_value(name, v)
2502                self.write_store()
2503                self.log.debug("[SetValue] set %s to %s" % (name, v))
2504            except synch_store.CollisionError:
2505                # Translate into a service_error
2506                raise service_error(service_error.req,
2507                        "Value already set: %s" %name)
2508            except synch_store.RevokedKeyError:
2509                # No more synch on this key
2510                raise service_error(service_error.federant, 
2511                        "Synch key %s revoked" % name)
2512                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2513        else:
2514            raise service_error(service_error.access, "Access Denied",
2515                    proof=proof)
Note: See TracBrowser for help on using the repository browser.