source: fedd/federation/experiment_control.py @ 1d73342

axis_examplecompt_changesinfo-ops
Last change on this file since 1d73342 was 1d73342, checked in by Ted Faber <faber@…>, 13 years ago

Move non ABAC code out

  • Property mode set to 100644
File size: 79.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        self.ssh_pubkey_file = config.get("experiment_control", 
154                "ssh_pubkey_file")
155        self.ssh_privkey_file = config.get("experiment_control",
156                "ssh_privkey_file")
157        dt = config.get("experiment_control", "direct_transit")
158        self.auth_type = config.get('experiment_control', 'auth_type') \
159                or 'legacy'
160        self.auth_dir = config.get('experiment_control', 'auth_dir')
161        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
162        else: self.direct_transit = [ ]
163        # NB for internal master/slave ops, not experiment setup
164        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
165
166        self.overrides = set([])
167        ovr = config.get('experiment_control', 'overrides')
168        if ovr:
169            for o in ovr.split(","):
170                o = o.strip()
171                if o.startswith('fedid:'): o = o[len('fedid:'):]
172                self.overrides.add(fedid(hexstr=o))
173
174        self.state = { }
175        self.state_lock = Lock()
176        self.tclsh = "/usr/local/bin/otclsh"
177        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
178                config.get("experiment_control", "tcl_splitter",
179                        "/usr/testbed/lib/ns2ir/parse.tcl")
180        mapdb_file = config.get("experiment_control", "mapdb")
181        self.trace_file = sys.stderr
182
183        self.def_expstart = \
184                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
185                "/tmp/federate";
186        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
187                "FEDDIR/hosts";
188        self.def_gwstart = \
189                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
190                "/tmp/bridge.log";
191        self.def_mgwstart = \
192                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
193                "/tmp/bridge.log";
194        self.def_gwimage = "FBSD61-TUNNEL2";
195        self.def_gwtype = "pc";
196        self.local_access = { }
197
198        if self.auth_type == 'legacy':
199            if auth:
200                self.auth = auth
201            else:
202                self.log.error( "[access]: No authorizer initialized, " +\
203                        "creating local one.")
204                auth = authorizer()
205        elif self.auth_type == 'abac':
206            self.auth = abac_authorizer(load=self.auth_dir)
207        else:
208            raise service_error(service_error.internal, 
209                    "Unknown auth_type: %s" % self.auth_type)
210
211
212        if self.ssh_pubkey_file:
213            try:
214                f = open(self.ssh_pubkey_file, 'r')
215                self.ssh_pubkey = f.read()
216                f.close()
217            except EnvironmentError:
218                raise service_error(service_error.internal,
219                        "Cannot read sshpubkey")
220        else:
221            raise service_error(service_error.internal, 
222                    "No SSH public key file?")
223
224        if not self.ssh_privkey_file:
225            raise service_error(service_error.internal, 
226                    "No SSH public key file?")
227
228
229        if mapdb_file:
230            self.read_mapdb(mapdb_file)
231        else:
232            self.log.warn("[experiment_control] No testbed map, using defaults")
233            self.tbmap = { 
234                    'deter':'https://users.isi.deterlab.net:23235',
235                    'emulab':'https://users.isi.deterlab.net:23236',
236                    'ucb':'https://users.isi.deterlab.net:23237',
237                    }
238
239        if accessdb_file:
240                self.read_accessdb(accessdb_file)
241        else:
242            raise service_error(service_error.internal,
243                    "No accessdb specified in config")
244
245        # Grab saved state.  OK to do this w/o locking because it's read only
246        # and only one thread should be in existence that can see self.state at
247        # this point.
248        if self.state_filename:
249            self.read_state()
250
251        if self.store_filename:
252            self.read_store()
253        else:
254            self.log.warning("No saved synch store")
255            self.synch_store = synch_store
256
257        # Dispatch tables
258        self.soap_services = {\
259                'New': soap_handler('New', self.new_experiment),
260                'Create': soap_handler('Create', self.create_experiment),
261                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
262                'Vis': soap_handler('Vis', self.get_vis),
263                'Info': soap_handler('Info', self.get_info),
264                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
265                'Terminate': soap_handler('Terminate', 
266                    self.terminate_experiment),
267                'GetValue': soap_handler('GetValue', self.GetValue),
268                'SetValue': soap_handler('SetValue', self.SetValue),
269        }
270
271        self.xmlrpc_services = {\
272                'New': xmlrpc_handler('New', self.new_experiment),
273                'Create': xmlrpc_handler('Create', self.create_experiment),
274                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
275                'Vis': xmlrpc_handler('Vis', self.get_vis),
276                'Info': xmlrpc_handler('Info', self.get_info),
277                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
278                'Terminate': xmlrpc_handler('Terminate',
279                    self.terminate_experiment),
280                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
281                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
282        }
283
284    # Call while holding self.state_lock
285    def write_state(self):
286        """
287        Write a new copy of experiment state after copying the existing state
288        to a backup.
289
290        State format is a simple pickling of the state dictionary.
291        """
292        if os.access(self.state_filename, os.W_OK):
293            copy_file(self.state_filename, \
294                    "%s.bak" % self.state_filename)
295        try:
296            f = open(self.state_filename, 'w')
297            pickle.dump(self.state, f)
298        except EnvironmentError, e:
299            self.log.error("Can't write file %s: %s" % \
300                    (self.state_filename, e))
301        except pickle.PicklingError, e:
302            self.log.error("Pickling problem: %s" % e)
303        except TypeError, e:
304            self.log.error("Pickling problem (TypeError): %s" % e)
305
306    @staticmethod
307    def get_alloc_ids(state):
308        """
309        Pull the fedids of the identifiers of each allocation from the
310        state.  Again, a dict dive that's best isolated.
311
312        Used by read_store and read state
313        """
314
315        return [ f['allocID']['fedid'] 
316                for f in state.get('federant',[]) \
317                    if f.has_key('allocID') and \
318                        f['allocID'].has_key('fedid')]
319
320    # Call while holding self.state_lock
321    def read_state(self):
322        """
323        Read a new copy of experiment state.  Old state is overwritten.
324
325        State format is a simple pickling of the state dictionary.
326        """
327       
328        def get_experiment_id(state):
329            """
330            Pull the fedid experimentID out of the saved state.  This is kind
331            of a gross walk through the dict.
332            """
333
334            if state.has_key('experimentID'):
335                for e in state['experimentID']:
336                    if e.has_key('fedid'):
337                        return e['fedid']
338                else:
339                    return None
340            else:
341                return None
342
343        try:
344            f = open(self.state_filename, "r")
345            self.state = pickle.load(f)
346            self.log.debug("[read_state]: Read state from %s" % \
347                    self.state_filename)
348        except EnvironmentError, e:
349            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
350                    % (self.state_filename, e))
351        except pickle.UnpicklingError, e:
352            self.log.warning(("[read_state]: No saved state: " + \
353                    "Unpickling failed: %s") % e)
354       
355        for s in self.state.values():
356            try:
357
358                eid = get_experiment_id(s)
359                if eid : 
360                    if self.auth_type == 'legacy':
361                        # XXX: legacy
362                        # Give the owner rights to the experiment
363                        self.auth.set_attribute(s['owner'], eid)
364                        # And holders of the eid as well
365                        self.auth.set_attribute(eid, eid)
366                        # allow overrides to control experiments as well
367                        for o in self.overrides:
368                            self.auth.set_attribute(o, eid)
369                        # Set permissions to allow reading of the software
370                        # repo, if any, as well.
371                        for a in self.get_alloc_ids(s):
372                            self.auth.set_attribute(a, 'repo/%s' % eid)
373                else:
374                    raise KeyError("No experiment id")
375            except KeyError, e:
376                self.log.warning("[read_state]: State ownership or identity " +\
377                        "misformatted in %s: %s" % (self.state_filename, e))
378
379
380    def read_accessdb(self, accessdb_file):
381        """
382        Read the mapping from fedids that can create experiments to their name
383        in the 3-level access namespace.  All will be asserted from this
384        testbed and can include the local username and porject that will be
385        asserted on their behalf by this fedd.  Each fedid is also added to the
386        authorization system with the "create" attribute.
387        """
388        self.accessdb = {}
389        # These are the regexps for parsing the db
390        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
391        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
392                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
393        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
394                "\s*->\s*(" + name_expr + ")\s*$")
395        lineno = 0
396
397        # Parse the mappings and store in self.authdb, a dict of
398        # fedid -> (proj, user)
399        try:
400            f = open(accessdb_file, "r")
401            for line in f:
402                lineno += 1
403                line = line.strip()
404                if len(line) == 0 or line.startswith('#'):
405                    continue
406                m = project_line.match(line)
407                if m:
408                    fid = fedid(hexstr=m.group(1))
409                    project, user = m.group(2,3)
410                    if not self.accessdb.has_key(fid):
411                        self.accessdb[fid] = []
412                    self.accessdb[fid].append((project, user))
413                    continue
414
415                m = user_line.match(line)
416                if m:
417                    fid = fedid(hexstr=m.group(1))
418                    project = None
419                    user = m.group(2)
420                    if not self.accessdb.has_key(fid):
421                        self.accessdb[fid] = []
422                    self.accessdb[fid].append((project, user))
423                    continue
424                self.log.warn("[experiment_control] Error parsing access " +\
425                        "db %s at line %d" %  (accessdb_file, lineno))
426        except EnvironmentError:
427            raise service_error(service_error.internal,
428                    ("Error opening/reading %s as experiment " +\
429                            "control accessdb") %  accessdb_file)
430        f.close()
431
432        # Initialize the authorization attributes
433        # XXX: legacy
434        if self.auth_type == 'legacy':
435            for fid in self.accessdb.keys():
436                self.auth.set_attribute(fid, 'create')
437                self.auth.set_attribute(fid, 'new')
438
439    def read_mapdb(self, file):
440        """
441        Read a simple colon separated list of mappings for the
442        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
443        """
444
445        self.tbmap = { }
446        lineno =0
447        try:
448            f = open(file, "r")
449            for line in f:
450                lineno += 1
451                line = line.strip()
452                if line.startswith('#') or len(line) == 0:
453                    continue
454                try:
455                    label, url = line.split(':', 1)
456                    self.tbmap[label] = url
457                except ValueError, e:
458                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
459                            "map db: %s %s" % (lineno, line, e))
460        except EnvironmentError, e:
461            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
462                    "open %s: %s" % (file, e))
463        f.close()
464
465    def read_store(self):
466        try:
467            self.synch_store = synch_store()
468            self.synch_store.load(self.store_filename)
469            self.log.debug("[read_store]: Read store from %s" % \
470                    self.store_filename)
471        except EnvironmentError, e:
472            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
473                    % (self.state_filename, e))
474            self.synch_store = synch_store()
475
476        # Set the initial permissions on data in the store.  XXX: This ad hoc
477        # authorization attribute initialization is getting out of hand.
478        # XXX: legacy
479        if self.auth_type == 'legacy':
480            for k in self.synch_store.all_keys():
481                try:
482                    if k.startswith('fedid:'):
483                        fid = fedid(hexstr=k[6:46])
484                        if self.state.has_key(fid):
485                            for a in self.get_alloc_ids(self.state[fid]):
486                                self.auth.set_attribute(a, k)
487                except ValueError, e:
488                    self.log.warn("Cannot deduce permissions for %s" % k)
489
490
491    def write_store(self):
492        """
493        Write a new copy of synch_store after writing current state
494        to a backup.  We use the internal synch_store pickle method to avoid
495        incinsistent data.
496
497        State format is a simple pickling of the store.
498        """
499        if os.access(self.store_filename, os.W_OK):
500            copy_file(self.store_filename, \
501                    "%s.bak" % self.store_filename)
502        try:
503            self.synch_store.save(self.store_filename)
504        except EnvironmentError, e:
505            self.log.error("Can't write file %s: %s" % \
506                    (self.store_filename, e))
507        except TypeError, e:
508            self.log.error("Pickling problem (TypeError): %s" % e)
509
510       
511    def generate_ssh_keys(self, dest, type="rsa" ):
512        """
513        Generate a set of keys for the gateways to use to talk.
514
515        Keys are of type type and are stored in the required dest file.
516        """
517        valid_types = ("rsa", "dsa")
518        t = type.lower();
519        if t not in valid_types: raise ValueError
520        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
521
522        try:
523            trace = open("/dev/null", "w")
524        except EnvironmentError:
525            raise service_error(service_error.internal,
526                    "Cannot open /dev/null??");
527
528        # May raise CalledProcessError
529        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
530        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
531        if rv != 0:
532            raise service_error(service_error.internal, 
533                    "Cannot generate nonce ssh keys.  %s return code %d" \
534                            % (self.ssh_keygen, rv))
535
536    def gentopo(self, str):
537        """
538        Generate the topology data structure from the splitter's XML
539        representation of it.
540
541        The topology XML looks like:
542            <experiment>
543                <nodes>
544                    <node><vname></vname><ips>ip1:ip2</ips></node>
545                </nodes>
546                <lans>
547                    <lan>
548                        <vname></vname><vnode></vnode><ip></ip>
549                        <bandwidth></bandwidth><member>node:port</member>
550                    </lan>
551                </lans>
552        """
553        class topo_parse:
554            """
555            Parse the topology XML and create the dats structure.
556            """
557            def __init__(self):
558                # Typing of the subelements for data conversion
559                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
560                self.int_subelements = ( 'bandwidth',)
561                self.float_subelements = ( 'delay',)
562                # The final data structure
563                self.nodes = [ ]
564                self.lans =  [ ]
565                self.topo = { \
566                        'node': self.nodes,\
567                        'lan' : self.lans,\
568                    }
569                self.element = { }  # Current element being created
570                self.chars = ""     # Last text seen
571
572            def end_element(self, name):
573                # After each sub element the contents is added to the current
574                # element or to the appropriate list.
575                if name == 'node':
576                    self.nodes.append(self.element)
577                    self.element = { }
578                elif name == 'lan':
579                    self.lans.append(self.element)
580                    self.element = { }
581                elif name in self.str_subelements:
582                    self.element[name] = self.chars
583                    self.chars = ""
584                elif name in self.int_subelements:
585                    self.element[name] = int(self.chars)
586                    self.chars = ""
587                elif name in self.float_subelements:
588                    self.element[name] = float(self.chars)
589                    self.chars = ""
590
591            def found_chars(self, data):
592                self.chars += data.rstrip()
593
594
595        tp = topo_parse();
596        parser = xml.parsers.expat.ParserCreate()
597        parser.EndElementHandler = tp.end_element
598        parser.CharacterDataHandler = tp.found_chars
599
600        parser.Parse(str)
601
602        return tp.topo
603       
604
605    def genviz(self, topo):
606        """
607        Generate the visualization the virtual topology
608        """
609
610        neato = "/usr/local/bin/neato"
611        # These are used to parse neato output and to create the visualization
612        # file.
613        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
614        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
615                "%s</type></node>"
616
617        try:
618            # Node names
619            nodes = [ n['vname'] for n in topo['node'] ]
620            topo_lans = topo['lan']
621        except KeyError, e:
622            raise service_error(service_error.internal, "Bad topology: %s" %e)
623
624        lans = { }
625        links = { }
626
627        # Walk through the virtual topology, organizing the connections into
628        # 2-node connections (links) and more-than-2-node connections (lans).
629        # When a lan is created, it's added to the list of nodes (there's a
630        # node in the visualization for the lan).
631        for l in topo_lans:
632            if links.has_key(l['vname']):
633                if len(links[l['vname']]) < 2:
634                    links[l['vname']].append(l['vnode'])
635                else:
636                    nodes.append(l['vname'])
637                    lans[l['vname']] = links[l['vname']]
638                    del links[l['vname']]
639                    lans[l['vname']].append(l['vnode'])
640            elif lans.has_key(l['vname']):
641                lans[l['vname']].append(l['vnode'])
642            else:
643                links[l['vname']] = [ l['vnode'] ]
644
645
646        # Open up a temporary file for dot to turn into a visualization
647        try:
648            df, dotname = tempfile.mkstemp()
649            dotfile = os.fdopen(df, 'w')
650        except EnvironmentError:
651            raise service_error(service_error.internal,
652                    "Failed to open file in genviz")
653
654        try:
655            dnull = open('/dev/null', 'w')
656        except EnvironmentError:
657            service_error(service_error.internal,
658                    "Failed to open /dev/null in genviz")
659
660        # Generate a dot/neato input file from the links, nodes and lans
661        try:
662            print >>dotfile, "graph G {"
663            for n in nodes:
664                print >>dotfile, '\t"%s"' % n
665            for l in links.keys():
666                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
667            for l in lans.keys():
668                for n in lans[l]:
669                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
670            print >>dotfile, "}"
671            dotfile.close()
672        except TypeError:
673            raise service_error(service_error.internal,
674                    "Single endpoint link in vtopo")
675        except EnvironmentError:
676            raise service_error(service_error.internal, "Cannot write dot file")
677
678        # Use dot to create a visualization
679        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
680                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
681                close_fds=True)
682        dnull.close()
683
684        # Translate dot to vis format
685        vis_nodes = [ ]
686        vis = { 'node': vis_nodes }
687        for line in dot.stdout:
688            m = vis_re.match(line)
689            if m:
690                vn = m.group(1)
691                vis_node = {'name': vn, \
692                        'x': float(m.group(2)),\
693                        'y' : float(m.group(3)),\
694                    }
695                if vn in links.keys() or vn in lans.keys():
696                    vis_node['type'] = 'lan'
697                else:
698                    vis_node['type'] = 'node'
699                vis_nodes.append(vis_node)
700        rv = dot.wait()
701
702        os.remove(dotname)
703        if rv == 0 : return vis
704        else: return None
705
706
707    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
708            cert_pwd=None):
709        """
710        Release access to testbed through fedd
711        """
712
713        if not uri and tbmap:
714            uri = tbmap.get(tb, None)
715        if not uri:
716            raise service_error(service_error.server_config, 
717                    "Unknown testbed: %s" % tb)
718
719        if self.local_access.has_key(uri):
720            resp = self.local_access[uri].ReleaseAccess(\
721                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
722                    fedid(file=cert_file))
723            resp = { 'ReleaseAccessResponseBody': resp } 
724        else:
725            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
726                    cert_file, cert_pwd, self.trusted_certs)
727
728        # better error coding
729
730    def remote_ns2topdl(self, uri, desc):
731
732        req = {
733                'description' : { 'ns2description': desc },
734            }
735
736        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
737                self.trusted_certs)
738
739        if r.has_key('Ns2TopdlResponseBody'):
740            r = r['Ns2TopdlResponseBody']
741            ed = r.get('experimentdescription', None)
742            if ed.has_key('topdldescription'):
743                return topdl.Topology(**ed['topdldescription'])
744            else:
745                raise service_error(service_error.protocol, 
746                        "Bad splitter response (no output)")
747        else:
748            raise service_error(service_error.protocol, "Bad splitter response")
749
750    class start_segment:
751        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
752                cert_pwd=None, trusted_certs=None, caller=None,
753                log_collector=None):
754            self.log = log
755            self.debug = debug
756            self.cert_file = cert_file
757            self.cert_pwd = cert_pwd
758            self.trusted_certs = None
759            self.caller = caller
760            self.testbed = testbed
761            self.log_collector = log_collector
762            self.response = None
763            self.node = { }
764
765        def make_map(self, resp):
766            for e in resp.get('embedding', []):
767                if 'toponame' in e and 'physname' in e:
768                    self.node[e['toponame']] = e['physname'][0]
769
770        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
771            req = {
772                    'allocID': { 'fedid' : aid }, 
773                    'segmentdescription': { 
774                        'topdldescription': topo.to_dict(),
775                    },
776                }
777
778            if connInfo:
779                req['connection'] = connInfo
780
781            import_svcs = [ s for m in masters.values() \
782                    for s in m if self.testbed in s.importers]
783
784            if import_svcs or self.testbed in masters:
785                req['service'] = []
786
787            for s in import_svcs:
788                for r in s.reqs:
789                    sr = copy.deepcopy(r)
790                    sr['visibility'] = 'import';
791                    req['service'].append(sr)
792
793            for s in masters.get(self.testbed, []):
794                for r in s.reqs:
795                    sr = copy.deepcopy(r)
796                    sr['visibility'] = 'export';
797                    req['service'].append(sr)
798
799            if attrs:
800                req['fedAttr'] = attrs
801
802            try:
803                self.log.debug("Calling StartSegment at %s " % uri)
804                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
805                        self.trusted_certs)
806                if r.has_key('StartSegmentResponseBody'):
807                    lval = r['StartSegmentResponseBody'].get('allocationLog',
808                            None)
809                    if lval and self.log_collector:
810                        for line in  lval.splitlines(True):
811                            self.log_collector.write(line)
812                    self.make_map(r['StartSegmentResponseBody'])
813                    self.response = r
814                else:
815                    raise service_error(service_error.internal, 
816                            "Bad response!?: %s" %r)
817                return True
818            except service_error, e:
819                self.log.error("Start segment failed on %s: %s" % \
820                        (self.testbed, e))
821                return False
822
823
824
825    class terminate_segment:
826        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
827                cert_pwd=None, trusted_certs=None, caller=None):
828            self.log = log
829            self.debug = debug
830            self.cert_file = cert_file
831            self.cert_pwd = cert_pwd
832            self.trusted_certs = None
833            self.caller = caller
834            self.testbed = testbed
835
836        def __call__(self, uri, aid ):
837            req = {
838                    'allocID': aid , 
839                }
840            try:
841                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
842                        self.trusted_certs)
843                return True
844            except service_error, e:
845                self.log.error("Terminate segment failed on %s: %s" % \
846                        (self.testbed, e))
847                return False
848   
849
850    def allocate_resources(self, allocated, masters, eid, expid, 
851            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
852            attrs=None, connInfo={}, tbmap=None, expcert=None):
853
854        started = { }           # Testbeds where a sub-experiment started
855                                # successfully
856
857        # XXX
858        fail_soft = False
859
860        if tbmap is None: tbmap = { }
861
862        log = alloc_log or self.log
863
864        tp = thread_pool(self.nthreads)
865        threads = [ ]
866        starters = [ ]
867
868        if expcert:
869            cert = expcert
870            pw = None
871        else:
872            cert = self.cert_file
873            pw = self.cert_pwd
874
875        for tb in allocated.keys():
876            # Create and start a thread to start the segment, and save it
877            # to get the return value later
878            tb_attrs = copy.copy(attrs)
879            tp.wait_for_slot()
880            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
881            base, suffix = split_testbed(tb)
882            if suffix:
883                tb_attrs.append({'attribute': 'experiment_name', 
884                    'value': "%s-%s" % (eid, suffix)})
885            else:
886                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
887            if not uri:
888                raise service_error(service_error.internal, 
889                        "Unknown testbed %s !?" % tb)
890
891            if tbparams[tb].has_key('allocID') and \
892                    tbparams[tb]['allocID'].has_key('fedid'):
893                aid = tbparams[tb]['allocID']['fedid']
894            else:
895                raise service_error(service_error.internal, 
896                        "No alloc id for testbed %s !?" % tb)
897
898            s = self.start_segment(log=log, debug=self.debug,
899                    testbed=tb, cert_file=cert,
900                    cert_pwd=pw, trusted_certs=self.trusted_certs,
901                    caller=self.call_StartSegment,
902                    log_collector=log_collector)
903            starters.append(s)
904            t  = pooled_thread(\
905                    target=s, name=tb,
906                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
907                    pdata=tp, trace_file=self.trace_file)
908            threads.append(t)
909            t.start()
910
911        # Wait until all finish (keep pinging the log, though)
912        mins = 0
913        revoked = False
914        while not tp.wait_for_all_done(60.0):
915            mins += 1
916            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
917                    % mins)
918            if not revoked and \
919                    len([ t.getName() for t in threads if t.rv == False]) > 0:
920                # a testbed has failed.  Revoke this experiment's
921                # synchronizarion values so that sub experiments will not
922                # deadlock waiting for synchronization that will never happen
923                self.log.info("A subexperiment has failed to swap in, " + \
924                        "revoking synch keys")
925                var_key = "fedid:%s" % expid
926                for k in self.synch_store.all_keys():
927                    if len(k) > 45 and k[0:46] == var_key:
928                        self.synch_store.revoke_key(k)
929                revoked = True
930
931        failed = [ t.getName() for t in threads if not t.rv ]
932        succeeded = [tb for tb in allocated.keys() if tb not in failed]
933
934        # If one failed clean up, unless fail_soft is set
935        if failed:
936            if not fail_soft:
937                tp.clear()
938                for tb in succeeded:
939                    # Create and start a thread to stop the segment
940                    tp.wait_for_slot()
941                    uri = tbparams[tb]['uri']
942                    t  = pooled_thread(\
943                            target=self.terminate_segment(log=log,
944                                testbed=tb,
945                                cert_file=cert, 
946                                cert_pwd=pw,
947                                trusted_certs=self.trusted_certs,
948                                caller=self.call_TerminateSegment),
949                            args=(uri, tbparams[tb]['federant']['allocID']),
950                            name=tb,
951                            pdata=tp, trace_file=self.trace_file)
952                    t.start()
953                # Wait until all finish (if any are being stopped)
954                if succeeded:
955                    tp.wait_for_all_done()
956
957                # release the allocations
958                for tb in tbparams.keys():
959                    try:
960                        self.release_access(tb, tbparams[tb]['allocID'], 
961                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
962                                cert_file=cert, cert_pwd=pw)
963                    except service_error, e:
964                        self.log.warn("Error releasing access: %s" % e.desc)
965                # Remove the placeholder
966                self.state_lock.acquire()
967                self.state[eid]['experimentStatus'] = 'failed'
968                if self.state_filename: self.write_state()
969                self.state_lock.release()
970                # Remove the repo dir
971                self.remove_dirs("%s/%s" %(self.repodir, expid))
972                # Walk up tmpdir, deleting as we go
973                if self.cleanup:
974                    self.remove_dirs(tmpdir)
975                else:
976                    log.debug("[start_experiment]: not removing %s" % tmpdir)
977
978
979                log.error("Swap in failed on %s" % ",".join(failed))
980                return
981        else:
982            # Walk through the successes and gather the virtual to physical
983            # mapping.
984            embedding = [ ]
985            for s in starters:
986                for k, v in s.node.items():
987                    embedding.append({
988                        'toponame': k, 
989                        'physname': [ v],
990                        'testbed': s.testbed
991                        })
992            log.info("[start_segment]: Experiment %s active" % eid)
993
994
995        # Walk up tmpdir, deleting as we go
996        if self.cleanup:
997            self.remove_dirs(tmpdir)
998        else:
999            log.debug("[start_experiment]: not removing %s" % tmpdir)
1000
1001        # Insert the experiment into our state and update the disk copy.
1002        self.state_lock.acquire()
1003        self.state[expid]['experimentStatus'] = 'active'
1004        self.state[eid] = self.state[expid]
1005        self.state[eid]['experimentdescription']['topdldescription'] = \
1006                top.to_dict()
1007        self.state[eid]['embedding'] = embedding
1008        if self.state_filename: self.write_state()
1009        self.state_lock.release()
1010        return
1011
1012
1013    def add_kit(self, e, kit):
1014        """
1015        Add a Software object created from the list of (install, location)
1016        tuples passed as kit  to the software attribute of an object e.  We
1017        do this enough to break out the code, but it's kind of a hack to
1018        avoid changing the old tuple rep.
1019        """
1020
1021        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1022
1023        if isinstance(e.software, list): e.software.extend(s)
1024        else: e.software = s
1025
1026
1027    def create_experiment_state(self, fid, req, expid, expcert,
1028            state='starting'):
1029        """
1030        Create the initial entry in the experiment's state.  The expid and
1031        expcert are the experiment's fedid and certifacte that represents that
1032        ID, which are installed in the experiment state.  If the request
1033        includes a suggested local name that is used if possible.  If the local
1034        name is already taken by an experiment owned by this user that has
1035        failed, it is overwritten.  Otherwise new letters are added until a
1036        valid localname is found.  The generated local name is returned.
1037        """
1038
1039        if req.has_key('experimentID') and \
1040                req['experimentID'].has_key('localname'):
1041            overwrite = False
1042            eid = req['experimentID']['localname']
1043            # If there's an old failed experiment here with the same local name
1044            # and accessible by this user, we'll overwrite it, otherwise we'll
1045            # fall through and do the collision avoidance.
1046            old_expid = self.get_experiment_fedid(eid)
1047            if old_expid and self.check_experiment_access(fid, old_expid):
1048                self.state_lock.acquire()
1049                status = self.state[eid].get('experimentStatus', None)
1050                if status and status == 'failed':
1051                    # remove the old access attribute
1052                    self.auth.unset_attribute(fid, old_expid)
1053                    self.auth.save()
1054                    overwrite = True
1055                    del self.state[eid]
1056                    del self.state[old_expid]
1057                self.state_lock.release()
1058            self.state_lock.acquire()
1059            while (self.state.has_key(eid) and not overwrite):
1060                eid += random.choice(string.ascii_letters)
1061            # Initial state
1062            self.state[eid] = {
1063                    'experimentID' : \
1064                            [ { 'localname' : eid }, {'fedid': expid } ],
1065                    'experimentStatus': state,
1066                    'experimentAccess': { 'X509' : expcert },
1067                    'owner': fid,
1068                    'log' : [],
1069                }
1070            self.state[expid] = self.state[eid]
1071            if self.state_filename: self.write_state()
1072            self.state_lock.release()
1073        else:
1074            eid = self.exp_stem
1075            for i in range(0,5):
1076                eid += random.choice(string.ascii_letters)
1077            self.state_lock.acquire()
1078            while (self.state.has_key(eid)):
1079                eid = self.exp_stem
1080                for i in range(0,5):
1081                    eid += random.choice(string.ascii_letters)
1082            # Initial state
1083            self.state[eid] = {
1084                    'experimentID' : \
1085                            [ { 'localname' : eid }, {'fedid': expid } ],
1086                    'experimentStatus': state,
1087                    'experimentAccess': { 'X509' : expcert },
1088                    'owner': fid,
1089                    'log' : [],
1090                }
1091            self.state[expid] = self.state[eid]
1092            if self.state_filename: self.write_state()
1093            self.state_lock.release()
1094
1095        return eid
1096
1097
1098    def allocate_ips_to_topo(self, top):
1099        """
1100        Add an ip4_address attribute to all the hosts in the topology, based on
1101        the shared substrates on which they sit.  An /etc/hosts file is also
1102        created and returned as a list of hostfiles entries.  We also return
1103        the allocator, because we may need to allocate IPs to portals
1104        (specifically DRAGON portals).
1105        """
1106        subs = sorted(top.substrates, 
1107                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1108                reverse=True)
1109        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1110        ifs = { }
1111        hosts = [ ]
1112
1113        for idx, s in enumerate(subs):
1114            net_size = len(s.interfaces)+2
1115
1116            a = ips.allocate(net_size)
1117            if a :
1118                base, num = a
1119                if num < net_size: 
1120                    raise service_error(service_error.internal,
1121                            "Allocator returned wrong number of IPs??")
1122            else:
1123                raise service_error(service_error.req, 
1124                        "Cannot allocate IP addresses")
1125            mask = ips.min_alloc
1126            while mask < net_size:
1127                mask *= 2
1128
1129            netmask = ((2**32-1) ^ (mask-1))
1130
1131            base += 1
1132            for i in s.interfaces:
1133                i.attribute.append(
1134                        topdl.Attribute('ip4_address', 
1135                            "%s" % ip_addr(base)))
1136                i.attribute.append(
1137                        topdl.Attribute('ip4_netmask', 
1138                            "%s" % ip_addr(int(netmask))))
1139
1140                hname = i.element.name
1141                if ifs.has_key(hname):
1142                    hosts.append("%s\t%s-%s %s-%d" % \
1143                            (ip_addr(base), hname, s.name, hname,
1144                                ifs[hname]))
1145                else:
1146                    ifs[hname] = 0
1147                    hosts.append("%s\t%s-%s %s-%d %s" % \
1148                            (ip_addr(base), hname, s.name, hname,
1149                                ifs[hname], hname))
1150
1151                ifs[hname] += 1
1152                base += 1
1153        return hosts, ips
1154
1155    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1156            tbparam, masters, tbmap, expid=None, expcert=None):
1157        for tb in testbeds:
1158            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1159                    expcert)
1160            allocated[tb] = 1
1161
1162    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1163            expcert=None):
1164        """
1165        Get access to testbed through fedd and set the parameters for that tb
1166        """
1167        def get_export_project(svcs):
1168            """
1169            Look through for the list of federated_service for this testbed
1170            objects for a project_export service, and extract the project
1171            parameter.
1172            """
1173
1174            pe = [s for s in svcs if s.name=='project_export']
1175            if len(pe) == 1:
1176                return pe[0].params.get('project', None)
1177            elif len(pe) == 0:
1178                return None
1179            else:
1180                raise service_error(service_error.req,
1181                        "More than one project export is not supported")
1182
1183        uri = tbmap.get(testbed_base(tb), None)
1184        if not uri:
1185            raise service_error(service_error.server_config, 
1186                    "Unknown testbed: %s" % tb)
1187
1188        export_svcs = masters.get(tb,[])
1189        import_svcs = [ s for m in masters.values() \
1190                for s in m \
1191                    if tb in s.importers ]
1192
1193        export_project = get_export_project(export_svcs)
1194        # Compose the credential list so that IDs come before attributes
1195        creds = set()
1196        keys = set()
1197        certs = self.auth.get_creds_for_principal(fid)
1198        if expid:
1199            certs.update(self.auth.get_creds_for_principal(expid))
1200        for c in certs:
1201            keys.add(c.issuer_cert())
1202            creds.add(c.attribute_cert())
1203        creds = list(keys) + list(creds)
1204
1205        if expcert: cert, pw = expcert, None
1206        else: cert, pw = self.cert_file, self.cert_pw
1207
1208        # Request credentials
1209        req = {
1210                'abac_credential': creds,
1211            }
1212        # Make the service request from the services we're importing and
1213        # exporting.  Keep track of the export request ids so we can
1214        # collect the resulting info from the access response.
1215        e_keys = { }
1216        if import_svcs or export_svcs:
1217            req['service'] = [ ]
1218
1219            for i, s in enumerate(import_svcs):
1220                idx = 'import%d' % i
1221                sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
1222                if s.params:
1223                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1224                            for k, v in s.params.items()]
1225                req['service'].append(sr)
1226
1227            for i, s in enumerate(export_svcs):
1228                idx = 'export%d' % i
1229                e_keys[idx] = s
1230                sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
1231                if s.params:
1232                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
1233                            for k, v in s.params.items()]
1234                req['service'].append(sr)
1235
1236
1237        if self.local_access.has_key(uri):
1238            # Local access call
1239            req = { 'RequestAccessRequestBody' : req }
1240            r = self.local_access[uri].RequestAccess(req, 
1241                    fedid(file=self.cert_file))
1242            r = { 'RequestAccessResponseBody' : r }
1243        else:
1244            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1245
1246        if r.has_key('RequestAccessResponseBody'):
1247            # Through to here we have a valid response, not a fault.
1248            # Access denied is a fault, so something better or worse than
1249            # access denied has happened.
1250            r = r['RequestAccessResponseBody']
1251            self.log.debug("[get_access] Access granted")
1252        else:
1253            raise service_error(service_error.protocol,
1254                        "Bad proxy response")
1255       
1256        tbparam[tb] = { 
1257                "allocID" : r['allocID'],
1258                "uri": uri,
1259                }
1260
1261        # Collect the responses corresponding to the services this testbed
1262        # exports.  These will be the service requests that we will include in
1263        # the start segment requests (with appropriate visibility values) to
1264        # import and export the segments.
1265        for s in r.get('service', []):
1266            id = s.get('id', None)
1267            if id and id in e_keys:
1268                e_keys[id].reqs.append(s)
1269
1270        # Add attributes to parameter space.  We don't allow attributes to
1271        # overlay any parameters already installed.
1272        for a in r.get('fedAttr', []):
1273            try:
1274                if a['attribute'] and \
1275                        isinstance(a['attribute'], basestring)\
1276                        and not tbparam[tb].has_key(a['attribute'].lower()):
1277                    tbparam[tb][a['attribute'].lower()] = a['value']
1278            except KeyError:
1279                self.log.error("Bad attribute in response: %s" % a)
1280
1281
1282    def split_topology(self, top, topo, testbeds):
1283        """
1284        Create the sub-topologies that are needed for experiment instantiation.
1285        """
1286        for tb in testbeds:
1287            topo[tb] = top.clone()
1288            # copy in for loop allows deletions from the original
1289            for e in [ e for e in topo[tb].elements]:
1290                etb = e.get_attribute('testbed')
1291                # NB: elements without a testbed attribute won't appear in any
1292                # sub topologies. 
1293                if not etb or etb != tb:
1294                    for i in e.interface:
1295                        for s in i.subs:
1296                            try:
1297                                s.interfaces.remove(i)
1298                            except ValueError:
1299                                raise service_error(service_error.internal,
1300                                        "Can't remove interface??")
1301                    topo[tb].elements.remove(e)
1302            topo[tb].make_indices()
1303
1304    def wrangle_software(self, expid, top, topo, tbparams):
1305        """
1306        Copy software out to the repository directory, allocate permissions and
1307        rewrite the segment topologies to look for the software in local
1308        places.
1309        """
1310
1311        # Copy the rpms and tarfiles to a distribution directory from
1312        # which the federants can retrieve them
1313        linkpath = "%s/software" %  expid
1314        softdir ="%s/%s" % ( self.repodir, linkpath)
1315        softmap = { }
1316        # These are in a list of tuples format (each kit).  This comprehension
1317        # unwraps them into a single list of tuples that initilaizes the set of
1318        # tuples.
1319        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1320                for p, t in l ])
1321        pkgs.update([x.location for e in top.elements \
1322                for x in e.software])
1323        try:
1324            os.makedirs(softdir)
1325        except EnvironmentError, e:
1326            raise service_error(
1327                    "Cannot create software directory: %s" % e)
1328        # The actual copying.  Everything's converted into a url for copying.
1329        for pkg in pkgs:
1330            loc = pkg
1331
1332            scheme, host, path = urlparse(loc)[0:3]
1333            dest = os.path.basename(path)
1334            if not scheme:
1335                if not loc.startswith('/'):
1336                    loc = "/%s" % loc
1337                loc = "file://%s" %loc
1338            try:
1339                u = urlopen(loc)
1340            except Exception, e:
1341                raise service_error(service_error.req, 
1342                        "Cannot open %s: %s" % (loc, e))
1343            try:
1344                f = open("%s/%s" % (softdir, dest) , "w")
1345                self.log.debug("Writing %s/%s" % (softdir,dest) )
1346                data = u.read(4096)
1347                while data:
1348                    f.write(data)
1349                    data = u.read(4096)
1350                f.close()
1351                u.close()
1352            except Exception, e:
1353                raise service_error(service_error.internal,
1354                        "Could not copy %s: %s" % (loc, e))
1355            path = re.sub("/tmp", "", linkpath)
1356            # XXX
1357            softmap[pkg] = \
1358                    "%s/%s/%s" %\
1359                    ( self.repo_url, path, dest)
1360
1361            # Allow the individual segments to access the software.
1362            for tb in tbparams.keys():
1363                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1364                        "/%s/%s" % ( path, dest))
1365            self.auth.save()
1366
1367        # Convert the software locations in the segments into the local
1368        # copies on this host
1369        for soft in [ s for tb in topo.values() \
1370                for e in tb.elements \
1371                    if getattr(e, 'software', False) \
1372                        for s in e.software ]:
1373            if softmap.has_key(soft.location):
1374                soft.location = softmap[soft.location]
1375
1376
1377    def new_experiment(self, req, fid):
1378        """
1379        The external interface to empty initial experiment creation called from
1380        the dispatcher.
1381
1382        Creates a working directory, splits the incoming description using the
1383        splitter script and parses out the avrious subsections using the
1384        lcasses above.  Once each sub-experiment is created, use pooled threads
1385        to instantiate them and start it all up.
1386        """
1387        req = req.get('NewRequestBody', None)
1388        if not req:
1389            raise service_error(service_error.req,
1390                    "Bad request format (no NewRequestBody)")
1391
1392        if self.auth.import_credentials(data_list=req.get('credential', [])):
1393            self.auth.save()
1394
1395        if not self.auth.check_attribute(fid, 'new'):
1396            raise service_error(service_error.access, "New access denied")
1397
1398        try:
1399            tmpdir = tempfile.mkdtemp(prefix="split-")
1400        except EnvironmentError:
1401            raise service_error(service_error.internal, "Cannot create tmp dir")
1402
1403        try:
1404            access_user = self.accessdb[fid]
1405        except KeyError:
1406            raise service_error(service_error.internal,
1407                    "Access map and authorizer out of sync in " + \
1408                            "new_experiment for fedid %s"  % fid)
1409
1410        pid = "dummy"
1411        gid = "dummy"
1412
1413        # Generate an ID for the experiment (slice) and a certificate that the
1414        # allocator can use to prove they own it.  We'll ship it back through
1415        # the encrypted connection.  If the requester supplied one, use it.
1416        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1417            expcert = req['experimentAccess']['X509']
1418            expid = fedid(certstr=expcert)
1419            self.state_lock.acquire()
1420            if expid in self.state:
1421                self.state_lock.release()
1422                raise service_error(service_error.req, 
1423                        'fedid %s identifies an existing experiment' % expid)
1424            self.state_lock.release()
1425        else:
1426            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1427
1428        #now we're done with the tmpdir, and it should be empty
1429        if self.cleanup:
1430            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1431            os.rmdir(tmpdir)
1432        else:
1433            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1434
1435        eid = self.create_experiment_state(fid, req, expid, expcert, 
1436                state='empty')
1437
1438        # Let users touch the state
1439        self.auth.set_attribute(fid, expid)
1440        self.auth.set_attribute(expid, expid)
1441        # Override fedids can manipulate state as well
1442        for o in self.overrides:
1443            self.auth.set_attribute(o, expid)
1444        self.auth.save()
1445
1446        rv = {
1447                'experimentID': [
1448                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1449                ],
1450                'experimentStatus': 'empty',
1451                'experimentAccess': { 'X509' : expcert }
1452            }
1453
1454        return rv
1455
1456    def create_experiment(self, req, fid):
1457        """
1458        The external interface to experiment creation called from the
1459        dispatcher.
1460
1461        Creates a working directory, splits the incoming description using the
1462        splitter script and parses out the various subsections using the
1463        classes above.  Once each sub-experiment is created, use pooled threads
1464        to instantiate them and start it all up.
1465        """
1466
1467        req = req.get('CreateRequestBody', None)
1468        if not req:
1469            raise service_error(service_error.req,
1470                    "Bad request format (no CreateRequestBody)")
1471
1472        # Get the experiment access
1473        exp = req.get('experimentID', None)
1474        if exp:
1475            if exp.has_key('fedid'):
1476                key = exp['fedid']
1477                expid = key
1478                eid = None
1479            elif exp.has_key('localname'):
1480                key = exp['localname']
1481                eid = key
1482                expid = None
1483            else:
1484                raise service_error(service_error.req, "Unknown lookup type")
1485        else:
1486            raise service_error(service_error.req, "No request?")
1487
1488        # Import information from the requester
1489        if self.auth.import_credentials(data_list=req.get('credential', [])):
1490            self.auth.save()
1491
1492        self.check_experiment_access(fid, key)
1493
1494        # Install the testbed map entries supplied with the request into a copy
1495        # of the testbed map.
1496        tbmap = dict(self.tbmap)
1497        for m in req.get('testbedmap', []):
1498            if 'testbed' in m and 'uri' in m:
1499                tbmap[m['testbed']] = m['uri']
1500
1501        try:
1502            tmpdir = tempfile.mkdtemp(prefix="split-")
1503            os.mkdir(tmpdir+"/keys")
1504        except EnvironmentError:
1505            raise service_error(service_error.internal, "Cannot create tmp dir")
1506
1507        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1508        gw_secretkey_base = "fed.%s" % self.ssh_type
1509        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1510        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1511        tclfile = tmpdir + "/experiment.tcl"
1512        tbparams = { }
1513        try:
1514            access_user = self.accessdb[fid]
1515        except KeyError:
1516            raise service_error(service_error.internal,
1517                    "Access map and authorizer out of sync in " + \
1518                            "create_experiment for fedid %s"  % fid)
1519
1520        pid = "dummy"
1521        gid = "dummy"
1522
1523        # The tcl parser needs to read a file so put the content into that file
1524        descr=req.get('experimentdescription', None)
1525        if descr:
1526            file_content=descr.get('ns2description', None)
1527            if file_content:
1528                try:
1529                    f = open(tclfile, 'w')
1530                    f.write(file_content)
1531                    f.close()
1532                except EnvironmentError:
1533                    raise service_error(service_error.internal,
1534                            "Cannot write temp experiment description")
1535            else:
1536                raise service_error(service_error.req, 
1537                        "Only ns2descriptions supported")
1538        else:
1539            raise service_error(service_error.req, "No experiment description")
1540
1541        self.state_lock.acquire()
1542        if self.state.has_key(key):
1543            self.state[key]['experimentStatus'] = "starting"
1544            for e in self.state[key].get('experimentID',[]):
1545                if not expid and e.has_key('fedid'):
1546                    expid = e['fedid']
1547                elif not eid and e.has_key('localname'):
1548                    eid = e['localname']
1549            if 'experimentAccess' in self.state[key] and \
1550                    'X509' in self.state[key]['experimentAccess']:
1551                expcert = self.state[key]['experimentAccess']['X509']
1552            else:
1553                expcert = None
1554        self.state_lock.release()
1555
1556        if not (eid and expid):
1557            raise service_error(service_error.internal, 
1558                    "Cannot find local experiment info!?")
1559
1560        # make a protected copy of the access certificate so the experiment
1561        # controller can act as the experiment principal.
1562        if expcert and self.auth_type != 'legacy':
1563            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1564            if not expcert_file:
1565                raise service_error(service_error.internal, 
1566                        "Cannot create temp cert file?")
1567        else:
1568            expcert_file = None
1569
1570        try: 
1571            # This catches exceptions to clear the placeholder if necessary
1572            try:
1573                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1574            except ValueError:
1575                raise service_error(service_error.server_config, 
1576                        "Bad key type (%s)" % self.ssh_type)
1577
1578            # Copy the service request
1579            tb_services = [ s for s in req.get('service',[]) ]
1580            # Translate to topdl
1581            if self.splitter_url:
1582                self.log.debug("Calling remote topdl translator at %s" % \
1583                        self.splitter_url)
1584                top = self.remote_ns2topdl(self.splitter_url, file_content)
1585            else:
1586                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1587                    str(self.muxmax), '-m', 'dummy']
1588
1589                tclcmd.extend([pid, gid, eid, tclfile])
1590
1591                self.log.debug("running local splitter %s", " ".join(tclcmd))
1592                # This is just fantastic.  As a side effect the parser copies
1593                # tb_compat.tcl into the current directory, so that directory
1594                # must be writable by the fedd user.  Doing this in the
1595                # temporary subdir ensures this is the case.
1596                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1597                        cwd=tmpdir)
1598                split_data = tclparser.stdout
1599
1600                top = topdl.topology_from_xml(file=split_data, top="experiment")
1601
1602            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1603            # Find the testbeds to look up
1604            testbeds = set([ a.value for e in top.elements \
1605                    for a in e.attribute \
1606                        if a.attribute == 'testbed'])
1607
1608            tb_hosts = { }
1609            for tb in testbeds:
1610                tb_hosts[tb] = [ e.name for e in top.elements \
1611                        if isinstance(e, topdl.Computer) and \
1612                            e.get_attribute('testbed') and \
1613                            e.get_attribute('testbed') == tb]
1614
1615            masters = { }           # testbeds exporting services
1616            pmasters = { }          # Testbeds exporting services that
1617                                    # need portals
1618            for s in tb_services:
1619                # If this is a service request with the importall field
1620                # set, fill it out.
1621
1622                if s.get('importall', False):
1623                    s['import'] = [ tb for tb in testbeds \
1624                            if tb not in s.get('export',[])]
1625                    del s['importall']
1626
1627                # Add the service to masters
1628                for tb in s.get('export', []):
1629                    if s.get('name', None):
1630                        if tb not in masters:
1631                            masters[tb] = [ ]
1632
1633                        params = { }
1634                        if 'fedAttr' in s:
1635                            for a in s['fedAttr']:
1636                                params[a.get('attribute', '')] = \
1637                                        a.get('value','')
1638
1639                        fser = federated_service(name=s['name'],
1640                                exporter=tb, importers=s.get('import',[]),
1641                                params=params)
1642                        if fser.name == 'hide_hosts' \
1643                                and 'hosts' not in fser.params:
1644                            fser.params['hosts'] = \
1645                                    ",".join(tb_hosts.get(fser.exporter, []))
1646                        masters[tb].append(fser)
1647
1648                        if fser.portal:
1649                            if tb not in pmasters: pmasters[tb] = [ fser ]
1650                            else: pmasters[tb].append(fser)
1651                    else:
1652                        self.log.error('Testbed service does not have name " + \
1653                                "and importers')
1654
1655
1656            allocated = { }         # Testbeds we can access
1657            topo ={ }               # Sub topologies
1658            connInfo = { }          # Connection information
1659
1660            if self.auth_type == 'legacy':
1661                self.get_legcay_access_to_testbeds(testbeds, access_user, 
1662                        allocated, tbparams, masters, tbmap)
1663            elif self.auth_type == 'abac':
1664                self.get_access_to_testbeds(testbeds, fid, allocated, 
1665                        tbparams, masters, tbmap, expid, expcert_file)
1666            else:
1667                raise service_error(service_error.internal, 
1668                        "Unknown auth_type %s" % self.auth_type)
1669
1670            self.split_topology(top, topo, testbeds)
1671
1672            # Copy configuration files into the remote file store
1673            # The config urlpath
1674            configpath = "/%s/config" % expid
1675            # The config file system location
1676            configdir ="%s%s" % ( self.repodir, configpath)
1677            try:
1678                os.makedirs(configdir)
1679            except EnvironmentError, e:
1680                raise service_error(service_error.internal,
1681                        "Cannot create config directory: %s" % e)
1682            try:
1683                f = open("%s/hosts" % configdir, "w")
1684                f.write('\n'.join(hosts))
1685                f.close()
1686            except EnvironmentError, e:
1687                raise service_error(service_error.internal, 
1688                        "Cannot write hosts file: %s" % e)
1689            try:
1690                copy_file("%s" % gw_pubkey, "%s/%s" % \
1691                        (configdir, gw_pubkey_base))
1692                copy_file("%s" % gw_secretkey, "%s/%s" % \
1693                        (configdir, gw_secretkey_base))
1694            except EnvironmentError, e:
1695                raise service_error(service_error.internal, 
1696                        "Cannot copy keyfiles: %s" % e)
1697
1698            # Allow the individual testbeds to access the configuration files.
1699            for tb in tbparams.keys():
1700                asignee = tbparams[tb]['allocID']['fedid']
1701                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1702                    self.auth.set_attribute(asignee, "%s/%s" % \
1703                            (configpath, f))
1704
1705            part = experiment_partition(self.auth, self.store_url, tbmap,
1706                    self.muxmax, self.direct_transit)
1707            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1708                    connInfo, expid)
1709            # Now get access to the dynamic testbeds (those added above)
1710            for tb in [ t for t in topo if t not in allocated]:
1711                #XXX: ABAC
1712                if self.auth_type =='legacy':
1713                    self.get_legacy_access(tb, None, tbparams, access_user, 
1714                            masters, tbmap)
1715                elif self.auth_type == 'abac':
1716                    self.get_access(tb, tbparams, fid, masters, tbmap, 
1717                            expid, expcert_file)
1718                else:
1719                    raise service_error(service_error.internal, 
1720                            "Unknown auth_type %s" % self.auth_type)
1721                allocated[tb] = 1
1722                store_keys = topo[tb].get_attribute('store_keys')
1723                # Give the testbed access to keys it exports or imports
1724                if store_keys:
1725                    for sk in store_keys.split(" "):
1726                        self.auth.set_attribute(\
1727                                tbparams[tb]['allocID']['fedid'], sk)
1728            self.auth.save()
1729
1730            self.wrangle_software(expid, top, topo, tbparams)
1731
1732            vtopo = topdl.topology_to_vtopo(top)
1733            vis = self.genviz(vtopo)
1734
1735            # save federant information
1736            for k in allocated.keys():
1737                tbparams[k]['federant'] = {
1738                        'name': [ { 'localname' : eid} ],
1739                        'allocID' : tbparams[k]['allocID'],
1740                        'uri': tbparams[k]['uri'],
1741                    }
1742
1743            self.state_lock.acquire()
1744            self.state[eid]['vtopo'] = vtopo
1745            self.state[eid]['vis'] = vis
1746            self.state[eid]['experimentdescription'] = \
1747                    { 'topdldescription': top.to_dict() }
1748            self.state[eid]['federant'] = \
1749                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1750                        if tbparams[tb].has_key('federant') ]
1751            if self.state_filename: 
1752                self.write_state()
1753            self.state_lock.release()
1754        except service_error, e:
1755            # If something goes wrong in the parse (usually an access error)
1756            # clear the placeholder state.  From here on out the code delays
1757            # exceptions.  Failing at this point returns a fault to the remote
1758            # caller.
1759
1760            self.state_lock.acquire()
1761            del self.state[eid]
1762            del self.state[expid]
1763            if self.state_filename: self.write_state()
1764            self.state_lock.release()
1765            if tmpdir and self.cleanup:
1766                self.remove_dirs(tmpdir)
1767            raise e
1768
1769
1770        # Start the background swapper and return the starting state.  From
1771        # here on out, the state will stick around a while.
1772
1773        # Let users touch the state
1774        self.auth.set_attribute(fid, expid)
1775        self.auth.set_attribute(expid, expid)
1776        # Override fedids can manipulate state as well
1777        for o in self.overrides:
1778            self.auth.set_attribute(o, expid)
1779        self.auth.save()
1780
1781        # Create a logger that logs to the experiment's state object as well as
1782        # to the main log file.
1783        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1784        alloc_collector = self.list_log(self.state[eid]['log'])
1785        h = logging.StreamHandler(alloc_collector)
1786        # XXX: there should be a global one of these rather than repeating the
1787        # code.
1788        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1789                    '%d %b %y %H:%M:%S'))
1790        alloc_log.addHandler(h)
1791       
1792        attrs = [ 
1793                {
1794                    'attribute': 'ssh_pubkey', 
1795                    'value': '%s/%s/config/%s' % \
1796                            (self.repo_url, expid, gw_pubkey_base)
1797                },
1798                {
1799                    'attribute': 'ssh_secretkey', 
1800                    'value': '%s/%s/config/%s' % \
1801                            (self.repo_url, expid, gw_secretkey_base)
1802                },
1803                {
1804                    'attribute': 'hosts', 
1805                    'value': '%s/%s/config/hosts' % \
1806                            (self.repo_url, expid)
1807                },
1808            ]
1809
1810        # transit and disconnected testbeds may not have a connInfo entry.
1811        # Fill in the blanks.
1812        for t in allocated.keys():
1813            if not connInfo.has_key(t):
1814                connInfo[t] = { }
1815
1816        # Start a thread to do the resource allocation
1817        t  = Thread(target=self.allocate_resources,
1818                args=(allocated, masters, eid, expid, tbparams, 
1819                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1820                    connInfo, tbmap, expcert_file),
1821                name=eid)
1822        t.start()
1823
1824        rv = {
1825                'experimentID': [
1826                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1827                ],
1828                'experimentStatus': 'starting',
1829            }
1830
1831        return rv
1832   
1833    def get_experiment_fedid(self, key):
1834        """
1835        find the fedid associated with the localname key in the state database.
1836        """
1837
1838        rv = None
1839        self.state_lock.acquire()
1840        if self.state.has_key(key):
1841            if isinstance(self.state[key], dict):
1842                try:
1843                    kl = [ f['fedid'] for f in \
1844                            self.state[key]['experimentID']\
1845                                if f.has_key('fedid') ]
1846                except KeyError:
1847                    self.state_lock.release()
1848                    raise service_error(service_error.internal, 
1849                            "No fedid for experiment %s when getting "+\
1850                                    "fedid(!?)" % key)
1851                if len(kl) == 1:
1852                    rv = kl[0]
1853                else:
1854                    self.state_lock.release()
1855                    raise service_error(service_error.internal, 
1856                            "multiple fedids for experiment %s when " +\
1857                                    "getting fedid(!?)" % key)
1858            else:
1859                self.state_lock.release()
1860                raise service_error(service_error.internal, 
1861                        "Unexpected state for %s" % key)
1862        self.state_lock.release()
1863        return rv
1864
1865    def check_experiment_access(self, fid, key):
1866        """
1867        Confirm that the fid has access to the experiment.  Though a request
1868        may be made in terms of a local name, the access attribute is always
1869        the experiment's fedid.
1870        """
1871        if not isinstance(key, fedid):
1872            key = self.get_experiment_fedid(key)
1873
1874        if self.auth.check_attribute(fid, key):
1875            return True
1876        else:
1877            raise service_error(service_error.access, "Access Denied")
1878
1879
1880    def get_handler(self, path, fid):
1881        self.log.info("Get handler %s %s" % (path, fid))
1882        if self.auth.check_attribute(fid, path):
1883            return ("%s/%s" % (self.repodir, path), "application/binary")
1884        else:
1885            return (None, None)
1886
1887    def get_vtopo(self, req, fid):
1888        """
1889        Return the stored virtual topology for this experiment
1890        """
1891        rv = None
1892        state = None
1893
1894        req = req.get('VtopoRequestBody', None)
1895        if not req:
1896            raise service_error(service_error.req,
1897                    "Bad request format (no VtopoRequestBody)")
1898        exp = req.get('experiment', None)
1899        if exp:
1900            if exp.has_key('fedid'):
1901                key = exp['fedid']
1902                keytype = "fedid"
1903            elif exp.has_key('localname'):
1904                key = exp['localname']
1905                keytype = "localname"
1906            else:
1907                raise service_error(service_error.req, "Unknown lookup type")
1908        else:
1909            raise service_error(service_error.req, "No request?")
1910
1911        self.check_experiment_access(fid, key)
1912
1913        self.state_lock.acquire()
1914        if self.state.has_key(key):
1915            if self.state[key].has_key('vtopo'):
1916                rv = { 'experiment' : {keytype: key },\
1917                        'vtopo': self.state[key]['vtopo'],\
1918                    }
1919            else:
1920                state = self.state[key]['experimentStatus']
1921        self.state_lock.release()
1922
1923        if rv: return rv
1924        else: 
1925            if state:
1926                raise service_error(service_error.partial, 
1927                        "Not ready: %s" % state)
1928            else:
1929                raise service_error(service_error.req, "No such experiment")
1930
1931    def get_vis(self, req, fid):
1932        """
1933        Return the stored visualization for this experiment
1934        """
1935        rv = None
1936        state = None
1937
1938        req = req.get('VisRequestBody', None)
1939        if not req:
1940            raise service_error(service_error.req,
1941                    "Bad request format (no VisRequestBody)")
1942        exp = req.get('experiment', None)
1943        if exp:
1944            if exp.has_key('fedid'):
1945                key = exp['fedid']
1946                keytype = "fedid"
1947            elif exp.has_key('localname'):
1948                key = exp['localname']
1949                keytype = "localname"
1950            else:
1951                raise service_error(service_error.req, "Unknown lookup type")
1952        else:
1953            raise service_error(service_error.req, "No request?")
1954
1955        self.check_experiment_access(fid, key)
1956
1957        self.state_lock.acquire()
1958        if self.state.has_key(key):
1959            if self.state[key].has_key('vis'):
1960                rv =  { 'experiment' : {keytype: key },\
1961                        'vis': self.state[key]['vis'],\
1962                        }
1963            else:
1964                state = self.state[key]['experimentStatus']
1965        self.state_lock.release()
1966
1967        if rv: return rv
1968        else:
1969            if state:
1970                raise service_error(service_error.partial, 
1971                        "Not ready: %s" % state)
1972            else:
1973                raise service_error(service_error.req, "No such experiment")
1974
1975    def clean_info_response(self, rv):
1976        """
1977        Remove the information in the experiment's state object that is not in
1978        the info response.
1979        """
1980        # Remove the owner info (should always be there, but...)
1981        if rv.has_key('owner'): del rv['owner']
1982
1983        # Convert the log into the allocationLog parameter and remove the
1984        # log entry (with defensive programming)
1985        if rv.has_key('log'):
1986            rv['allocationLog'] = "".join(rv['log'])
1987            del rv['log']
1988        else:
1989            rv['allocationLog'] = ""
1990
1991        if rv['experimentStatus'] != 'active':
1992            if rv.has_key('federant'): del rv['federant']
1993        else:
1994            # remove the allocationID and uri info from each federant
1995            for f in rv.get('federant', []):
1996                if f.has_key('allocID'): del f['allocID']
1997                if f.has_key('uri'): del f['uri']
1998
1999        return rv
2000
2001    def get_info(self, req, fid):
2002        """
2003        Return all the stored info about this experiment
2004        """
2005        rv = None
2006
2007        req = req.get('InfoRequestBody', None)
2008        if not req:
2009            raise service_error(service_error.req,
2010                    "Bad request format (no InfoRequestBody)")
2011        exp = req.get('experiment', None)
2012        if exp:
2013            if exp.has_key('fedid'):
2014                key = exp['fedid']
2015                keytype = "fedid"
2016            elif exp.has_key('localname'):
2017                key = exp['localname']
2018                keytype = "localname"
2019            else:
2020                raise service_error(service_error.req, "Unknown lookup type")
2021        else:
2022            raise service_error(service_error.req, "No request?")
2023
2024        self.check_experiment_access(fid, key)
2025
2026        # The state may be massaged by the service function that called
2027        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2028        # state.
2029        self.state_lock.acquire()
2030        if self.state.has_key(key):
2031            rv = copy.deepcopy(self.state[key])
2032        self.state_lock.release()
2033
2034        if rv:
2035            return self.clean_info_response(rv)
2036        else:
2037            raise service_error(service_error.req, "No such experiment")
2038
2039    def get_multi_info(self, req, fid):
2040        """
2041        Return all the stored info that this fedid can access
2042        """
2043        rv = { 'info': [ ] }
2044
2045        self.state_lock.acquire()
2046        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2047            try:
2048                self.check_experiment_access(fid, key)
2049            except service_error, e:
2050                if e.code == service_error.access:
2051                    continue
2052                else:
2053                    self.state_lock.release()
2054                    raise e
2055
2056            if self.state.has_key(key):
2057                e = copy.deepcopy(self.state[key])
2058                e = self.clean_info_response(e)
2059                rv['info'].append(e)
2060        self.state_lock.release()
2061        return rv
2062
2063    def remove_dirs(self, dir):
2064        """
2065        Remove the directory tree and all files rooted at dir.  Log any errors,
2066        but continue.
2067        """
2068        self.log.debug("[removedirs]: removing %s" % dir)
2069        try:
2070            for path, dirs, files in os.walk(dir, topdown=False):
2071                for f in files:
2072                    os.remove(os.path.join(path, f))
2073                for d in dirs:
2074                    os.rmdir(os.path.join(path, d))
2075            os.rmdir(dir)
2076        except EnvironmentError, e:
2077            self.log.error("Error deleting directory tree in %s" % e);
2078
2079    @staticmethod
2080    def make_temp_certfile(expcert, tmpdir):
2081        """
2082        make a protected copy of the access certificate so the experiment
2083        controller can act as the experiment principal.  mkstemp is the most
2084        secure way to do that. The directory should be created by
2085        mkdtemp.  Return the filename.
2086        """
2087        if expcert and tmpdir:
2088            try:
2089                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
2090                f = os.fdopen(certf, 'w')
2091                print >> f, expcert
2092                f.close()
2093            except EnvironmentError, e:
2094                raise service_error(service_error.internal, 
2095                        "Cannot create temp cert file?")
2096            return certfn
2097        else:
2098            return None
2099
2100    def terminate_experiment(self, req, fid):
2101        """
2102        Swap this experiment out on the federants and delete the shared
2103        information
2104        """
2105        tbparams = { }
2106        req = req.get('TerminateRequestBody', None)
2107        if not req:
2108            raise service_error(service_error.req,
2109                    "Bad request format (no TerminateRequestBody)")
2110        force = req.get('force', False)
2111        exp = req.get('experiment', None)
2112        if exp:
2113            if exp.has_key('fedid'):
2114                key = exp['fedid']
2115                keytype = "fedid"
2116            elif exp.has_key('localname'):
2117                key = exp['localname']
2118                keytype = "localname"
2119            else:
2120                raise service_error(service_error.req, "Unknown lookup type")
2121        else:
2122            raise service_error(service_error.req, "No request?")
2123
2124        self.check_experiment_access(fid, key)
2125
2126        dealloc_list = [ ]
2127
2128
2129        # Create a logger that logs to the dealloc_list as well as to the main
2130        # log file.
2131        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2132        h = logging.StreamHandler(self.list_log(dealloc_list))
2133        # XXX: there should be a global one of these rather than repeating the
2134        # code.
2135        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2136                    '%d %b %y %H:%M:%S'))
2137        dealloc_log.addHandler(h)
2138
2139        self.state_lock.acquire()
2140        fed_exp = self.state.get(key, None)
2141        repo = None
2142
2143        if fed_exp:
2144            # This branch of the conditional holds the lock to generate a
2145            # consistent temporary tbparams variable to deallocate experiments.
2146            # It releases the lock to do the deallocations and reacquires it to
2147            # remove the experiment state when the termination is complete.
2148
2149            # First make sure that the experiment creation is complete.
2150            status = fed_exp.get('experimentStatus', None)
2151
2152            if status:
2153                if status in ('starting', 'terminating'):
2154                    if not force:
2155                        self.state_lock.release()
2156                        raise service_error(service_error.partial, 
2157                                'Experiment still being created or destroyed')
2158                    else:
2159                        self.log.warning('Experiment in %s state ' % status + \
2160                                'being terminated by force.')
2161            else:
2162                # No status??? trouble
2163                self.state_lock.release()
2164                raise service_error(service_error.internal,
2165                        "Experiment has no status!?")
2166
2167            ids = []
2168            #  experimentID is a list of dicts that are self-describing
2169            #  identifiers.  This finds all the fedids and localnames - the
2170            #  keys of self.state - and puts them into ids.
2171            for id in fed_exp.get('experimentID', []):
2172                if id.has_key('fedid'): 
2173                    ids.append(id['fedid'])
2174                    repo = "%s" % id['fedid']
2175                if id.has_key('localname'): ids.append(id['localname'])
2176
2177            # Get the experimentAccess - the principal for this experiment.  It
2178            # is this principal to which credentials have been delegated, and
2179            # as which the experiment controller must act.
2180            if 'experimentAccess' in self.state[key] and \
2181                    'X509' in self.state[key]['experimentAccess']:
2182                expcert = self.state[key]['experimentAccess']['X509']
2183            else:
2184                expcert = None
2185            # Collect the allocation/segment ids into a dict keyed by the fedid
2186            # of the allocation (or a monotonically increasing integer) that
2187            # contains a tuple of uri, aid (which is a dict...)
2188            for i, fed in enumerate(fed_exp.get('federant', [])):
2189                try:
2190                    uri = fed['uri']
2191                    aid = fed['allocID']
2192                    k = fed['allocID'].get('fedid', i)
2193                except KeyError, e:
2194                    continue
2195                tbparams[k] = (uri, aid)
2196            fed_exp['experimentStatus'] = 'terminating'
2197            if self.state_filename: self.write_state()
2198            self.state_lock.release()
2199
2200            try:
2201                tmpdir = tempfile.mkdtemp(prefix="split-")
2202            except EnvironmentError:
2203                raise service_error(service_error.internal, 
2204                        "Cannot create tmp dir")
2205            # This try block makes sure the tempdir is cleared
2206            try:
2207                # If no expcert, try the deallocation as the experiment
2208                # controller instance.
2209                if expcert and self.auth_type != 'legacy': 
2210                    cert_file = self.make_temp_certfile(expcert, tmpdir)
2211                    pw = None
2212                else: 
2213                    cert_file = self.cert_file
2214                    pw = self.cert_pwd
2215
2216                # Stop everyone.  NB, wait_for_all waits until a thread starts
2217                # and then completes, so we can't wait if nothing starts.  So,
2218                # no tbparams, no start.
2219                if len(tbparams) > 0:
2220                    tp = thread_pool(self.nthreads)
2221                    for k in tbparams.keys():
2222                        # Create and start a thread to stop the segment
2223                        tp.wait_for_slot()
2224                        uri, aid = tbparams[k]
2225                        t  = pooled_thread(\
2226                                target=self.terminate_segment(log=dealloc_log,
2227                                    testbed=uri,
2228                                    cert_file=cert_file, 
2229                                    cert_pwd=pw,
2230                                    trusted_certs=self.trusted_certs,
2231                                    caller=self.call_TerminateSegment),
2232                                args=(uri, aid), name=k,
2233                                pdata=tp, trace_file=self.trace_file)
2234                        t.start()
2235                    # Wait for completions
2236                    tp.wait_for_all_done()
2237
2238                # release the allocations (failed experiments have done this
2239                # already, and starting experiments may be in odd states, so we
2240                # ignore errors releasing those allocations
2241                try: 
2242                    for k in tbparams.keys():
2243                        # This releases access by uri
2244                        uri, aid = tbparams[k]
2245                        self.release_access(None, aid, uri=uri,
2246                                cert_file=cert_file, cert_pwd=pw)
2247                except service_error, e:
2248                    if status != 'failed' and not force:
2249                        raise e
2250
2251            # Clean up the tmpdir no matter what
2252            finally:
2253                self.remove_dirs(tmpdir)
2254
2255            # Remove the terminated experiment
2256            self.state_lock.acquire()
2257            for id in ids:
2258                if self.state.has_key(id): del self.state[id]
2259
2260            if self.state_filename: self.write_state()
2261            self.state_lock.release()
2262
2263            # Delete any synch points associated with this experiment.  All
2264            # synch points begin with the fedid of the experiment.
2265            fedid_keys = set(["fedid:%s" % f for f in ids \
2266                    if isinstance(f, fedid)])
2267            for k in self.synch_store.all_keys():
2268                try:
2269                    if len(k) > 45 and k[0:46] in fedid_keys:
2270                        self.synch_store.del_value(k)
2271                except synch_store.BadDeletionError:
2272                    pass
2273            self.write_store()
2274
2275            # Remove software and other cached stuff from the filesystem.
2276            if repo:
2277                self.remove_dirs("%s/%s" % (self.repodir, repo))
2278               
2279            return { 
2280                    'experiment': exp , 
2281                    'deallocationLog': "".join(dealloc_list),
2282                    }
2283        else:
2284            # Don't forget to release the lock
2285            self.state_lock.release()
2286            raise service_error(service_error.req, "No saved state")
2287
2288
2289    def GetValue(self, req, fid):
2290        """
2291        Get a value from the synchronized store
2292        """
2293        req = req.get('GetValueRequestBody', None)
2294        if not req:
2295            raise service_error(service_error.req,
2296                    "Bad request format (no GetValueRequestBody)")
2297       
2298        name = req['name']
2299        wait = req['wait']
2300        rv = { 'name': name }
2301
2302        if self.auth.check_attribute(fid, name):
2303            self.log.debug("[GetValue] asking for %s " % name)
2304            try:
2305                v = self.synch_store.get_value(name, wait)
2306            except synch_store.RevokedKeyError:
2307                # No more synch on this key
2308                raise service_error(service_error.federant, 
2309                        "Synch key %s revoked" % name)
2310            if v is not None:
2311                rv['value'] = v
2312            self.log.debug("[GetValue] got %s from %s" % (v, name))
2313            return rv
2314        else:
2315            raise service_error(service_error.access, "Access Denied")
2316       
2317
2318    def SetValue(self, req, fid):
2319        """
2320        Set a value in the synchronized store
2321        """
2322        req = req.get('SetValueRequestBody', None)
2323        if not req:
2324            raise service_error(service_error.req,
2325                    "Bad request format (no SetValueRequestBody)")
2326       
2327        name = req['name']
2328        v = req['value']
2329
2330        if self.auth.check_attribute(fid, name):
2331            try:
2332                self.synch_store.set_value(name, v)
2333                self.write_store()
2334                self.log.debug("[SetValue] set %s to %s" % (name, v))
2335            except synch_store.CollisionError:
2336                # Translate into a service_error
2337                raise service_error(service_error.req,
2338                        "Value already set: %s" %name)
2339            except synch_store.RevokedKeyError:
2340                # No more synch on this key
2341                raise service_error(service_error.federant, 
2342                        "Synch key %s revoked" % name)
2343            return { 'name': name, 'value': v }
2344        else:
2345            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.