source: fedd/federation/experiment_control.py @ 696a689

axis_examplecompt_changesinfo-ops
Last change on this file since 696a689 was 696a689, checked in by Ted Faber <faber@…>, 13 years ago

Fail gracefully on a failed swapin. I think this

fixes #6

  • Property mode set to 100644
File size: 90.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from authorizer import abac_authorizer
34
35import topdl
36import list_log
37from ip_allocator import ip_allocator
38from ip_addr import ip_addr
39
40
41class nullHandler(logging.Handler):
42    def emit(self, record): pass
43
44fl = logging.getLogger("fedd.experiment_control")
45fl.addHandler(nullHandler())
46
47
48# Right now, no support for composition.
49class federated_service:
50    def __init__(self, name, exporter=None, importers=None, params=None, 
51            reqs=None, portal=None):
52        self.name=name
53        self.exporter=exporter
54        if importers is None: self.importers = []
55        else: self.importers=importers
56        if params is None: self.params = { }
57        else: self.params = params
58        if reqs is None: self.reqs = []
59        else: self.reqs = reqs
60
61        if portal is not None:
62            self.portal = portal
63        else:
64            self.portal = (name in federated_service.needs_portal)
65
66    def __str__(self):
67        return "name %s export %s import %s params %s reqs %s" % \
68                (self.name, self.exporter, self.importers, self.params,
69                        [ (r['name'], r['visibility']) for r in self.reqs] )
70
71    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
72
73class experiment_control_local:
74    """
75    Control of experiments that this system can directly access.
76
77    Includes experiment creation, termination and information dissemination.
78    Thred safe.
79    """
80
81    class ssh_cmd_timeout(RuntimeError): pass
82   
83    class thread_pool:
84        """
85        A class to keep track of a set of threads all invoked for the same
86        task.  Manages the mutual exclusion of the states.
87        """
88        def __init__(self, nthreads):
89            """
90            Start a pool.
91            """
92            self.changed = Condition()
93            self.started = 0
94            self.terminated = 0
95            self.nthreads = nthreads
96
97        def acquire(self):
98            """
99            Get the pool's lock.
100            """
101            self.changed.acquire()
102
103        def release(self):
104            """
105            Release the pool's lock.
106            """
107            self.changed.release()
108
109        def wait(self, timeout = None):
110            """
111            Wait for a pool thread to start or stop.
112            """
113            self.changed.wait(timeout)
114
115        def start(self):
116            """
117            Called by a pool thread to report starting.
118            """
119            self.changed.acquire()
120            self.started += 1
121            self.changed.notifyAll()
122            self.changed.release()
123
124        def terminate(self):
125            """
126            Called by a pool thread to report finishing.
127            """
128            self.changed.acquire()
129            self.terminated += 1
130            self.changed.notifyAll()
131            self.changed.release()
132
133        def clear(self):
134            """
135            Clear all pool data.
136            """
137            self.changed.acquire()
138            self.started = 0
139            self.terminated =0
140            self.changed.notifyAll()
141            self.changed.release()
142
143        def wait_for_slot(self):
144            """
145            Wait until we have a free slot to start another pooled thread
146            """
147            self.acquire()
148            while self.started - self.terminated >= self.nthreads:
149                self.wait()
150            self.release()
151
152        def wait_for_all_done(self, timeout=None):
153            """
154            Wait until all active threads finish (and at least one has
155            started).  If a timeout is given, return after waiting that long
156            for termination.  If all threads are done (and one has started in
157            the since the last clear()) return True, otherwise False.
158            """
159            if timeout:
160                deadline = time.time() + timeout
161            self.acquire()
162            while self.started == 0 or self.started > self.terminated:
163                self.wait(timeout)
164                if timeout:
165                    if time.time() > deadline:
166                        break
167                    timeout = deadline - time.time()
168            self.release()
169            return not (self.started == 0 or self.started > self.terminated)
170
171    class pooled_thread(Thread):
172        """
173        One of a set of threads dedicated to a specific task.  Uses the
174        thread_pool class above for coordination.
175        """
176        def __init__(self, group=None, target=None, name=None, args=(), 
177                kwargs={}, pdata=None, trace_file=None):
178            Thread.__init__(self, group, target, name, args, kwargs)
179            self.rv = None          # Return value of the ops in this thread
180            self.exception = None   # Exception that terminated this thread
181            self.target=target      # Target function to run on start()
182            self.args = args        # Args to pass to target
183            self.kwargs = kwargs    # Additional kw args
184            self.pdata = pdata      # thread_pool for this class
185            # Logger for this thread
186            self.log = logging.getLogger("fedd.experiment_control")
187       
188        def run(self):
189            """
190            Emulate Thread.run, except add pool data manipulation and error
191            logging.
192            """
193            if self.pdata:
194                self.pdata.start()
195
196            if self.target:
197                try:
198                    self.rv = self.target(*self.args, **self.kwargs)
199                except service_error, s:
200                    self.exception = s
201                    self.log.error("Thread exception: %s %s" % \
202                            (s.code_string(), s.desc))
203                except:
204                    self.exception = sys.exc_info()[1]
205                    self.log.error(("Unexpected thread exception: %s" +\
206                            "Trace %s") % (self.exception,\
207                                traceback.format_exc()))
208            if self.pdata:
209                self.pdata.terminate()
210
211    call_RequestAccess = service_caller('RequestAccess')
212    call_ReleaseAccess = service_caller('ReleaseAccess')
213    call_StartSegment = service_caller('StartSegment')
214    call_TerminateSegment = service_caller('TerminateSegment')
215    call_Ns2Topdl = service_caller('Ns2Topdl')
216
217    def __init__(self, config=None, auth=None):
218        """
219        Intialize the various attributes, most from the config object
220        """
221
222        def parse_tarfile_list(tf):
223            """
224            Parse a tarfile list from the configuration.  This is a set of
225            paths and tarfiles separated by spaces.
226            """
227            rv = [ ]
228            if tf is not None:
229                tl = tf.split()
230                while len(tl) > 1:
231                    p, t = tl[0:2]
232                    del tl[0:2]
233                    rv.append((p, t))
234            return rv
235
236        self.thread_with_rv = experiment_control_local.pooled_thread
237        self.thread_pool = experiment_control_local.thread_pool
238        self.list_log = list_log.list_log
239
240        self.cert_file = config.get("experiment_control", "cert_file")
241        if self.cert_file:
242            self.cert_pwd = config.get("experiment_control", "cert_pwd")
243        else:
244            self.cert_file = config.get("globals", "cert_file")
245            self.cert_pwd = config.get("globals", "cert_pwd")
246
247        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
248                or config.get("globals", "trusted_certs")
249
250        self.repodir = config.get("experiment_control", "repodir")
251        self.repo_url = config.get("experiment_control", "repo_url", 
252                "https://users.isi.deterlab.net:23235");
253
254        self.exp_stem = "fed-stem"
255        self.log = logging.getLogger("fedd.experiment_control")
256        set_log_level(config, "experiment_control", self.log)
257        self.muxmax = 2
258        self.nthreads = 10
259        self.randomize_experiments = False
260
261        self.splitter = None
262        self.ssh_keygen = "/usr/bin/ssh-keygen"
263        self.ssh_identity_file = None
264
265
266        self.debug = config.getboolean("experiment_control", "create_debug")
267        self.cleanup = not config.getboolean("experiment_control", 
268                "leave_tmpfiles")
269        self.state_filename = config.get("experiment_control", 
270                "experiment_state")
271        self.store_filename = config.get("experiment_control", 
272                "synch_store")
273        self.store_url = config.get("experiment_control", "store_url")
274        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
275        self.fedkit = parse_tarfile_list(\
276                config.get("experiment_control", "fedkit"))
277        self.gatewaykit = parse_tarfile_list(\
278                config.get("experiment_control", "gatewaykit"))
279        accessdb_file = config.get("experiment_control", "accessdb")
280
281        self.ssh_pubkey_file = config.get("experiment_control", 
282                "ssh_pubkey_file")
283        self.ssh_privkey_file = config.get("experiment_control",
284                "ssh_privkey_file")
285        dt = config.get("experiment_control", "direct_transit")
286        self.auth_type = config.get('experiment_control', 'auth_type') \
287                or 'legacy'
288        self.auth_dir = config.get('experiment_control', 'auth_dir')
289        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
290        else: self.direct_transit = [ ]
291        # NB for internal master/slave ops, not experiment setup
292        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
293
294        self.overrides = set([])
295        ovr = config.get('experiment_control', 'overrides')
296        if ovr:
297            for o in ovr.split(","):
298                o = o.strip()
299                if o.startswith('fedid:'): o = o[len('fedid:'):]
300                self.overrides.add(fedid(hexstr=o))
301
302        self.state = { }
303        self.state_lock = Lock()
304        self.tclsh = "/usr/local/bin/otclsh"
305        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
306                config.get("experiment_control", "tcl_splitter",
307                        "/usr/testbed/lib/ns2ir/parse.tcl")
308        mapdb_file = config.get("experiment_control", "mapdb")
309        self.trace_file = sys.stderr
310
311        self.def_expstart = \
312                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
313                "/tmp/federate";
314        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
315                "FEDDIR/hosts";
316        self.def_gwstart = \
317                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
318                "/tmp/bridge.log";
319        self.def_mgwstart = \
320                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
321                "/tmp/bridge.log";
322        self.def_gwimage = "FBSD61-TUNNEL2";
323        self.def_gwtype = "pc";
324        self.local_access = { }
325
326        if self.auth_type == 'legacy':
327            if auth:
328                self.auth = auth
329            else:
330                self.log.error( "[access]: No authorizer initialized, " +\
331                        "creating local one.")
332                auth = authorizer()
333        elif self.auth_type == 'abac':
334            self.auth = abac_authorizer(load=self.auth_dir)
335        else:
336            raise service_error(service_error.internal, 
337                    "Unknown auth_type: %s" % self.auth_type)
338
339
340        if self.ssh_pubkey_file:
341            try:
342                f = open(self.ssh_pubkey_file, 'r')
343                self.ssh_pubkey = f.read()
344                f.close()
345            except EnvironmentError:
346                raise service_error(service_error.internal,
347                        "Cannot read sshpubkey")
348        else:
349            raise service_error(service_error.internal, 
350                    "No SSH public key file?")
351
352        if not self.ssh_privkey_file:
353            raise service_error(service_error.internal, 
354                    "No SSH public key file?")
355
356
357        if mapdb_file:
358            self.read_mapdb(mapdb_file)
359        else:
360            self.log.warn("[experiment_control] No testbed map, using defaults")
361            self.tbmap = { 
362                    'deter':'https://users.isi.deterlab.net:23235',
363                    'emulab':'https://users.isi.deterlab.net:23236',
364                    'ucb':'https://users.isi.deterlab.net:23237',
365                    }
366
367        if accessdb_file:
368                self.read_accessdb(accessdb_file)
369        else:
370            raise service_error(service_error.internal,
371                    "No accessdb specified in config")
372
373        # Grab saved state.  OK to do this w/o locking because it's read only
374        # and only one thread should be in existence that can see self.state at
375        # this point.
376        if self.state_filename:
377            self.read_state()
378
379        if self.store_filename:
380            self.read_store()
381        else:
382            self.log.warning("No saved synch store")
383            self.synch_store = synch_store
384
385        # Dispatch tables
386        self.soap_services = {\
387                'New': soap_handler('New', self.new_experiment),
388                'Create': soap_handler('Create', self.create_experiment),
389                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
390                'Vis': soap_handler('Vis', self.get_vis),
391                'Info': soap_handler('Info', self.get_info),
392                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
393                'Terminate': soap_handler('Terminate', 
394                    self.terminate_experiment),
395                'GetValue': soap_handler('GetValue', self.GetValue),
396                'SetValue': soap_handler('SetValue', self.SetValue),
397        }
398
399        self.xmlrpc_services = {\
400                'New': xmlrpc_handler('New', self.new_experiment),
401                'Create': xmlrpc_handler('Create', self.create_experiment),
402                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
403                'Vis': xmlrpc_handler('Vis', self.get_vis),
404                'Info': xmlrpc_handler('Info', self.get_info),
405                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
406                'Terminate': xmlrpc_handler('Terminate',
407                    self.terminate_experiment),
408                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
409                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
410        }
411
412    # Call while holding self.state_lock
413    def write_state(self):
414        """
415        Write a new copy of experiment state after copying the existing state
416        to a backup.
417
418        State format is a simple pickling of the state dictionary.
419        """
420        if os.access(self.state_filename, os.W_OK):
421            copy_file(self.state_filename, \
422                    "%s.bak" % self.state_filename)
423        try:
424            f = open(self.state_filename, 'w')
425            pickle.dump(self.state, f)
426        except EnvironmentError, e:
427            self.log.error("Can't write file %s: %s" % \
428                    (self.state_filename, e))
429        except pickle.PicklingError, e:
430            self.log.error("Pickling problem: %s" % e)
431        except TypeError, e:
432            self.log.error("Pickling problem (TypeError): %s" % e)
433
434    @staticmethod
435    def get_alloc_ids(state):
436        """
437        Pull the fedids of the identifiers of each allocation from the
438        state.  Again, a dict dive that's best isolated.
439
440        Used by read_store and read state
441        """
442
443        return [ f['allocID']['fedid'] 
444                for f in state.get('federant',[]) \
445                    if f.has_key('allocID') and \
446                        f['allocID'].has_key('fedid')]
447
448    # Call while holding self.state_lock
449    def read_state(self):
450        """
451        Read a new copy of experiment state.  Old state is overwritten.
452
453        State format is a simple pickling of the state dictionary.
454        """
455       
456        def get_experiment_id(state):
457            """
458            Pull the fedid experimentID out of the saved state.  This is kind
459            of a gross walk through the dict.
460            """
461
462            if state.has_key('experimentID'):
463                for e in state['experimentID']:
464                    if e.has_key('fedid'):
465                        return e['fedid']
466                else:
467                    return None
468            else:
469                return None
470
471        try:
472            f = open(self.state_filename, "r")
473            self.state = pickle.load(f)
474            self.log.debug("[read_state]: Read state from %s" % \
475                    self.state_filename)
476        except EnvironmentError, e:
477            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
478                    % (self.state_filename, e))
479        except pickle.UnpicklingError, e:
480            self.log.warning(("[read_state]: No saved state: " + \
481                    "Unpickling failed: %s") % e)
482       
483        for s in self.state.values():
484            try:
485
486                eid = get_experiment_id(s)
487                if eid : 
488                    if self.auth_type == 'legacy':
489                        # XXX: legacy
490                        # Give the owner rights to the experiment
491                        self.auth.set_attribute(s['owner'], eid)
492                        # And holders of the eid as well
493                        self.auth.set_attribute(eid, eid)
494                        # allow overrides to control experiments as well
495                        for o in self.overrides:
496                            self.auth.set_attribute(o, eid)
497                        # Set permissions to allow reading of the software
498                        # repo, if any, as well.
499                        for a in self.get_alloc_ids(s):
500                            self.auth.set_attribute(a, 'repo/%s' % eid)
501                else:
502                    raise KeyError("No experiment id")
503            except KeyError, e:
504                self.log.warning("[read_state]: State ownership or identity " +\
505                        "misformatted in %s: %s" % (self.state_filename, e))
506
507
508    def read_accessdb(self, accessdb_file):
509        """
510        Read the mapping from fedids that can create experiments to their name
511        in the 3-level access namespace.  All will be asserted from this
512        testbed and can include the local username and porject that will be
513        asserted on their behalf by this fedd.  Each fedid is also added to the
514        authorization system with the "create" attribute.
515        """
516        self.accessdb = {}
517        # These are the regexps for parsing the db
518        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
519        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
520                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
521        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
522                "\s*->\s*(" + name_expr + ")\s*$")
523        lineno = 0
524
525        # Parse the mappings and store in self.authdb, a dict of
526        # fedid -> (proj, user)
527        try:
528            f = open(accessdb_file, "r")
529            for line in f:
530                lineno += 1
531                line = line.strip()
532                if len(line) == 0 or line.startswith('#'):
533                    continue
534                m = project_line.match(line)
535                if m:
536                    fid = fedid(hexstr=m.group(1))
537                    project, user = m.group(2,3)
538                    if not self.accessdb.has_key(fid):
539                        self.accessdb[fid] = []
540                    self.accessdb[fid].append((project, user))
541                    continue
542
543                m = user_line.match(line)
544                if m:
545                    fid = fedid(hexstr=m.group(1))
546                    project = None
547                    user = m.group(2)
548                    if not self.accessdb.has_key(fid):
549                        self.accessdb[fid] = []
550                    self.accessdb[fid].append((project, user))
551                    continue
552                self.log.warn("[experiment_control] Error parsing access " +\
553                        "db %s at line %d" %  (accessdb_file, lineno))
554        except EnvironmentError:
555            raise service_error(service_error.internal,
556                    ("Error opening/reading %s as experiment " +\
557                            "control accessdb") %  accessdb_file)
558        f.close()
559
560        # Initialize the authorization attributes
561        # XXX: legacy
562        if self.auth_type == 'legacy':
563            for fid in self.accessdb.keys():
564                self.auth.set_attribute(fid, 'create')
565                self.auth.set_attribute(fid, 'new')
566
567    def read_mapdb(self, file):
568        """
569        Read a simple colon separated list of mappings for the
570        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
571        """
572
573        self.tbmap = { }
574        lineno =0
575        try:
576            f = open(file, "r")
577            for line in f:
578                lineno += 1
579                line = line.strip()
580                if line.startswith('#') or len(line) == 0:
581                    continue
582                try:
583                    label, url = line.split(':', 1)
584                    self.tbmap[label] = url
585                except ValueError, e:
586                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
587                            "map db: %s %s" % (lineno, line, e))
588        except EnvironmentError, e:
589            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
590                    "open %s: %s" % (file, e))
591        f.close()
592
593    def read_store(self):
594        try:
595            self.synch_store = synch_store()
596            self.synch_store.load(self.store_filename)
597            self.log.debug("[read_store]: Read store from %s" % \
598                    self.store_filename)
599        except EnvironmentError, e:
600            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
601                    % (self.state_filename, e))
602            self.synch_store = synch_store()
603
604        # Set the initial permissions on data in the store.  XXX: This ad hoc
605        # authorization attribute initialization is getting out of hand.
606        # XXX: legacy
607        if self.auth_type == 'legacy':
608            for k in self.synch_store.all_keys():
609                try:
610                    if k.startswith('fedid:'):
611                        fid = fedid(hexstr=k[6:46])
612                        if self.state.has_key(fid):
613                            for a in self.get_alloc_ids(self.state[fid]):
614                                self.auth.set_attribute(a, k)
615                except ValueError, e:
616                    self.log.warn("Cannot deduce permissions for %s" % k)
617
618
619    def write_store(self):
620        """
621        Write a new copy of synch_store after writing current state
622        to a backup.  We use the internal synch_store pickle method to avoid
623        incinsistent data.
624
625        State format is a simple pickling of the store.
626        """
627        if os.access(self.store_filename, os.W_OK):
628            copy_file(self.store_filename, \
629                    "%s.bak" % self.store_filename)
630        try:
631            self.synch_store.save(self.store_filename)
632        except EnvironmentError, e:
633            self.log.error("Can't write file %s: %s" % \
634                    (self.store_filename, e))
635        except TypeError, e:
636            self.log.error("Pickling problem (TypeError): %s" % e)
637
638       
639    def generate_ssh_keys(self, dest, type="rsa" ):
640        """
641        Generate a set of keys for the gateways to use to talk.
642
643        Keys are of type type and are stored in the required dest file.
644        """
645        valid_types = ("rsa", "dsa")
646        t = type.lower();
647        if t not in valid_types: raise ValueError
648        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
649
650        try:
651            trace = open("/dev/null", "w")
652        except EnvironmentError:
653            raise service_error(service_error.internal,
654                    "Cannot open /dev/null??");
655
656        # May raise CalledProcessError
657        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
658        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
659        if rv != 0:
660            raise service_error(service_error.internal, 
661                    "Cannot generate nonce ssh keys.  %s return code %d" \
662                            % (self.ssh_keygen, rv))
663
664    def gentopo(self, str):
665        """
666        Generate the topology dtat structure from the splitter's XML
667        representation of it.
668
669        The topology XML looks like:
670            <experiment>
671                <nodes>
672                    <node><vname></vname><ips>ip1:ip2</ips></node>
673                </nodes>
674                <lans>
675                    <lan>
676                        <vname></vname><vnode></vnode><ip></ip>
677                        <bandwidth></bandwidth><member>node:port</member>
678                    </lan>
679                </lans>
680        """
681        class topo_parse:
682            """
683            Parse the topology XML and create the dats structure.
684            """
685            def __init__(self):
686                # Typing of the subelements for data conversion
687                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
688                self.int_subelements = ( 'bandwidth',)
689                self.float_subelements = ( 'delay',)
690                # The final data structure
691                self.nodes = [ ]
692                self.lans =  [ ]
693                self.topo = { \
694                        'node': self.nodes,\
695                        'lan' : self.lans,\
696                    }
697                self.element = { }  # Current element being created
698                self.chars = ""     # Last text seen
699
700            def end_element(self, name):
701                # After each sub element the contents is added to the current
702                # element or to the appropriate list.
703                if name == 'node':
704                    self.nodes.append(self.element)
705                    self.element = { }
706                elif name == 'lan':
707                    self.lans.append(self.element)
708                    self.element = { }
709                elif name in self.str_subelements:
710                    self.element[name] = self.chars
711                    self.chars = ""
712                elif name in self.int_subelements:
713                    self.element[name] = int(self.chars)
714                    self.chars = ""
715                elif name in self.float_subelements:
716                    self.element[name] = float(self.chars)
717                    self.chars = ""
718
719            def found_chars(self, data):
720                self.chars += data.rstrip()
721
722
723        tp = topo_parse();
724        parser = xml.parsers.expat.ParserCreate()
725        parser.EndElementHandler = tp.end_element
726        parser.CharacterDataHandler = tp.found_chars
727
728        parser.Parse(str)
729
730        return tp.topo
731       
732
733    def genviz(self, topo):
734        """
735        Generate the visualization the virtual topology
736        """
737
738        neato = "/usr/local/bin/neato"
739        # These are used to parse neato output and to create the visualization
740        # file.
741        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
742        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
743                "%s</type></node>"
744
745        try:
746            # Node names
747            nodes = [ n['vname'] for n in topo['node'] ]
748            topo_lans = topo['lan']
749        except KeyError, e:
750            raise service_error(service_error.internal, "Bad topology: %s" %e)
751
752        lans = { }
753        links = { }
754
755        # Walk through the virtual topology, organizing the connections into
756        # 2-node connections (links) and more-than-2-node connections (lans).
757        # When a lan is created, it's added to the list of nodes (there's a
758        # node in the visualization for the lan).
759        for l in topo_lans:
760            if links.has_key(l['vname']):
761                if len(links[l['vname']]) < 2:
762                    links[l['vname']].append(l['vnode'])
763                else:
764                    nodes.append(l['vname'])
765                    lans[l['vname']] = links[l['vname']]
766                    del links[l['vname']]
767                    lans[l['vname']].append(l['vnode'])
768            elif lans.has_key(l['vname']):
769                lans[l['vname']].append(l['vnode'])
770            else:
771                links[l['vname']] = [ l['vnode'] ]
772
773
774        # Open up a temporary file for dot to turn into a visualization
775        try:
776            df, dotname = tempfile.mkstemp()
777            dotfile = os.fdopen(df, 'w')
778        except EnvironmentError:
779            raise service_error(service_error.internal,
780                    "Failed to open file in genviz")
781
782        try:
783            dnull = open('/dev/null', 'w')
784        except EnvironmentError:
785            service_error(service_error.internal,
786                    "Failed to open /dev/null in genviz")
787
788        # Generate a dot/neato input file from the links, nodes and lans
789        try:
790            print >>dotfile, "graph G {"
791            for n in nodes:
792                print >>dotfile, '\t"%s"' % n
793            for l in links.keys():
794                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
795            for l in lans.keys():
796                for n in lans[l]:
797                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
798            print >>dotfile, "}"
799            dotfile.close()
800        except TypeError:
801            raise service_error(service_error.internal,
802                    "Single endpoint link in vtopo")
803        except EnvironmentError:
804            raise service_error(service_error.internal, "Cannot write dot file")
805
806        # Use dot to create a visualization
807        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
808                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
809                close_fds=True)
810        dnull.close()
811
812        # Translate dot to vis format
813        vis_nodes = [ ]
814        vis = { 'node': vis_nodes }
815        for line in dot.stdout:
816            m = vis_re.match(line)
817            if m:
818                vn = m.group(1)
819                vis_node = {'name': vn, \
820                        'x': float(m.group(2)),\
821                        'y' : float(m.group(3)),\
822                    }
823                if vn in links.keys() or vn in lans.keys():
824                    vis_node['type'] = 'lan'
825                else:
826                    vis_node['type'] = 'node'
827                vis_nodes.append(vis_node)
828        rv = dot.wait()
829
830        os.remove(dotname)
831        if rv == 0 : return vis
832        else: return None
833
834    def get_access(self, tb, nodes, tbparam, access_user, masters, tbmap):
835        """
836        Get access to testbed through fedd and set the parameters for that tb
837        """
838        def get_export_project(svcs):
839            """
840            Look through for the list of federated_service for this testbed
841            objects for a project_export service, and extract the project
842            parameter.
843            """
844
845            pe = [s for s in svcs if s.name=='project_export']
846            if len(pe) == 1:
847                return pe[0].params.get('project', None)
848            elif len(pe) == 0:
849                return None
850            else:
851                raise service_error(service_error.req,
852                        "More than one project export is not supported")
853
854        uri = tbmap.get(testbed_base(tb), None)
855        if not uri:
856            raise service_error(service_error.server_config, 
857                    "Unknown testbed: %s" % tb)
858
859        export_svcs = masters.get(tb,[])
860        import_svcs = [ s for m in masters.values() \
861                for s in m \
862                    if tb in s.importers ]
863
864        export_project = get_export_project(export_svcs)
865
866        # Tweak search order so that if there are entries in access_user that
867        # have a project matching the export project, we try them first
868        if export_project: 
869            access_sequence = [ (p, u) for p, u in access_user \
870                    if p == export_project] 
871            access_sequence.extend([(p, u) for p, u in access_user \
872                    if p != export_project]) 
873        else: 
874            access_sequence = access_user
875
876        for p, u in access_sequence: 
877            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
878                    "to %s") %  ((p or "None"), u, uri))
879
880            if p:
881                # Request with user and project specified
882                req = {\
883                        'credential': [ "project: %s" % p, "user: %s"  % u],
884                    }
885            else:
886                # Request with only user specified
887                req = {\
888                        'credential': [ 'user: %s' % u ],
889                    }
890
891            # Make the service request from the services we're importing and
892            # exporting.  Keep track of the export request ids so we can
893            # collect the resulting info from the access response.
894            e_keys = { }
895            if import_svcs or export_svcs:
896                req['service'] = [ ]
897
898                for i, s in enumerate(import_svcs):
899                    idx = 'import%d' % i
900                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
901                    if s.params:
902                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
903                                for k, v in s.params.items()]
904                    req['service'].append(sr)
905
906                for i, s in enumerate(export_svcs):
907                    idx = 'export%d' % i
908                    e_keys[idx] = s
909                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
910                    if s.params:
911                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
912                                for k, v in s.params.items()]
913                    req['service'].append(sr)
914
915            # node resources if any
916            if nodes != None and len(nodes) > 0:
917                rnodes = [ ]
918                for n in nodes:
919                    rn = { }
920                    image, hw, count = n.split(":")
921                    if image: rn['image'] = [ image ]
922                    if hw: rn['hardware'] = [ hw ]
923                    if count and int(count) >0 : rn['count'] = int(count)
924                    rnodes.append(rn)
925                req['resources']= { }
926                req['resources']['node'] = rnodes
927
928            try:
929                if self.local_access.has_key(uri):
930                    # Local access call
931                    req = { 'RequestAccessRequestBody' : req }
932                    r = self.local_access[uri].RequestAccess(req, 
933                            fedid(file=self.cert_file))
934                    r = { 'RequestAccessResponseBody' : r }
935                else:
936                    r = self.call_RequestAccess(uri, req, 
937                            self.cert_file, self.cert_pwd, self.trusted_certs)
938            except service_error, e:
939                if e.code == service_error.access:
940                    self.log.debug("[get_access] Access denied")
941                    r = None
942                    continue
943                else:
944                    raise e
945
946            if r.has_key('RequestAccessResponseBody'):
947                # Through to here we have a valid response, not a fault.
948                # Access denied is a fault, so something better or worse than
949                # access denied has happened.
950                r = r['RequestAccessResponseBody']
951                self.log.debug("[get_access] Access granted")
952                break
953            else:
954                raise service_error(service_error.protocol,
955                        "Bad proxy response")
956       
957        if not r:
958            raise service_error(service_error.access, 
959                    "Access denied by %s (%s)" % (tb, uri))
960
961        tbparam[tb] = { 
962                "allocID" : r['allocID'],
963                "uri": uri,
964                }
965
966        # Collect the responses corresponding to the services this testbed
967        # exports.  These will be the service requests that we will include in
968        # the start segment requests (with appropriate visibility values) to
969        # import and export the segments.
970        for s in r.get('service', []):
971            id = s.get('id', None)
972            if id and id in e_keys:
973                e_keys[id].reqs.append(s)
974
975        # Add attributes to parameter space.  We don't allow attributes to
976        # overlay any parameters already installed.
977        for a in r.get('fedAttr', []):
978            try:
979                if a['attribute'] and \
980                        isinstance(a['attribute'], basestring)\
981                        and not tbparam[tb].has_key(a['attribute'].lower()):
982                    tbparam[tb][a['attribute'].lower()] = a['value']
983            except KeyError:
984                self.log.error("Bad attribute in response: %s" % a)
985
986    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
987            cert_pwd=None):
988        """
989        Release access to testbed through fedd
990        """
991
992        if not uri and tbmap:
993            uri = tbmap.get(tb, None)
994        if not uri:
995            raise service_error(service_error.server_config, 
996                    "Unknown testbed: %s" % tb)
997
998        if self.local_access.has_key(uri):
999            resp = self.local_access[uri].ReleaseAccess(\
1000                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1001                    fedid(file=cert_file))
1002            resp = { 'ReleaseAccessResponseBody': resp } 
1003        else:
1004            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1005                    cert_file, cert_pwd, self.trusted_certs)
1006
1007        # better error coding
1008
1009    def remote_ns2topdl(self, uri, desc):
1010
1011        req = {
1012                'description' : { 'ns2description': desc },
1013            }
1014
1015        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
1016                self.trusted_certs)
1017
1018        if r.has_key('Ns2TopdlResponseBody'):
1019            r = r['Ns2TopdlResponseBody']
1020            ed = r.get('experimentdescription', None)
1021            if ed.has_key('topdldescription'):
1022                return topdl.Topology(**ed['topdldescription'])
1023            else:
1024                raise service_error(service_error.protocol, 
1025                        "Bad splitter response (no output)")
1026        else:
1027            raise service_error(service_error.protocol, "Bad splitter response")
1028
1029    class start_segment:
1030        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1031                cert_pwd=None, trusted_certs=None, caller=None,
1032                log_collector=None):
1033            self.log = log
1034            self.debug = debug
1035            self.cert_file = cert_file
1036            self.cert_pwd = cert_pwd
1037            self.trusted_certs = None
1038            self.caller = caller
1039            self.testbed = testbed
1040            self.log_collector = log_collector
1041            self.response = None
1042            self.node = { }
1043
1044        def make_map(self, resp):
1045            for e in resp.get('embedding', []):
1046                if 'toponame' in e and 'physname' in e:
1047                    self.node[e['toponame']] = e['physname'][0]
1048
1049        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1050            req = {
1051                    'allocID': { 'fedid' : aid }, 
1052                    'segmentdescription': { 
1053                        'topdldescription': topo.to_dict(),
1054                    },
1055                }
1056
1057            if connInfo:
1058                req['connection'] = connInfo
1059
1060            import_svcs = [ s for m in masters.values() \
1061                    for s in m if self.testbed in s.importers]
1062
1063            if import_svcs or self.testbed in masters:
1064                req['service'] = []
1065
1066            for s in import_svcs:
1067                for r in s.reqs:
1068                    sr = copy.deepcopy(r)
1069                    sr['visibility'] = 'import';
1070                    req['service'].append(sr)
1071
1072            for s in masters.get(self.testbed, []):
1073                for r in s.reqs:
1074                    sr = copy.deepcopy(r)
1075                    sr['visibility'] = 'export';
1076                    req['service'].append(sr)
1077
1078            if attrs:
1079                req['fedAttr'] = attrs
1080
1081            try:
1082                self.log.debug("Calling StartSegment at %s " % uri)
1083                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1084                        self.trusted_certs)
1085                if r.has_key('StartSegmentResponseBody'):
1086                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1087                            None)
1088                    if lval and self.log_collector:
1089                        for line in  lval.splitlines(True):
1090                            self.log_collector.write(line)
1091                    self.make_map(r['StartSegmentResponseBody'])
1092                    self.response = r
1093                else:
1094                    raise service_error(service_error.internal, 
1095                            "Bad response!?: %s" %r)
1096                return True
1097            except service_error, e:
1098                self.log.error("Start segment failed on %s: %s" % \
1099                        (self.testbed, e))
1100                return False
1101
1102
1103
1104    class terminate_segment:
1105        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1106                cert_pwd=None, trusted_certs=None, caller=None):
1107            self.log = log
1108            self.debug = debug
1109            self.cert_file = cert_file
1110            self.cert_pwd = cert_pwd
1111            self.trusted_certs = None
1112            self.caller = caller
1113            self.testbed = testbed
1114
1115        def __call__(self, uri, aid ):
1116            req = {
1117                    'allocID': aid , 
1118                }
1119            try:
1120                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1121                        self.trusted_certs)
1122                return True
1123            except service_error, e:
1124                self.log.error("Terminate segment failed on %s: %s" % \
1125                        (self.testbed, e))
1126                return False
1127   
1128
1129    def allocate_resources(self, allocated, masters, eid, expid, 
1130            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1131            attrs=None, connInfo={}, tbmap=None, expcert=None):
1132
1133        started = { }           # Testbeds where a sub-experiment started
1134                                # successfully
1135
1136        # XXX
1137        fail_soft = False
1138
1139        if tbmap is None: tbmap = { }
1140
1141        log = alloc_log or self.log
1142
1143        thread_pool = self.thread_pool(self.nthreads)
1144        threads = [ ]
1145        starters = [ ]
1146
1147        if expcert:
1148            cert = expcert
1149            pw = None
1150        else:
1151            cert = self.cert_file
1152            pw = self.cert_pwd
1153
1154        for tb in allocated.keys():
1155            # Create and start a thread to start the segment, and save it
1156            # to get the return value later
1157            tb_attrs = copy.copy(attrs)
1158            thread_pool.wait_for_slot()
1159            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
1160            base, suffix = split_testbed(tb)
1161            if suffix:
1162                tb_attrs.append({'attribute': 'experiment_name', 
1163                    'value': "%s-%s" % (eid, suffix)})
1164            else:
1165                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1166            if not uri:
1167                raise service_error(service_error.internal, 
1168                        "Unknown testbed %s !?" % tb)
1169
1170            if tbparams[tb].has_key('allocID') and \
1171                    tbparams[tb]['allocID'].has_key('fedid'):
1172                aid = tbparams[tb]['allocID']['fedid']
1173            else:
1174                raise service_error(service_error.internal, 
1175                        "No alloc id for testbed %s !?" % tb)
1176
1177            s = self.start_segment(log=log, debug=self.debug,
1178                    testbed=tb, cert_file=cert,
1179                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1180                    caller=self.call_StartSegment,
1181                    log_collector=log_collector)
1182            starters.append(s)
1183            t  = self.pooled_thread(\
1184                    target=s, name=tb,
1185                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1186                    pdata=thread_pool, trace_file=self.trace_file)
1187            threads.append(t)
1188            t.start()
1189
1190        # Wait until all finish (keep pinging the log, though)
1191        mins = 0
1192        revoked = False
1193        while not thread_pool.wait_for_all_done(60.0):
1194            mins += 1
1195            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1196                    % mins)
1197            if not revoked and \
1198                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1199                # a testbed has failed.  Revoke this experiment's
1200                # synchronizarion values so that sub experiments will not
1201                # deadlock waiting for synchronization that will never happen
1202                self.log.info("A subexperiment has failed to swap in, " + \
1203                        "revoking synch keys")
1204                var_key = "fedid:%s" % expid
1205                for k in self.synch_store.all_keys():
1206                    if len(k) > 45 and k[0:46] == var_key:
1207                        self.synch_store.revoke_key(k)
1208                revoked = True
1209
1210        failed = [ t.getName() for t in threads if not t.rv ]
1211        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1212
1213        # If one failed clean up, unless fail_soft is set
1214        if failed:
1215            if not fail_soft:
1216                thread_pool.clear()
1217                for tb in succeeded:
1218                    # Create and start a thread to stop the segment
1219                    thread_pool.wait_for_slot()
1220                    uri = tbparams[tb]['uri']
1221                    t  = self.pooled_thread(\
1222                            target=self.terminate_segment(log=log,
1223                                testbed=tb,
1224                                cert_file=cert, 
1225                                cert_pwd=pw,
1226                                trusted_certs=self.trusted_certs,
1227                                caller=self.call_TerminateSegment),
1228                            args=(uri, tbparams[tb]['federant']['allocID']),
1229                            name=tb,
1230                            pdata=thread_pool, trace_file=self.trace_file)
1231                    t.start()
1232                # Wait until all finish (if any are being stopped)
1233                if succeeded:
1234                    thread_pool.wait_for_all_done()
1235
1236                # release the allocations
1237                for tb in tbparams.keys():
1238                    try:
1239                        self.release_access(tb, tbparams[tb]['allocID'], 
1240                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1241                                cert_file=cert, cert_pwd=pw)
1242                    except service_error, e:
1243                        self.log.warn("Error releasing access: %s" % e.desc)
1244                # Remove the placeholder
1245                self.state_lock.acquire()
1246                self.state[eid]['experimentStatus'] = 'failed'
1247                if self.state_filename: self.write_state()
1248                self.state_lock.release()
1249                # Remove the repo dir
1250                self.remove_dirs("%s/%s" %(self.repodir, expid))
1251                # Walk up tmpdir, deleting as we go
1252                if self.cleanup:
1253                    self.remove_dirs(tmpdir)
1254                else:
1255                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1256
1257
1258                log.error("Swap in failed on %s" % ",".join(failed))
1259                return
1260        else:
1261            # Walk through the successes and gather the virtual to physical
1262            # mapping.
1263            embedding = [ ]
1264            for s in starters:
1265                for k, v in s.node.items():
1266                    embedding.append({
1267                        'toponame': k, 
1268                        'physname': [ v],
1269                        'testbed': s.testbed
1270                        })
1271            log.info("[start_segment]: Experiment %s active" % eid)
1272
1273
1274        # Walk up tmpdir, deleting as we go
1275        if self.cleanup:
1276            self.remove_dirs(tmpdir)
1277        else:
1278            log.debug("[start_experiment]: not removing %s" % tmpdir)
1279
1280        # Insert the experiment into our state and update the disk copy.
1281        self.state_lock.acquire()
1282        self.state[expid]['experimentStatus'] = 'active'
1283        self.state[eid] = self.state[expid]
1284        self.state[eid]['experimentdescription']['topdldescription'] = \
1285                top.to_dict()
1286        self.state[eid]['embedding'] = embedding
1287        if self.state_filename: self.write_state()
1288        self.state_lock.release()
1289        return
1290
1291
1292    def add_kit(self, e, kit):
1293        """
1294        Add a Software object created from the list of (install, location)
1295        tuples passed as kit  to the software attribute of an object e.  We
1296        do this enough to break out the code, but it's kind of a hack to
1297        avoid changing the old tuple rep.
1298        """
1299
1300        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1301
1302        if isinstance(e.software, list): e.software.extend(s)
1303        else: e.software = s
1304
1305
1306    def create_experiment_state(self, fid, req, expid, expcert,
1307            state='starting'):
1308        """
1309        Create the initial entry in the experiment's state.  The expid and
1310        expcert are the experiment's fedid and certifacte that represents that
1311        ID, which are installed in the experiment state.  If the request
1312        includes a suggested local name that is used if possible.  If the local
1313        name is already taken by an experiment owned by this user that has
1314        failed, it is overwritten.  Otherwise new letters are added until a
1315        valid localname is found.  The generated local name is returned.
1316        """
1317
1318        if req.has_key('experimentID') and \
1319                req['experimentID'].has_key('localname'):
1320            overwrite = False
1321            eid = req['experimentID']['localname']
1322            # If there's an old failed experiment here with the same local name
1323            # and accessible by this user, we'll overwrite it, otherwise we'll
1324            # fall through and do the collision avoidance.
1325            old_expid = self.get_experiment_fedid(eid)
1326            if old_expid and self.check_experiment_access(fid, old_expid):
1327                self.state_lock.acquire()
1328                status = self.state[eid].get('experimentStatus', None)
1329                if status and status == 'failed':
1330                    # remove the old access attribute
1331                    self.auth.unset_attribute(fid, old_expid)
1332                    self.auth.save()
1333                    overwrite = True
1334                    del self.state[eid]
1335                    del self.state[old_expid]
1336                self.state_lock.release()
1337            self.state_lock.acquire()
1338            while (self.state.has_key(eid) and not overwrite):
1339                eid += random.choice(string.ascii_letters)
1340            # Initial state
1341            self.state[eid] = {
1342                    'experimentID' : \
1343                            [ { 'localname' : eid }, {'fedid': expid } ],
1344                    'experimentStatus': state,
1345                    'experimentAccess': { 'X509' : expcert },
1346                    'owner': fid,
1347                    'log' : [],
1348                }
1349            self.state[expid] = self.state[eid]
1350            if self.state_filename: self.write_state()
1351            self.state_lock.release()
1352        else:
1353            eid = self.exp_stem
1354            for i in range(0,5):
1355                eid += random.choice(string.ascii_letters)
1356            self.state_lock.acquire()
1357            while (self.state.has_key(eid)):
1358                eid = self.exp_stem
1359                for i in range(0,5):
1360                    eid += random.choice(string.ascii_letters)
1361            # Initial state
1362            self.state[eid] = {
1363                    'experimentID' : \
1364                            [ { 'localname' : eid }, {'fedid': expid } ],
1365                    'experimentStatus': state,
1366                    'experimentAccess': { 'X509' : expcert },
1367                    'owner': fid,
1368                    'log' : [],
1369                }
1370            self.state[expid] = self.state[eid]
1371            if self.state_filename: self.write_state()
1372            self.state_lock.release()
1373
1374        return eid
1375
1376
1377    def allocate_ips_to_topo(self, top):
1378        """
1379        Add an ip4_address attribute to all the hosts in the topology, based on
1380        the shared substrates on which they sit.  An /etc/hosts file is also
1381        created and returned as a list of hostfiles entries.  We also return
1382        the allocator, because we may need to allocate IPs to portals
1383        (specifically DRAGON portals).
1384        """
1385        subs = sorted(top.substrates, 
1386                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1387                reverse=True)
1388        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1389        ifs = { }
1390        hosts = [ ]
1391
1392        for idx, s in enumerate(subs):
1393            net_size = len(s.interfaces)+2
1394
1395            a = ips.allocate(net_size)
1396            if a :
1397                base, num = a
1398                if num < net_size: 
1399                    raise service_error(service_error.internal,
1400                            "Allocator returned wrong number of IPs??")
1401            else:
1402                raise service_error(service_error.req, 
1403                        "Cannot allocate IP addresses")
1404            mask = ips.min_alloc
1405            while mask < net_size:
1406                mask *= 2
1407
1408            netmask = ((2**32-1) ^ (mask-1))
1409
1410            base += 1
1411            for i in s.interfaces:
1412                i.attribute.append(
1413                        topdl.Attribute('ip4_address', 
1414                            "%s" % ip_addr(base)))
1415                i.attribute.append(
1416                        topdl.Attribute('ip4_netmask', 
1417                            "%s" % ip_addr(int(netmask))))
1418
1419                hname = i.element.name
1420                if ifs.has_key(hname):
1421                    hosts.append("%s\t%s-%s %s-%d" % \
1422                            (ip_addr(base), hname, s.name, hname,
1423                                ifs[hname]))
1424                else:
1425                    ifs[hname] = 0
1426                    hosts.append("%s\t%s-%s %s-%d %s" % \
1427                            (ip_addr(base), hname, s.name, hname,
1428                                ifs[hname], hname))
1429
1430                ifs[hname] += 1
1431                base += 1
1432        return hosts, ips
1433
1434    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1435            tbparams, masters, tbmap):
1436        """
1437        Request access to the various testbeds required for this instantiation
1438        (passed in as testbeds).  User, access_user, expoert_project and master
1439        are used to construct the correct requests.  Per-testbed parameters are
1440        returned in tbparams.
1441        """
1442        for tb in testbeds:
1443            self.get_access(tb, None, tbparams, access_user, masters, tbmap)
1444            allocated[tb] = 1
1445
1446    def get_abac_access_to_testbeds(self, testbeds, fid, allocated, 
1447            tbparam, masters, tbmap, expid=None, expcert=None):
1448        for tb in testbeds:
1449            self.get_abac_access(tb, tbparam, fid, masters, tbmap, expid,
1450                    expcert)
1451            allocated[tb] = 1
1452
1453    def get_abac_access(self, tb, tbparam,fid, masters, tbmap, expid=None, expcert=None):
1454        """
1455        Get access to testbed through fedd and set the parameters for that tb
1456        """
1457        def get_export_project(svcs):
1458            """
1459            Look through for the list of federated_service for this testbed
1460            objects for a project_export service, and extract the project
1461            parameter.
1462            """
1463
1464            pe = [s for s in svcs if s.name=='project_export']
1465            if len(pe) == 1:
1466                return pe[0].params.get('project', None)
1467            elif len(pe) == 0:
1468                return None
1469            else:
1470                raise service_error(service_error.req,
1471                        "More than one project export is not supported")
1472
1473        uri = tbmap.get(testbed_base(tb), None)
1474        if not uri:
1475            raise service_error(service_error.server_config, 
1476                    "Unknown testbed: %s" % tb)
1477
1478        export_svcs = masters.get(tb,[])
1479        import_svcs = [ s for m in masters.values() \
1480                for s in m \
1481                    if tb in s.importers ]
1482
1483        export_project = get_export_project(export_svcs)
1484        # Compose the credential list so that IDs come before attributes
1485        creds = set()
1486        keys = set()
1487        certs = self.auth.get_creds_for_principal(fid)
1488        if expid:
1489            certs.update(self.auth.get_creds_for_principal(expid))
1490        for c in certs:
1491            keys.add(c.issuer_cert())
1492            creds.add(c.attribute_cert())
1493        creds = list(keys) + list(creds)
1494
1495        if expcert: cert, pw = expcert, None
1496        else: cert, pw = self.cert_file, self.cert_pw
1497
1498        # Request credentials
1499        req = {
1500                'abac_credential': creds,
1501            }
1502        # Make the service request from the services we're importing and
1503        # exporting.  Keep track of the export request ids so we can
1504        # collect the resulting info from the access response.
1505        e_keys = { }
1506        if import_svcs or export_svcs:
1507            req['service'] = [ ]
1508
1509            for i, s in enumerate(import_svcs):
1510                idx = 'import%d' % i
1511                sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
1512                if s.params:
1513                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1514                            for k, v in s.params.items()]
1515                req['service'].append(sr)
1516
1517            for i, s in enumerate(export_svcs):
1518                idx = 'export%d' % i
1519                e_keys[idx] = s
1520                sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
1521                if s.params:
1522                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
1523                            for k, v in s.params.items()]
1524                req['service'].append(sr)
1525
1526
1527        if self.local_access.has_key(uri):
1528            # Local access call
1529            req = { 'RequestAccessRequestBody' : req }
1530            r = self.local_access[uri].RequestAccess(req, 
1531                    fedid(file=self.cert_file))
1532            r = { 'RequestAccessResponseBody' : r }
1533        else:
1534            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1535
1536        if r.has_key('RequestAccessResponseBody'):
1537            # Through to here we have a valid response, not a fault.
1538            # Access denied is a fault, so something better or worse than
1539            # access denied has happened.
1540            r = r['RequestAccessResponseBody']
1541            self.log.debug("[get_access] Access granted")
1542        else:
1543            raise service_error(service_error.protocol,
1544                        "Bad proxy response")
1545       
1546        tbparam[tb] = { 
1547                "allocID" : r['allocID'],
1548                "uri": uri,
1549                }
1550
1551        # Collect the responses corresponding to the services this testbed
1552        # exports.  These will be the service requests that we will include in
1553        # the start segment requests (with appropriate visibility values) to
1554        # import and export the segments.
1555        for s in r.get('service', []):
1556            id = s.get('id', None)
1557            if id and id in e_keys:
1558                e_keys[id].reqs.append(s)
1559
1560        # Add attributes to parameter space.  We don't allow attributes to
1561        # overlay any parameters already installed.
1562        for a in r.get('fedAttr', []):
1563            try:
1564                if a['attribute'] and \
1565                        isinstance(a['attribute'], basestring)\
1566                        and not tbparam[tb].has_key(a['attribute'].lower()):
1567                    tbparam[tb][a['attribute'].lower()] = a['value']
1568            except KeyError:
1569                self.log.error("Bad attribute in response: %s" % a)
1570
1571
1572    def split_topology(self, top, topo, testbeds):
1573        """
1574        Create the sub-topologies that are needed for experiment instantiation.
1575        """
1576        for tb in testbeds:
1577            topo[tb] = top.clone()
1578            # copy in for loop allows deletions from the original
1579            for e in [ e for e in topo[tb].elements]:
1580                etb = e.get_attribute('testbed')
1581                # NB: elements without a testbed attribute won't appear in any
1582                # sub topologies. 
1583                if not etb or etb != tb:
1584                    for i in e.interface:
1585                        for s in i.subs:
1586                            try:
1587                                s.interfaces.remove(i)
1588                            except ValueError:
1589                                raise service_error(service_error.internal,
1590                                        "Can't remove interface??")
1591                    topo[tb].elements.remove(e)
1592            topo[tb].make_indices()
1593
1594    def wrangle_software(self, expid, top, topo, tbparams):
1595        """
1596        Copy software out to the repository directory, allocate permissions and
1597        rewrite the segment topologies to look for the software in local
1598        places.
1599        """
1600
1601        # Copy the rpms and tarfiles to a distribution directory from
1602        # which the federants can retrieve them
1603        linkpath = "%s/software" %  expid
1604        softdir ="%s/%s" % ( self.repodir, linkpath)
1605        softmap = { }
1606        # These are in a list of tuples format (each kit).  This comprehension
1607        # unwraps them into a single list of tuples that initilaizes the set of
1608        # tuples.
1609        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1610                for p, t in l ])
1611        pkgs.update([x.location for e in top.elements \
1612                for x in e.software])
1613        try:
1614            os.makedirs(softdir)
1615        except EnvironmentError, e:
1616            raise service_error(
1617                    "Cannot create software directory: %s" % e)
1618        # The actual copying.  Everything's converted into a url for copying.
1619        for pkg in pkgs:
1620            loc = pkg
1621
1622            scheme, host, path = urlparse(loc)[0:3]
1623            dest = os.path.basename(path)
1624            if not scheme:
1625                if not loc.startswith('/'):
1626                    loc = "/%s" % loc
1627                loc = "file://%s" %loc
1628            try:
1629                u = urlopen(loc)
1630            except Exception, e:
1631                raise service_error(service_error.req, 
1632                        "Cannot open %s: %s" % (loc, e))
1633            try:
1634                f = open("%s/%s" % (softdir, dest) , "w")
1635                self.log.debug("Writing %s/%s" % (softdir,dest) )
1636                data = u.read(4096)
1637                while data:
1638                    f.write(data)
1639                    data = u.read(4096)
1640                f.close()
1641                u.close()
1642            except Exception, e:
1643                raise service_error(service_error.internal,
1644                        "Could not copy %s: %s" % (loc, e))
1645            path = re.sub("/tmp", "", linkpath)
1646            # XXX
1647            softmap[pkg] = \
1648                    "%s/%s/%s" %\
1649                    ( self.repo_url, path, dest)
1650
1651            # Allow the individual segments to access the software.
1652            for tb in tbparams.keys():
1653                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1654                        "/%s/%s" % ( path, dest))
1655            self.auth.save()
1656
1657        # Convert the software locations in the segments into the local
1658        # copies on this host
1659        for soft in [ s for tb in topo.values() \
1660                for e in tb.elements \
1661                    if getattr(e, 'software', False) \
1662                        for s in e.software ]:
1663            if softmap.has_key(soft.location):
1664                soft.location = softmap[soft.location]
1665
1666
1667    def new_experiment(self, req, fid):
1668        """
1669        The external interface to empty initial experiment creation called from
1670        the dispatcher.
1671
1672        Creates a working directory, splits the incoming description using the
1673        splitter script and parses out the avrious subsections using the
1674        lcasses above.  Once each sub-experiment is created, use pooled threads
1675        to instantiate them and start it all up.
1676        """
1677        req = req.get('NewRequestBody', None)
1678        if not req:
1679            raise service_error(service_error.req,
1680                    "Bad request format (no NewRequestBody)")
1681
1682        if self.auth.import_credentials(data_list=req.get('credential', [])):
1683            self.auth.save()
1684
1685        if not self.auth.check_attribute(fid, 'new'):
1686            raise service_error(service_error.access, "New access denied")
1687
1688        try:
1689            tmpdir = tempfile.mkdtemp(prefix="split-")
1690        except EnvironmentError:
1691            raise service_error(service_error.internal, "Cannot create tmp dir")
1692
1693        try:
1694            access_user = self.accessdb[fid]
1695        except KeyError:
1696            raise service_error(service_error.internal,
1697                    "Access map and authorizer out of sync in " + \
1698                            "new_experiment for fedid %s"  % fid)
1699
1700        pid = "dummy"
1701        gid = "dummy"
1702
1703        # Generate an ID for the experiment (slice) and a certificate that the
1704        # allocator can use to prove they own it.  We'll ship it back through
1705        # the encrypted connection.  If the requester supplied one, use it.
1706        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1707            expcert = req['experimentAccess']['X509']
1708            expid = fedid(certstr=expcert)
1709            self.state_lock.acquire()
1710            if expid in self.state:
1711                self.state_lock.release()
1712                raise service_error(service_error.req, 
1713                        'fedid %s identifies an existing experiment' % expid)
1714            self.state_lock.release()
1715        else:
1716            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1717
1718        #now we're done with the tmpdir, and it should be empty
1719        if self.cleanup:
1720            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1721            os.rmdir(tmpdir)
1722        else:
1723            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1724
1725        eid = self.create_experiment_state(fid, req, expid, expcert, 
1726                state='empty')
1727
1728        # Let users touch the state
1729        self.auth.set_attribute(fid, expid)
1730        self.auth.set_attribute(expid, expid)
1731        # Override fedids can manipulate state as well
1732        for o in self.overrides:
1733            self.auth.set_attribute(o, expid)
1734        self.auth.save()
1735
1736        rv = {
1737                'experimentID': [
1738                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1739                ],
1740                'experimentStatus': 'empty',
1741                'experimentAccess': { 'X509' : expcert }
1742            }
1743
1744        return rv
1745
1746    def create_experiment(self, req, fid):
1747        """
1748        The external interface to experiment creation called from the
1749        dispatcher.
1750
1751        Creates a working directory, splits the incoming description using the
1752        splitter script and parses out the various subsections using the
1753        classes above.  Once each sub-experiment is created, use pooled threads
1754        to instantiate them and start it all up.
1755        """
1756
1757        req = req.get('CreateRequestBody', None)
1758        if not req:
1759            raise service_error(service_error.req,
1760                    "Bad request format (no CreateRequestBody)")
1761
1762        # Get the experiment access
1763        exp = req.get('experimentID', None)
1764        if exp:
1765            if exp.has_key('fedid'):
1766                key = exp['fedid']
1767                expid = key
1768                eid = None
1769            elif exp.has_key('localname'):
1770                key = exp['localname']
1771                eid = key
1772                expid = None
1773            else:
1774                raise service_error(service_error.req, "Unknown lookup type")
1775        else:
1776            raise service_error(service_error.req, "No request?")
1777
1778        # Import information from the requester
1779        if self.auth.import_credentials(data_list=req.get('credential', [])):
1780            self.auth.save()
1781
1782        self.check_experiment_access(fid, key)
1783
1784        # Install the testbed map entries supplied with the request into a copy
1785        # of the testbed map.
1786        tbmap = dict(self.tbmap)
1787        for m in req.get('testbedmap', []):
1788            if 'testbed' in m and 'uri' in m:
1789                tbmap[m['testbed']] = m['uri']
1790
1791        try:
1792            tmpdir = tempfile.mkdtemp(prefix="split-")
1793            os.mkdir(tmpdir+"/keys")
1794        except EnvironmentError:
1795            raise service_error(service_error.internal, "Cannot create tmp dir")
1796
1797        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1798        gw_secretkey_base = "fed.%s" % self.ssh_type
1799        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1800        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1801        tclfile = tmpdir + "/experiment.tcl"
1802        tbparams = { }
1803        try:
1804            access_user = self.accessdb[fid]
1805        except KeyError:
1806            raise service_error(service_error.internal,
1807                    "Access map and authorizer out of sync in " + \
1808                            "create_experiment for fedid %s"  % fid)
1809
1810        pid = "dummy"
1811        gid = "dummy"
1812
1813        # The tcl parser needs to read a file so put the content into that file
1814        descr=req.get('experimentdescription', None)
1815        if descr:
1816            file_content=descr.get('ns2description', None)
1817            if file_content:
1818                try:
1819                    f = open(tclfile, 'w')
1820                    f.write(file_content)
1821                    f.close()
1822                except EnvironmentError:
1823                    raise service_error(service_error.internal,
1824                            "Cannot write temp experiment description")
1825            else:
1826                raise service_error(service_error.req, 
1827                        "Only ns2descriptions supported")
1828        else:
1829            raise service_error(service_error.req, "No experiment description")
1830
1831        self.state_lock.acquire()
1832        if self.state.has_key(key):
1833            self.state[key]['experimentStatus'] = "starting"
1834            for e in self.state[key].get('experimentID',[]):
1835                if not expid and e.has_key('fedid'):
1836                    expid = e['fedid']
1837                elif not eid and e.has_key('localname'):
1838                    eid = e['localname']
1839            if 'experimentAccess' in self.state[key] and \
1840                    'X509' in self.state[key]['experimentAccess']:
1841                expcert = self.state[key]['experimentAccess']['X509']
1842            else:
1843                expcert = None
1844        self.state_lock.release()
1845
1846        if not (eid and expid):
1847            raise service_error(service_error.internal, 
1848                    "Cannot find local experiment info!?")
1849
1850        # make a protected copy of the access certificate so the experiment
1851        # controller can act as the experiment principal.
1852        if expcert and self.auth_type != 'legacy':
1853            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1854            if not expcert_file:
1855                raise service_error(service_error.internal, 
1856                        "Cannot create temp cert file?")
1857        else:
1858            expcert_file = None
1859
1860        try: 
1861            # This catches exceptions to clear the placeholder if necessary
1862            try:
1863                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1864            except ValueError:
1865                raise service_error(service_error.server_config, 
1866                        "Bad key type (%s)" % self.ssh_type)
1867
1868            # Copy the service request
1869            tb_services = [ s for s in req.get('service',[]) ]
1870            # Translate to topdl
1871            if self.splitter_url:
1872                self.log.debug("Calling remote topdl translator at %s" % \
1873                        self.splitter_url)
1874                top = self.remote_ns2topdl(self.splitter_url, file_content)
1875            else:
1876                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1877                    str(self.muxmax), '-m', 'dummy']
1878
1879                tclcmd.extend([pid, gid, eid, tclfile])
1880
1881                self.log.debug("running local splitter %s", " ".join(tclcmd))
1882                # This is just fantastic.  As a side effect the parser copies
1883                # tb_compat.tcl into the current directory, so that directory
1884                # must be writable by the fedd user.  Doing this in the
1885                # temporary subdir ensures this is the case.
1886                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1887                        cwd=tmpdir)
1888                split_data = tclparser.stdout
1889
1890                top = topdl.topology_from_xml(file=split_data, top="experiment")
1891
1892            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1893            # Find the testbeds to look up
1894            testbeds = set([ a.value for e in top.elements \
1895                    for a in e.attribute \
1896                        if a.attribute == 'testbed'])
1897
1898            tb_hosts = { }
1899            for tb in testbeds:
1900                tb_hosts[tb] = [ e.name for e in top.elements \
1901                        if isinstance(e, topdl.Computer) and \
1902                            e.get_attribute('testbed') and \
1903                            e.get_attribute('testbed') == tb]
1904
1905            masters = { }           # testbeds exporting services
1906            pmasters = { }          # Testbeds exporting services that
1907                                    # need portals
1908            for s in tb_services:
1909                # If this is a service request with the importall field
1910                # set, fill it out.
1911
1912                if s.get('importall', False):
1913                    s['import'] = [ tb for tb in testbeds \
1914                            if tb not in s.get('export',[])]
1915                    del s['importall']
1916
1917                # Add the service to masters
1918                for tb in s.get('export', []):
1919                    if s.get('name', None):
1920                        if tb not in masters:
1921                            masters[tb] = [ ]
1922
1923                        params = { }
1924                        if 'fedAttr' in s:
1925                            for a in s['fedAttr']:
1926                                params[a.get('attribute', '')] = \
1927                                        a.get('value','')
1928
1929                        fser = federated_service(name=s['name'],
1930                                exporter=tb, importers=s.get('import',[]),
1931                                params=params)
1932                        if fser.name == 'hide_hosts' \
1933                                and 'hosts' not in fser.params:
1934                            fser.params['hosts'] = \
1935                                    ",".join(tb_hosts.get(fser.exporter, []))
1936                        masters[tb].append(fser)
1937
1938                        if fser.portal:
1939                            if tb not in pmasters: pmasters[tb] = [ fser ]
1940                            else: pmasters[tb].append(fser)
1941                    else:
1942                        self.log.error('Testbed service does not have name " + \
1943                                "and importers')
1944
1945
1946            allocated = { }         # Testbeds we can access
1947            topo ={ }               # Sub topologies
1948            connInfo = { }          # Connection information
1949
1950            if self.auth_type == 'legacy':
1951                self.get_access_to_testbeds(testbeds, access_user, allocated, 
1952                        tbparams, masters, tbmap)
1953            elif self.auth_type == 'abac':
1954                self.get_abac_access_to_testbeds(testbeds, fid, allocated, 
1955                        tbparams, masters, tbmap, expid, expcert_file)
1956            else:
1957                raise service_error(service_error.internal, 
1958                        "Unknown auth_type %s" % self.auth_type)
1959
1960            self.split_topology(top, topo, testbeds)
1961
1962            # Copy configuration files into the remote file store
1963            # The config urlpath
1964            configpath = "/%s/config" % expid
1965            # The config file system location
1966            configdir ="%s%s" % ( self.repodir, configpath)
1967            try:
1968                os.makedirs(configdir)
1969            except EnvironmentError, e:
1970                raise service_error(service_error.internal,
1971                        "Cannot create config directory: %s" % e)
1972            try:
1973                f = open("%s/hosts" % configdir, "w")
1974                f.write('\n'.join(hosts))
1975                f.close()
1976            except EnvironmentError, e:
1977                raise service_error(service_error.internal, 
1978                        "Cannot write hosts file: %s" % e)
1979            try:
1980                copy_file("%s" % gw_pubkey, "%s/%s" % \
1981                        (configdir, gw_pubkey_base))
1982                copy_file("%s" % gw_secretkey, "%s/%s" % \
1983                        (configdir, gw_secretkey_base))
1984            except EnvironmentError, e:
1985                raise service_error(service_error.internal, 
1986                        "Cannot copy keyfiles: %s" % e)
1987
1988            # Allow the individual testbeds to access the configuration files.
1989            for tb in tbparams.keys():
1990                asignee = tbparams[tb]['allocID']['fedid']
1991                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1992                    self.auth.set_attribute(asignee, "%s/%s" % \
1993                            (configpath, f))
1994
1995            part = experiment_partition(self.auth, self.store_url, tbmap,
1996                    self.muxmax, self.direct_transit)
1997            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1998                    connInfo, expid)
1999            # Now get access to the dynamic testbeds (those added above)
2000            for tb in [ t for t in topo if t not in allocated]:
2001                #XXX: ABAC
2002                if self.auth_type =='legacy':
2003                    self.get_access(tb, None, tbparams, access_user, 
2004                            masters, tbmap)
2005                elif self.auth_type == 'abac':
2006                    self.get_abac_access(tb, tbparams, fid, masters, tbmap, 
2007                            expid, expcert_file)
2008                else:
2009                    raise service_error(service_error.internal, 
2010                            "Unknown auth_type %s" % self.auth_type)
2011                allocated[tb] = 1
2012                store_keys = topo[tb].get_attribute('store_keys')
2013                # Give the testbed access to keys it exports or imports
2014                if store_keys:
2015                    for sk in store_keys.split(" "):
2016                        self.auth.set_attribute(\
2017                                tbparams[tb]['allocID']['fedid'], sk)
2018            self.auth.save()
2019
2020            self.wrangle_software(expid, top, topo, tbparams)
2021
2022            vtopo = topdl.topology_to_vtopo(top)
2023            vis = self.genviz(vtopo)
2024
2025            # save federant information
2026            for k in allocated.keys():
2027                tbparams[k]['federant'] = {
2028                        'name': [ { 'localname' : eid} ],
2029                        'allocID' : tbparams[k]['allocID'],
2030                        'uri': tbparams[k]['uri'],
2031                    }
2032
2033            self.state_lock.acquire()
2034            self.state[eid]['vtopo'] = vtopo
2035            self.state[eid]['vis'] = vis
2036            self.state[eid]['experimentdescription'] = \
2037                    { 'topdldescription': top.to_dict() }
2038            self.state[eid]['federant'] = \
2039                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
2040                        if tbparams[tb].has_key('federant') ]
2041            if self.state_filename: 
2042                self.write_state()
2043            self.state_lock.release()
2044        except service_error, e:
2045            # If something goes wrong in the parse (usually an access error)
2046            # clear the placeholder state.  From here on out the code delays
2047            # exceptions.  Failing at this point returns a fault to the remote
2048            # caller.
2049
2050            self.state_lock.acquire()
2051            del self.state[eid]
2052            del self.state[expid]
2053            if self.state_filename: self.write_state()
2054            self.state_lock.release()
2055            if tmpdir and self.cleanup:
2056                self.remove_dirs(tmpdir)
2057            raise e
2058
2059
2060        # Start the background swapper and return the starting state.  From
2061        # here on out, the state will stick around a while.
2062
2063        # Let users touch the state
2064        self.auth.set_attribute(fid, expid)
2065        self.auth.set_attribute(expid, expid)
2066        # Override fedids can manipulate state as well
2067        for o in self.overrides:
2068            self.auth.set_attribute(o, expid)
2069        self.auth.save()
2070
2071        # Create a logger that logs to the experiment's state object as well as
2072        # to the main log file.
2073        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2074        alloc_collector = self.list_log(self.state[eid]['log'])
2075        h = logging.StreamHandler(alloc_collector)
2076        # XXX: there should be a global one of these rather than repeating the
2077        # code.
2078        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2079                    '%d %b %y %H:%M:%S'))
2080        alloc_log.addHandler(h)
2081       
2082        attrs = [ 
2083                {
2084                    'attribute': 'ssh_pubkey', 
2085                    'value': '%s/%s/config/%s' % \
2086                            (self.repo_url, expid, gw_pubkey_base)
2087                },
2088                {
2089                    'attribute': 'ssh_secretkey', 
2090                    'value': '%s/%s/config/%s' % \
2091                            (self.repo_url, expid, gw_secretkey_base)
2092                },
2093                {
2094                    'attribute': 'hosts', 
2095                    'value': '%s/%s/config/hosts' % \
2096                            (self.repo_url, expid)
2097                },
2098            ]
2099
2100        # transit and disconnected testbeds may not have a connInfo entry.
2101        # Fill in the blanks.
2102        for t in allocated.keys():
2103            if not connInfo.has_key(t):
2104                connInfo[t] = { }
2105
2106        # Start a thread to do the resource allocation
2107        t  = Thread(target=self.allocate_resources,
2108                args=(allocated, masters, eid, expid, tbparams, 
2109                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2110                    connInfo, tbmap, expcert_file),
2111                name=eid)
2112        t.start()
2113
2114        rv = {
2115                'experimentID': [
2116                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2117                ],
2118                'experimentStatus': 'starting',
2119            }
2120
2121        return rv
2122   
2123    def get_experiment_fedid(self, key):
2124        """
2125        find the fedid associated with the localname key in the state database.
2126        """
2127
2128        rv = None
2129        self.state_lock.acquire()
2130        if self.state.has_key(key):
2131            if isinstance(self.state[key], dict):
2132                try:
2133                    kl = [ f['fedid'] for f in \
2134                            self.state[key]['experimentID']\
2135                                if f.has_key('fedid') ]
2136                except KeyError:
2137                    self.state_lock.release()
2138                    raise service_error(service_error.internal, 
2139                            "No fedid for experiment %s when getting "+\
2140                                    "fedid(!?)" % key)
2141                if len(kl) == 1:
2142                    rv = kl[0]
2143                else:
2144                    self.state_lock.release()
2145                    raise service_error(service_error.internal, 
2146                            "multiple fedids for experiment %s when " +\
2147                                    "getting fedid(!?)" % key)
2148            else:
2149                self.state_lock.release()
2150                raise service_error(service_error.internal, 
2151                        "Unexpected state for %s" % key)
2152        self.state_lock.release()
2153        return rv
2154
2155    def check_experiment_access(self, fid, key):
2156        """
2157        Confirm that the fid has access to the experiment.  Though a request
2158        may be made in terms of a local name, the access attribute is always
2159        the experiment's fedid.
2160        """
2161        if not isinstance(key, fedid):
2162            key = self.get_experiment_fedid(key)
2163
2164        if self.auth.check_attribute(fid, key):
2165            return True
2166        else:
2167            raise service_error(service_error.access, "Access Denied")
2168
2169
2170    def get_handler(self, path, fid):
2171        self.log.info("Get handler %s %s" % (path, fid))
2172        if self.auth.check_attribute(fid, path):
2173            return ("%s/%s" % (self.repodir, path), "application/binary")
2174        else:
2175            return (None, None)
2176
2177    def get_vtopo(self, req, fid):
2178        """
2179        Return the stored virtual topology for this experiment
2180        """
2181        rv = None
2182        state = None
2183
2184        req = req.get('VtopoRequestBody', None)
2185        if not req:
2186            raise service_error(service_error.req,
2187                    "Bad request format (no VtopoRequestBody)")
2188        exp = req.get('experiment', None)
2189        if exp:
2190            if exp.has_key('fedid'):
2191                key = exp['fedid']
2192                keytype = "fedid"
2193            elif exp.has_key('localname'):
2194                key = exp['localname']
2195                keytype = "localname"
2196            else:
2197                raise service_error(service_error.req, "Unknown lookup type")
2198        else:
2199            raise service_error(service_error.req, "No request?")
2200
2201        self.check_experiment_access(fid, key)
2202
2203        self.state_lock.acquire()
2204        if self.state.has_key(key):
2205            if self.state[key].has_key('vtopo'):
2206                rv = { 'experiment' : {keytype: key },\
2207                        'vtopo': self.state[key]['vtopo'],\
2208                    }
2209            else:
2210                state = self.state[key]['experimentStatus']
2211        self.state_lock.release()
2212
2213        if rv: return rv
2214        else: 
2215            if state:
2216                raise service_error(service_error.partial, 
2217                        "Not ready: %s" % state)
2218            else:
2219                raise service_error(service_error.req, "No such experiment")
2220
2221    def get_vis(self, req, fid):
2222        """
2223        Return the stored visualization for this experiment
2224        """
2225        rv = None
2226        state = None
2227
2228        req = req.get('VisRequestBody', None)
2229        if not req:
2230            raise service_error(service_error.req,
2231                    "Bad request format (no VisRequestBody)")
2232        exp = req.get('experiment', None)
2233        if exp:
2234            if exp.has_key('fedid'):
2235                key = exp['fedid']
2236                keytype = "fedid"
2237            elif exp.has_key('localname'):
2238                key = exp['localname']
2239                keytype = "localname"
2240            else:
2241                raise service_error(service_error.req, "Unknown lookup type")
2242        else:
2243            raise service_error(service_error.req, "No request?")
2244
2245        self.check_experiment_access(fid, key)
2246
2247        self.state_lock.acquire()
2248        if self.state.has_key(key):
2249            if self.state[key].has_key('vis'):
2250                rv =  { 'experiment' : {keytype: key },\
2251                        'vis': self.state[key]['vis'],\
2252                        }
2253            else:
2254                state = self.state[key]['experimentStatus']
2255        self.state_lock.release()
2256
2257        if rv: return rv
2258        else:
2259            if state:
2260                raise service_error(service_error.partial, 
2261                        "Not ready: %s" % state)
2262            else:
2263                raise service_error(service_error.req, "No such experiment")
2264
2265    def clean_info_response(self, rv):
2266        """
2267        Remove the information in the experiment's state object that is not in
2268        the info response.
2269        """
2270        # Remove the owner info (should always be there, but...)
2271        if rv.has_key('owner'): del rv['owner']
2272
2273        # Convert the log into the allocationLog parameter and remove the
2274        # log entry (with defensive programming)
2275        if rv.has_key('log'):
2276            rv['allocationLog'] = "".join(rv['log'])
2277            del rv['log']
2278        else:
2279            rv['allocationLog'] = ""
2280
2281        if rv['experimentStatus'] != 'active':
2282            if rv.has_key('federant'): del rv['federant']
2283        else:
2284            # remove the allocationID and uri info from each federant
2285            for f in rv.get('federant', []):
2286                if f.has_key('allocID'): del f['allocID']
2287                if f.has_key('uri'): del f['uri']
2288
2289        return rv
2290
2291    def get_info(self, req, fid):
2292        """
2293        Return all the stored info about this experiment
2294        """
2295        rv = None
2296
2297        req = req.get('InfoRequestBody', None)
2298        if not req:
2299            raise service_error(service_error.req,
2300                    "Bad request format (no InfoRequestBody)")
2301        exp = req.get('experiment', None)
2302        if exp:
2303            if exp.has_key('fedid'):
2304                key = exp['fedid']
2305                keytype = "fedid"
2306            elif exp.has_key('localname'):
2307                key = exp['localname']
2308                keytype = "localname"
2309            else:
2310                raise service_error(service_error.req, "Unknown lookup type")
2311        else:
2312            raise service_error(service_error.req, "No request?")
2313
2314        self.check_experiment_access(fid, key)
2315
2316        # The state may be massaged by the service function that called
2317        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2318        # state.
2319        self.state_lock.acquire()
2320        if self.state.has_key(key):
2321            rv = copy.deepcopy(self.state[key])
2322        self.state_lock.release()
2323
2324        if rv:
2325            return self.clean_info_response(rv)
2326        else:
2327            raise service_error(service_error.req, "No such experiment")
2328
2329    def get_multi_info(self, req, fid):
2330        """
2331        Return all the stored info that this fedid can access
2332        """
2333        rv = { 'info': [ ] }
2334
2335        self.state_lock.acquire()
2336        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2337            try:
2338                self.check_experiment_access(fid, key)
2339            except service_error, e:
2340                if e.code == service_error.access:
2341                    continue
2342                else:
2343                    self.state_lock.release()
2344                    raise e
2345
2346            if self.state.has_key(key):
2347                e = copy.deepcopy(self.state[key])
2348                e = self.clean_info_response(e)
2349                rv['info'].append(e)
2350        self.state_lock.release()
2351        return rv
2352
2353    def remove_dirs(self, dir):
2354        """
2355        Remove the directory tree and all files rooted at dir.  Log any errors,
2356        but continue.
2357        """
2358        self.log.debug("[removedirs]: removing %s" % dir)
2359        try:
2360            for path, dirs, files in os.walk(dir, topdown=False):
2361                for f in files:
2362                    os.remove(os.path.join(path, f))
2363                for d in dirs:
2364                    os.rmdir(os.path.join(path, d))
2365            os.rmdir(dir)
2366        except EnvironmentError, e:
2367            self.log.error("Error deleting directory tree in %s" % e);
2368
2369    @staticmethod
2370    def make_temp_certfile(expcert, tmpdir):
2371        """
2372        make a protected copy of the access certificate so the experiment
2373        controller can act as the experiment principal.  mkstemp is the most
2374        secure way to do that. The directory should be created by
2375        mkdtemp.  Return the filename.
2376        """
2377        if expcert and tmpdir:
2378            try:
2379                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
2380                f = os.fdopen(certf, 'w')
2381                print >> f, expcert
2382                f.close()
2383            except EnvironmentError, e:
2384                raise service_error(service_error.internal, 
2385                        "Cannot create temp cert file?")
2386            return certfn
2387        else:
2388            return None
2389
2390    def terminate_experiment(self, req, fid):
2391        """
2392        Swap this experiment out on the federants and delete the shared
2393        information
2394        """
2395        tbparams = { }
2396        req = req.get('TerminateRequestBody', None)
2397        if not req:
2398            raise service_error(service_error.req,
2399                    "Bad request format (no TerminateRequestBody)")
2400        force = req.get('force', False)
2401        exp = req.get('experiment', None)
2402        if exp:
2403            if exp.has_key('fedid'):
2404                key = exp['fedid']
2405                keytype = "fedid"
2406            elif exp.has_key('localname'):
2407                key = exp['localname']
2408                keytype = "localname"
2409            else:
2410                raise service_error(service_error.req, "Unknown lookup type")
2411        else:
2412            raise service_error(service_error.req, "No request?")
2413
2414        self.check_experiment_access(fid, key)
2415
2416        dealloc_list = [ ]
2417
2418
2419        # Create a logger that logs to the dealloc_list as well as to the main
2420        # log file.
2421        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2422        h = logging.StreamHandler(self.list_log(dealloc_list))
2423        # XXX: there should be a global one of these rather than repeating the
2424        # code.
2425        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2426                    '%d %b %y %H:%M:%S'))
2427        dealloc_log.addHandler(h)
2428
2429        self.state_lock.acquire()
2430        fed_exp = self.state.get(key, None)
2431        repo = None
2432
2433        if fed_exp:
2434            # This branch of the conditional holds the lock to generate a
2435            # consistent temporary tbparams variable to deallocate experiments.
2436            # It releases the lock to do the deallocations and reacquires it to
2437            # remove the experiment state when the termination is complete.
2438
2439            # First make sure that the experiment creation is complete.
2440            status = fed_exp.get('experimentStatus', None)
2441
2442            if status:
2443                if status in ('starting', 'terminating'):
2444                    if not force:
2445                        self.state_lock.release()
2446                        raise service_error(service_error.partial, 
2447                                'Experiment still being created or destroyed')
2448                    else:
2449                        self.log.warning('Experiment in %s state ' % status + \
2450                                'being terminated by force.')
2451            else:
2452                # No status??? trouble
2453                self.state_lock.release()
2454                raise service_error(service_error.internal,
2455                        "Experiment has no status!?")
2456
2457            ids = []
2458            #  experimentID is a list of dicts that are self-describing
2459            #  identifiers.  This finds all the fedids and localnames - the
2460            #  keys of self.state - and puts them into ids.
2461            for id in fed_exp.get('experimentID', []):
2462                if id.has_key('fedid'): 
2463                    ids.append(id['fedid'])
2464                    repo = "%s" % id['fedid']
2465                if id.has_key('localname'): ids.append(id['localname'])
2466
2467            # Get the experimentAccess - the principal for this experiment.  It
2468            # is this principal to which credentials have been delegated, and
2469            # as which the experiment controller must act.
2470            if 'experimentAccess' in self.state[key] and \
2471                    'X509' in self.state[key]['experimentAccess']:
2472                expcert = self.state[key]['experimentAccess']['X509']
2473            else:
2474                expcert = None
2475            # Collect the allocation/segment ids into a dict keyed by the fedid
2476            # of the allocation (or a monotonically increasing integer) that
2477            # contains a tuple of uri, aid (which is a dict...)
2478            for i, fed in enumerate(fed_exp.get('federant', [])):
2479                try:
2480                    uri = fed['uri']
2481                    aid = fed['allocID']
2482                    k = fed['allocID'].get('fedid', i)
2483                except KeyError, e:
2484                    continue
2485                tbparams[k] = (uri, aid)
2486            fed_exp['experimentStatus'] = 'terminating'
2487            if self.state_filename: self.write_state()
2488            self.state_lock.release()
2489
2490            try:
2491                tmpdir = tempfile.mkdtemp(prefix="split-")
2492            except EnvironmentError:
2493                raise service_error(service_error.internal, 
2494                        "Cannot create tmp dir")
2495            # This try block makes sure the tempdir is cleared
2496            try:
2497                # If no expcert, try the deallocation as the experiment
2498                # controller instance.
2499                if expcert and self.auth_type != 'legacy': 
2500                    cert_file = self.make_temp_certfile(expcert, tmpdir)
2501                    pw = None
2502                else: 
2503                    cert_file = self.cert_file
2504                    pw = self.cert_pwd
2505
2506                # Stop everyone.  NB, wait_for_all waits until a thread starts
2507                # and then completes, so we can't wait if nothing starts.  So,
2508                # no tbparams, no start.
2509                if len(tbparams) > 0:
2510                    thread_pool = self.thread_pool(self.nthreads)
2511                    for k in tbparams.keys():
2512                        # Create and start a thread to stop the segment
2513                        thread_pool.wait_for_slot()
2514                        uri, aid = tbparams[k]
2515                        t  = self.pooled_thread(\
2516                                target=self.terminate_segment(log=dealloc_log,
2517                                    testbed=uri,
2518                                    cert_file=cert_file, 
2519                                    cert_pwd=pw,
2520                                    trusted_certs=self.trusted_certs,
2521                                    caller=self.call_TerminateSegment),
2522                                args=(uri, aid), name=k,
2523                                pdata=thread_pool, trace_file=self.trace_file)
2524                        t.start()
2525                    # Wait for completions
2526                    thread_pool.wait_for_all_done()
2527
2528                # release the allocations (failed experiments have done this
2529                # already, and starting experiments may be in odd states, so we
2530                # ignore errors releasing those allocations
2531                try: 
2532                    for k in tbparams.keys():
2533                        # This releases access by uri
2534                        uri, aid = tbparams[k]
2535                        self.release_access(None, aid, uri=uri,
2536                                cert_file=cert_file, cert_pwd=pw)
2537                except service_error, e:
2538                    if status != 'failed' and not force:
2539                        raise e
2540
2541            # Clean up the tmpdir no matter what
2542            finally:
2543                self.remove_dirs(tmpdir)
2544
2545            # Remove the terminated experiment
2546            self.state_lock.acquire()
2547            for id in ids:
2548                if self.state.has_key(id): del self.state[id]
2549
2550            if self.state_filename: self.write_state()
2551            self.state_lock.release()
2552
2553            # Delete any synch points associated with this experiment.  All
2554            # synch points begin with the fedid of the experiment.
2555            fedid_keys = set(["fedid:%s" % f for f in ids \
2556                    if isinstance(f, fedid)])
2557            for k in self.synch_store.all_keys():
2558                try:
2559                    if len(k) > 45 and k[0:46] in fedid_keys:
2560                        self.synch_store.del_value(k)
2561                except synch_store.BadDeletionError:
2562                    pass
2563            self.write_store()
2564
2565            # Remove software and other cached stuff from the filesystem.
2566            if repo:
2567                self.remove_dirs("%s/%s" % (self.repodir, repo))
2568               
2569            return { 
2570                    'experiment': exp , 
2571                    'deallocationLog': "".join(dealloc_list),
2572                    }
2573        else:
2574            # Don't forget to release the lock
2575            self.state_lock.release()
2576            raise service_error(service_error.req, "No saved state")
2577
2578
2579    def GetValue(self, req, fid):
2580        """
2581        Get a value from the synchronized store
2582        """
2583        req = req.get('GetValueRequestBody', None)
2584        if not req:
2585            raise service_error(service_error.req,
2586                    "Bad request format (no GetValueRequestBody)")
2587       
2588        name = req['name']
2589        wait = req['wait']
2590        rv = { 'name': name }
2591
2592        if self.auth.check_attribute(fid, name):
2593            self.log.debug("[GetValue] asking for %s " % name)
2594            try:
2595                v = self.synch_store.get_value(name, wait)
2596            except synch_store.RevokedKeyError:
2597                # No more synch on this key
2598                raise service_error(service_error.federant, 
2599                        "Synch key %s revoked" % name)
2600            if v is not None:
2601                rv['value'] = v
2602            self.log.debug("[GetValue] got %s from %s" % (v, name))
2603            return rv
2604        else:
2605            raise service_error(service_error.access, "Access Denied")
2606       
2607
2608    def SetValue(self, req, fid):
2609        """
2610        Set a value in the synchronized store
2611        """
2612        req = req.get('SetValueRequestBody', None)
2613        if not req:
2614            raise service_error(service_error.req,
2615                    "Bad request format (no SetValueRequestBody)")
2616       
2617        name = req['name']
2618        v = req['value']
2619
2620        if self.auth.check_attribute(fid, name):
2621            try:
2622                self.synch_store.set_value(name, v)
2623                self.write_store()
2624                self.log.debug("[SetValue] set %s to %s" % (name, v))
2625            except synch_store.CollisionError:
2626                # Translate into a service_error
2627                raise service_error(service_error.req,
2628                        "Value already set: %s" %name)
2629            except synch_store.RevokedKeyError:
2630                # No more synch on this key
2631                raise service_error(service_error.federant, 
2632                        "Synch key %s revoked" % name)
2633            return { 'name': name, 'value': v }
2634        else:
2635            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.