source: fedd/federation/experiment_control.py @ 7206e5a

axis_examplecompt_changesinfo-ops
Last change on this file since 7206e5a was 7206e5a, checked in by Ted Faber <faber@…>, 14 years ago

checkpoint: new works pretty well

  • Property mode set to 100644
File size: 83.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32from authorizer import abac_authorizer
33
34import topdl
35import list_log
36from ip_allocator import ip_allocator
37from ip_addr import ip_addr
38
39
40class nullHandler(logging.Handler):
41    def emit(self, record): pass
42
43fl = logging.getLogger("fedd.experiment_control")
44fl.addHandler(nullHandler())
45
46
47# Right now, no support for composition.
48class federated_service:
49    def __init__(self, name, exporter=None, importers=None, params=None, 
50            reqs=None, portal=None):
51        self.name=name
52        self.exporter=exporter
53        if importers is None: self.importers = []
54        else: self.importers=importers
55        if params is None: self.params = { }
56        else: self.params = params
57        if reqs is None: self.reqs = []
58        else: self.reqs = reqs
59
60        if portal is not None:
61            self.portal = portal
62        else:
63            self.portal = (name in federated_service.needs_portal)
64
65    def __str__(self):
66        return "name %s export %s import %s params %s reqs %s" % \
67                (self.name, self.exporter, self.importers, self.params,
68                        [ (r['name'], r['visibility']) for r in self.reqs] )
69
70    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
71
72class experiment_control_local:
73    """
74    Control of experiments that this system can directly access.
75
76    Includes experiment creation, termination and information dissemination.
77    Thred safe.
78    """
79
80    class ssh_cmd_timeout(RuntimeError): pass
81   
82    class thread_pool:
83        """
84        A class to keep track of a set of threads all invoked for the same
85        task.  Manages the mutual exclusion of the states.
86        """
87        def __init__(self, nthreads):
88            """
89            Start a pool.
90            """
91            self.changed = Condition()
92            self.started = 0
93            self.terminated = 0
94            self.nthreads = nthreads
95
96        def acquire(self):
97            """
98            Get the pool's lock.
99            """
100            self.changed.acquire()
101
102        def release(self):
103            """
104            Release the pool's lock.
105            """
106            self.changed.release()
107
108        def wait(self, timeout = None):
109            """
110            Wait for a pool thread to start or stop.
111            """
112            self.changed.wait(timeout)
113
114        def start(self):
115            """
116            Called by a pool thread to report starting.
117            """
118            self.changed.acquire()
119            self.started += 1
120            self.changed.notifyAll()
121            self.changed.release()
122
123        def terminate(self):
124            """
125            Called by a pool thread to report finishing.
126            """
127            self.changed.acquire()
128            self.terminated += 1
129            self.changed.notifyAll()
130            self.changed.release()
131
132        def clear(self):
133            """
134            Clear all pool data.
135            """
136            self.changed.acquire()
137            self.started = 0
138            self.terminated =0
139            self.changed.notifyAll()
140            self.changed.release()
141
142        def wait_for_slot(self):
143            """
144            Wait until we have a free slot to start another pooled thread
145            """
146            self.acquire()
147            while self.started - self.terminated >= self.nthreads:
148                self.wait()
149            self.release()
150
151        def wait_for_all_done(self, timeout=None):
152            """
153            Wait until all active threads finish (and at least one has
154            started).  If a timeout is given, return after waiting that long
155            for termination.  If all threads are done (and one has started in
156            the since the last clear()) return True, otherwise False.
157            """
158            if timeout:
159                deadline = time.time() + timeout
160            self.acquire()
161            while self.started == 0 or self.started > self.terminated:
162                self.wait(timeout)
163                if timeout:
164                    if time.time() > deadline:
165                        break
166                    timeout = deadline - time.time()
167            self.release()
168            return not (self.started == 0 or self.started > self.terminated)
169
170    class pooled_thread(Thread):
171        """
172        One of a set of threads dedicated to a specific task.  Uses the
173        thread_pool class above for coordination.
174        """
175        def __init__(self, group=None, target=None, name=None, args=(), 
176                kwargs={}, pdata=None, trace_file=None):
177            Thread.__init__(self, group, target, name, args, kwargs)
178            self.rv = None          # Return value of the ops in this thread
179            self.exception = None   # Exception that terminated this thread
180            self.target=target      # Target function to run on start()
181            self.args = args        # Args to pass to target
182            self.kwargs = kwargs    # Additional kw args
183            self.pdata = pdata      # thread_pool for this class
184            # Logger for this thread
185            self.log = logging.getLogger("fedd.experiment_control")
186       
187        def run(self):
188            """
189            Emulate Thread.run, except add pool data manipulation and error
190            logging.
191            """
192            if self.pdata:
193                self.pdata.start()
194
195            if self.target:
196                try:
197                    self.rv = self.target(*self.args, **self.kwargs)
198                except service_error, s:
199                    self.exception = s
200                    self.log.error("Thread exception: %s %s" % \
201                            (s.code_string(), s.desc))
202                except:
203                    self.exception = sys.exc_info()[1]
204                    self.log.error(("Unexpected thread exception: %s" +\
205                            "Trace %s") % (self.exception,\
206                                traceback.format_exc()))
207            if self.pdata:
208                self.pdata.terminate()
209
210    call_RequestAccess = service_caller('RequestAccess')
211    call_ReleaseAccess = service_caller('ReleaseAccess')
212    call_StartSegment = service_caller('StartSegment')
213    call_TerminateSegment = service_caller('TerminateSegment')
214    call_Ns2Topdl = service_caller('Ns2Topdl')
215
216    def __init__(self, config=None, auth=None):
217        """
218        Intialize the various attributes, most from the config object
219        """
220
221        def parse_tarfile_list(tf):
222            """
223            Parse a tarfile list from the configuration.  This is a set of
224            paths and tarfiles separated by spaces.
225            """
226            rv = [ ]
227            if tf is not None:
228                tl = tf.split()
229                while len(tl) > 1:
230                    p, t = tl[0:2]
231                    del tl[0:2]
232                    rv.append((p, t))
233            return rv
234
235        self.thread_with_rv = experiment_control_local.pooled_thread
236        self.thread_pool = experiment_control_local.thread_pool
237        self.list_log = list_log.list_log
238
239        self.cert_file = config.get("experiment_control", "cert_file")
240        if self.cert_file:
241            self.cert_pwd = config.get("experiment_control", "cert_pwd")
242        else:
243            self.cert_file = config.get("globals", "cert_file")
244            self.cert_pwd = config.get("globals", "cert_pwd")
245
246        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
247                or config.get("globals", "trusted_certs")
248
249        self.repodir = config.get("experiment_control", "repodir")
250        self.repo_url = config.get("experiment_control", "repo_url", 
251                "https://users.isi.deterlab.net:23235");
252
253        self.exp_stem = "fed-stem"
254        self.log = logging.getLogger("fedd.experiment_control")
255        set_log_level(config, "experiment_control", self.log)
256        self.muxmax = 2
257        self.nthreads = 10
258        self.randomize_experiments = False
259
260        self.splitter = None
261        self.ssh_keygen = "/usr/bin/ssh-keygen"
262        self.ssh_identity_file = None
263
264
265        self.debug = config.getboolean("experiment_control", "create_debug")
266        self.cleanup = not config.getboolean("experiment_control", 
267                "leave_tmpfiles")
268        self.state_filename = config.get("experiment_control", 
269                "experiment_state")
270        self.store_filename = config.get("experiment_control", 
271                "synch_store")
272        self.store_url = config.get("experiment_control", "store_url")
273        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
274        self.fedkit = parse_tarfile_list(\
275                config.get("experiment_control", "fedkit"))
276        self.gatewaykit = parse_tarfile_list(\
277                config.get("experiment_control", "gatewaykit"))
278        accessdb_file = config.get("experiment_control", "accessdb")
279
280        self.ssh_pubkey_file = config.get("experiment_control", 
281                "ssh_pubkey_file")
282        self.ssh_privkey_file = config.get("experiment_control",
283                "ssh_privkey_file")
284        dt = config.get("experiment_control", "direct_transit")
285        self.auth_type = config.get('experiment_control', 'auth_type') \
286                or 'legacy'
287        self.auth_dir = config.get('experiment_control', 'auth_dir')
288        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
289        else: self.direct_transit = [ ]
290        # NB for internal master/slave ops, not experiment setup
291        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
292
293        self.overrides = set([])
294        ovr = config.get('experiment_control', 'overrides')
295        if ovr:
296            for o in ovr.split(","):
297                o = o.strip()
298                if o.startswith('fedid:'): o = o[len('fedid:'):]
299                self.overrides.add(fedid(hexstr=o))
300
301        self.state = { }
302        self.state_lock = Lock()
303        self.tclsh = "/usr/local/bin/otclsh"
304        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
305                config.get("experiment_control", "tcl_splitter",
306                        "/usr/testbed/lib/ns2ir/parse.tcl")
307        mapdb_file = config.get("experiment_control", "mapdb")
308        self.trace_file = sys.stderr
309
310        self.def_expstart = \
311                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
312                "/tmp/federate";
313        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
314                "FEDDIR/hosts";
315        self.def_gwstart = \
316                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
317                "/tmp/bridge.log";
318        self.def_mgwstart = \
319                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
320                "/tmp/bridge.log";
321        self.def_gwimage = "FBSD61-TUNNEL2";
322        self.def_gwtype = "pc";
323        self.local_access = { }
324
325        if self.auth_type == 'legacy':
326            if auth:
327                self.auth = auth
328            else:
329                self.log.error( "[access]: No authorizer initialized, " +\
330                        "creating local one.")
331                auth = authorizer()
332        elif self.auth_type == 'abac':
333            self.auth = abac_authorizer(load=self.auth_dir)
334        else:
335            raise service_error(service_error.internal, 
336                    "Unknown auth_type: %s" % self.auth_type)
337
338
339        if self.ssh_pubkey_file:
340            try:
341                f = open(self.ssh_pubkey_file, 'r')
342                self.ssh_pubkey = f.read()
343                f.close()
344            except EnvironmentError:
345                raise service_error(service_error.internal,
346                        "Cannot read sshpubkey")
347        else:
348            raise service_error(service_error.internal, 
349                    "No SSH public key file?")
350
351        if not self.ssh_privkey_file:
352            raise service_error(service_error.internal, 
353                    "No SSH public key file?")
354
355
356        if mapdb_file:
357            self.read_mapdb(mapdb_file)
358        else:
359            self.log.warn("[experiment_control] No testbed map, using defaults")
360            self.tbmap = { 
361                    'deter':'https://users.isi.deterlab.net:23235',
362                    'emulab':'https://users.isi.deterlab.net:23236',
363                    'ucb':'https://users.isi.deterlab.net:23237',
364                    }
365
366        if accessdb_file:
367                self.read_accessdb(accessdb_file)
368        else:
369            raise service_error(service_error.internal,
370                    "No accessdb specified in config")
371
372        # Grab saved state.  OK to do this w/o locking because it's read only
373        # and only one thread should be in existence that can see self.state at
374        # this point.
375        if self.state_filename:
376            self.read_state()
377
378        if self.store_filename:
379            self.read_store()
380        else:
381            self.log.warning("No saved synch store")
382            self.synch_store = synch_store
383
384        # Dispatch tables
385        self.soap_services = {\
386                'New': soap_handler('New', self.new_experiment),
387                'Create': soap_handler('Create', self.create_experiment),
388                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
389                'Vis': soap_handler('Vis', self.get_vis),
390                'Info': soap_handler('Info', self.get_info),
391                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
392                'Terminate': soap_handler('Terminate', 
393                    self.terminate_experiment),
394                'GetValue': soap_handler('GetValue', self.GetValue),
395                'SetValue': soap_handler('SetValue', self.SetValue),
396        }
397
398        self.xmlrpc_services = {\
399                'New': xmlrpc_handler('New', self.new_experiment),
400                'Create': xmlrpc_handler('Create', self.create_experiment),
401                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
402                'Vis': xmlrpc_handler('Vis', self.get_vis),
403                'Info': xmlrpc_handler('Info', self.get_info),
404                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
405                'Terminate': xmlrpc_handler('Terminate',
406                    self.terminate_experiment),
407                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
408                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
409        }
410
411    # Call while holding self.state_lock
412    def write_state(self):
413        """
414        Write a new copy of experiment state after copying the existing state
415        to a backup.
416
417        State format is a simple pickling of the state dictionary.
418        """
419        if os.access(self.state_filename, os.W_OK):
420            copy_file(self.state_filename, \
421                    "%s.bak" % self.state_filename)
422        try:
423            f = open(self.state_filename, 'w')
424            pickle.dump(self.state, f)
425        except EnvironmentError, e:
426            self.log.error("Can't write file %s: %s" % \
427                    (self.state_filename, e))
428        except pickle.PicklingError, e:
429            self.log.error("Pickling problem: %s" % e)
430        except TypeError, e:
431            self.log.error("Pickling problem (TypeError): %s" % e)
432
433    @staticmethod
434    def get_alloc_ids(state):
435        """
436        Pull the fedids of the identifiers of each allocation from the
437        state.  Again, a dict dive that's best isolated.
438
439        Used by read_store and read state
440        """
441
442        return [ f['allocID']['fedid'] 
443                for f in state.get('federant',[]) \
444                    if f.has_key('allocID') and \
445                        f['allocID'].has_key('fedid')]
446
447    # Call while holding self.state_lock
448    def read_state(self):
449        """
450        Read a new copy of experiment state.  Old state is overwritten.
451
452        State format is a simple pickling of the state dictionary.
453        """
454       
455        def get_experiment_id(state):
456            """
457            Pull the fedid experimentID out of the saved state.  This is kind
458            of a gross walk through the dict.
459            """
460
461            if state.has_key('experimentID'):
462                for e in state['experimentID']:
463                    if e.has_key('fedid'):
464                        return e['fedid']
465                else:
466                    return None
467            else:
468                return None
469
470        try:
471            f = open(self.state_filename, "r")
472            self.state = pickle.load(f)
473            self.log.debug("[read_state]: Read state from %s" % \
474                    self.state_filename)
475        except EnvironmentError, e:
476            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
477                    % (self.state_filename, e))
478        except pickle.UnpicklingError, e:
479            self.log.warning(("[read_state]: No saved state: " + \
480                    "Unpickling failed: %s") % e)
481       
482        for s in self.state.values():
483            try:
484
485                eid = get_experiment_id(s)
486                if eid : 
487                    if self.auth_type == 'legacy':
488                        # XXX: legacy
489                        # Give the owner rights to the experiment
490                        self.auth.set_attribute(s['owner'], eid)
491                        # And holders of the eid as well
492                        self.auth.set_attribute(eid, eid)
493                        # allow overrides to control experiments as well
494                        for o in self.overrides:
495                            self.auth.set_attribute(o, eid)
496                        # Set permissions to allow reading of the software
497                        # repo, if any, as well.
498                        for a in self.get_alloc_ids(s):
499                            self.auth.set_attribute(a, 'repo/%s' % eid)
500                else:
501                    raise KeyError("No experiment id")
502            except KeyError, e:
503                self.log.warning("[read_state]: State ownership or identity " +\
504                        "misformatted in %s: %s" % (self.state_filename, e))
505
506
507    def read_accessdb(self, accessdb_file):
508        """
509        Read the mapping from fedids that can create experiments to their name
510        in the 3-level access namespace.  All will be asserted from this
511        testbed and can include the local username and porject that will be
512        asserted on their behalf by this fedd.  Each fedid is also added to the
513        authorization system with the "create" attribute.
514        """
515        self.accessdb = {}
516        # These are the regexps for parsing the db
517        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
518        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
519                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
520        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
521                "\s*->\s*(" + name_expr + ")\s*$")
522        lineno = 0
523
524        # Parse the mappings and store in self.authdb, a dict of
525        # fedid -> (proj, user)
526        try:
527            f = open(accessdb_file, "r")
528            for line in f:
529                lineno += 1
530                line = line.strip()
531                if len(line) == 0 or line.startswith('#'):
532                    continue
533                m = project_line.match(line)
534                if m:
535                    fid = fedid(hexstr=m.group(1))
536                    project, user = m.group(2,3)
537                    if not self.accessdb.has_key(fid):
538                        self.accessdb[fid] = []
539                    self.accessdb[fid].append((project, user))
540                    continue
541
542                m = user_line.match(line)
543                if m:
544                    fid = fedid(hexstr=m.group(1))
545                    project = None
546                    user = m.group(2)
547                    if not self.accessdb.has_key(fid):
548                        self.accessdb[fid] = []
549                    self.accessdb[fid].append((project, user))
550                    continue
551                self.log.warn("[experiment_control] Error parsing access " +\
552                        "db %s at line %d" %  (accessdb_file, lineno))
553        except EnvironmentError:
554            raise service_error(service_error.internal,
555                    ("Error opening/reading %s as experiment " +\
556                            "control accessdb") %  accessdb_file)
557        f.close()
558
559        # Initialize the authorization attributes
560        # XXX: legacy
561        if self.auth_type == 'legacy':
562            for fid in self.accessdb.keys():
563                self.auth.set_attribute(fid, 'create')
564                self.auth.set_attribute(fid, 'new')
565
566    def read_mapdb(self, file):
567        """
568        Read a simple colon separated list of mappings for the
569        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
570        """
571
572        self.tbmap = { }
573        lineno =0
574        try:
575            f = open(file, "r")
576            for line in f:
577                lineno += 1
578                line = line.strip()
579                if line.startswith('#') or len(line) == 0:
580                    continue
581                try:
582                    label, url = line.split(':', 1)
583                    self.tbmap[label] = url
584                except ValueError, e:
585                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
586                            "map db: %s %s" % (lineno, line, e))
587        except EnvironmentError, e:
588            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
589                    "open %s: %s" % (file, e))
590        f.close()
591
592    def read_store(self):
593        try:
594            self.synch_store = synch_store()
595            self.synch_store.load(self.store_filename)
596            self.log.debug("[read_store]: Read store from %s" % \
597                    self.store_filename)
598        except EnvironmentError, e:
599            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
600                    % (self.state_filename, e))
601            self.synch_store = synch_store()
602
603        # Set the initial permissions on data in the store.  XXX: This ad hoc
604        # authorization attribute initialization is getting out of hand.
605        # XXX: legacy
606        if self.auth_type == 'legacy':
607            for k in self.synch_store.all_keys():
608                try:
609                    if k.startswith('fedid:'):
610                        fid = fedid(hexstr=k[6:46])
611                        if self.state.has_key(fid):
612                            for a in self.get_alloc_ids(self.state[fid]):
613                                self.auth.set_attribute(a, k)
614                except ValueError, e:
615                    self.log.warn("Cannot deduce permissions for %s" % k)
616
617
618    def write_store(self):
619        """
620        Write a new copy of synch_store after writing current state
621        to a backup.  We use the internal synch_store pickle method to avoid
622        incinsistent data.
623
624        State format is a simple pickling of the store.
625        """
626        if os.access(self.store_filename, os.W_OK):
627            copy_file(self.store_filename, \
628                    "%s.bak" % self.store_filename)
629        try:
630            self.synch_store.save(self.store_filename)
631        except EnvironmentError, e:
632            self.log.error("Can't write file %s: %s" % \
633                    (self.store_filename, e))
634        except TypeError, e:
635            self.log.error("Pickling problem (TypeError): %s" % e)
636
637       
638    def generate_ssh_keys(self, dest, type="rsa" ):
639        """
640        Generate a set of keys for the gateways to use to talk.
641
642        Keys are of type type and are stored in the required dest file.
643        """
644        valid_types = ("rsa", "dsa")
645        t = type.lower();
646        if t not in valid_types: raise ValueError
647        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
648
649        try:
650            trace = open("/dev/null", "w")
651        except EnvironmentError:
652            raise service_error(service_error.internal,
653                    "Cannot open /dev/null??");
654
655        # May raise CalledProcessError
656        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
657        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
658        if rv != 0:
659            raise service_error(service_error.internal, 
660                    "Cannot generate nonce ssh keys.  %s return code %d" \
661                            % (self.ssh_keygen, rv))
662
663    def gentopo(self, str):
664        """
665        Generate the topology dtat structure from the splitter's XML
666        representation of it.
667
668        The topology XML looks like:
669            <experiment>
670                <nodes>
671                    <node><vname></vname><ips>ip1:ip2</ips></node>
672                </nodes>
673                <lans>
674                    <lan>
675                        <vname></vname><vnode></vnode><ip></ip>
676                        <bandwidth></bandwidth><member>node:port</member>
677                    </lan>
678                </lans>
679        """
680        class topo_parse:
681            """
682            Parse the topology XML and create the dats structure.
683            """
684            def __init__(self):
685                # Typing of the subelements for data conversion
686                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
687                self.int_subelements = ( 'bandwidth',)
688                self.float_subelements = ( 'delay',)
689                # The final data structure
690                self.nodes = [ ]
691                self.lans =  [ ]
692                self.topo = { \
693                        'node': self.nodes,\
694                        'lan' : self.lans,\
695                    }
696                self.element = { }  # Current element being created
697                self.chars = ""     # Last text seen
698
699            def end_element(self, name):
700                # After each sub element the contents is added to the current
701                # element or to the appropriate list.
702                if name == 'node':
703                    self.nodes.append(self.element)
704                    self.element = { }
705                elif name == 'lan':
706                    self.lans.append(self.element)
707                    self.element = { }
708                elif name in self.str_subelements:
709                    self.element[name] = self.chars
710                    self.chars = ""
711                elif name in self.int_subelements:
712                    self.element[name] = int(self.chars)
713                    self.chars = ""
714                elif name in self.float_subelements:
715                    self.element[name] = float(self.chars)
716                    self.chars = ""
717
718            def found_chars(self, data):
719                self.chars += data.rstrip()
720
721
722        tp = topo_parse();
723        parser = xml.parsers.expat.ParserCreate()
724        parser.EndElementHandler = tp.end_element
725        parser.CharacterDataHandler = tp.found_chars
726
727        parser.Parse(str)
728
729        return tp.topo
730       
731
732    def genviz(self, topo):
733        """
734        Generate the visualization the virtual topology
735        """
736
737        neato = "/usr/local/bin/neato"
738        # These are used to parse neato output and to create the visualization
739        # file.
740        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
741        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
742                "%s</type></node>"
743
744        try:
745            # Node names
746            nodes = [ n['vname'] for n in topo['node'] ]
747            topo_lans = topo['lan']
748        except KeyError, e:
749            raise service_error(service_error.internal, "Bad topology: %s" %e)
750
751        lans = { }
752        links = { }
753
754        # Walk through the virtual topology, organizing the connections into
755        # 2-node connections (links) and more-than-2-node connections (lans).
756        # When a lan is created, it's added to the list of nodes (there's a
757        # node in the visualization for the lan).
758        for l in topo_lans:
759            if links.has_key(l['vname']):
760                if len(links[l['vname']]) < 2:
761                    links[l['vname']].append(l['vnode'])
762                else:
763                    nodes.append(l['vname'])
764                    lans[l['vname']] = links[l['vname']]
765                    del links[l['vname']]
766                    lans[l['vname']].append(l['vnode'])
767            elif lans.has_key(l['vname']):
768                lans[l['vname']].append(l['vnode'])
769            else:
770                links[l['vname']] = [ l['vnode'] ]
771
772
773        # Open up a temporary file for dot to turn into a visualization
774        try:
775            df, dotname = tempfile.mkstemp()
776            dotfile = os.fdopen(df, 'w')
777        except EnvironmentError:
778            raise service_error(service_error.internal,
779                    "Failed to open file in genviz")
780
781        try:
782            dnull = open('/dev/null', 'w')
783        except EnvironmentError:
784            service_error(service_error.internal,
785                    "Failed to open /dev/null in genviz")
786
787        # Generate a dot/neato input file from the links, nodes and lans
788        try:
789            print >>dotfile, "graph G {"
790            for n in nodes:
791                print >>dotfile, '\t"%s"' % n
792            for l in links.keys():
793                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
794            for l in lans.keys():
795                for n in lans[l]:
796                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
797            print >>dotfile, "}"
798            dotfile.close()
799        except TypeError:
800            raise service_error(service_error.internal,
801                    "Single endpoint link in vtopo")
802        except EnvironmentError:
803            raise service_error(service_error.internal, "Cannot write dot file")
804
805        # Use dot to create a visualization
806        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
807                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
808                close_fds=True)
809        dnull.close()
810
811        # Translate dot to vis format
812        vis_nodes = [ ]
813        vis = { 'node': vis_nodes }
814        for line in dot.stdout:
815            m = vis_re.match(line)
816            if m:
817                vn = m.group(1)
818                vis_node = {'name': vn, \
819                        'x': float(m.group(2)),\
820                        'y' : float(m.group(3)),\
821                    }
822                if vn in links.keys() or vn in lans.keys():
823                    vis_node['type'] = 'lan'
824                else:
825                    vis_node['type'] = 'node'
826                vis_nodes.append(vis_node)
827        rv = dot.wait()
828
829        os.remove(dotname)
830        if rv == 0 : return vis
831        else: return None
832
833    def get_access(self, tb, nodes, tbparam, access_user, masters, tbmap):
834        """
835        Get access to testbed through fedd and set the parameters for that tb
836        """
837        def get_export_project(svcs):
838            """
839            Look through for the list of federated_service for this testbed
840            objects for a project_export service, and extract the project
841            parameter.
842            """
843
844            pe = [s for s in svcs if s.name=='project_export']
845            if len(pe) == 1:
846                return pe[0].params.get('project', None)
847            elif len(pe) == 0:
848                return None
849            else:
850                raise service_error(service_error.req,
851                        "More than one project export is not supported")
852
853        uri = tbmap.get(testbed_base(tb), None)
854        if not uri:
855            raise service_error(service_error.server_config, 
856                    "Unknown testbed: %s" % tb)
857
858        export_svcs = masters.get(tb,[])
859        import_svcs = [ s for m in masters.values() \
860                for s in m \
861                    if tb in s.importers ]
862
863        export_project = get_export_project(export_svcs)
864
865        # Tweak search order so that if there are entries in access_user that
866        # have a project matching the export project, we try them first
867        if export_project: 
868            access_sequence = [ (p, u) for p, u in access_user \
869                    if p == export_project] 
870            access_sequence.extend([(p, u) for p, u in access_user \
871                    if p != export_project]) 
872        else: 
873            access_sequence = access_user
874
875        for p, u in access_sequence: 
876            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
877                    "to %s") %  ((p or "None"), u, uri))
878
879            if p:
880                # Request with user and project specified
881                req = {\
882                        'credential': [ "project: %s" % p, "user: %s"  % u],
883                    }
884            else:
885                # Request with only user specified
886                req = {\
887                        'credential': [ 'user: %s' % u ],
888                    }
889
890            # Make the service request from the services we're importing and
891            # exporting.  Keep track of the export request ids so we can
892            # collect the resulting info from the access response.
893            e_keys = { }
894            if import_svcs or export_svcs:
895                req['service'] = [ ]
896
897                for i, s in enumerate(import_svcs):
898                    idx = 'import%d' % i
899                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
900                    if s.params:
901                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
902                                for k, v in s.params.items()]
903                    req['service'].append(sr)
904
905                for i, s in enumerate(export_svcs):
906                    idx = 'export%d' % i
907                    e_keys[idx] = s
908                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
909                    if s.params:
910                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
911                                for k, v in s.params.items()]
912                    req['service'].append(sr)
913
914            # node resources if any
915            if nodes != None and len(nodes) > 0:
916                rnodes = [ ]
917                for n in nodes:
918                    rn = { }
919                    image, hw, count = n.split(":")
920                    if image: rn['image'] = [ image ]
921                    if hw: rn['hardware'] = [ hw ]
922                    if count and int(count) >0 : rn['count'] = int(count)
923                    rnodes.append(rn)
924                req['resources']= { }
925                req['resources']['node'] = rnodes
926
927            try:
928                if self.local_access.has_key(uri):
929                    # Local access call
930                    req = { 'RequestAccessRequestBody' : req }
931                    r = self.local_access[uri].RequestAccess(req, 
932                            fedid(file=self.cert_file))
933                    r = { 'RequestAccessResponseBody' : r }
934                else:
935                    r = self.call_RequestAccess(uri, req, 
936                            self.cert_file, self.cert_pwd, self.trusted_certs)
937            except service_error, e:
938                if e.code == service_error.access:
939                    self.log.debug("[get_access] Access denied")
940                    r = None
941                    continue
942                else:
943                    raise e
944
945            if r.has_key('RequestAccessResponseBody'):
946                # Through to here we have a valid response, not a fault.
947                # Access denied is a fault, so something better or worse than
948                # access denied has happened.
949                r = r['RequestAccessResponseBody']
950                self.log.debug("[get_access] Access granted")
951                break
952            else:
953                raise service_error(service_error.protocol,
954                        "Bad proxy response")
955       
956        if not r:
957            raise service_error(service_error.access, 
958                    "Access denied by %s (%s)" % (tb, uri))
959
960        tbparam[tb] = { 
961                "allocID" : r['allocID'],
962                "uri": uri,
963                }
964
965        # Collect the responses corresponding to the services this testbed
966        # exports.  These will be the service requests that we will include in
967        # the start segment requests (with appropriate visibility values) to
968        # import and export the segments.
969        for s in r.get('service', []):
970            id = s.get('id', None)
971            if id and id in e_keys:
972                e_keys[id].reqs.append(s)
973
974        # Add attributes to parameter space.  We don't allow attributes to
975        # overlay any parameters already installed.
976        for a in r.get('fedAttr', []):
977            try:
978                if a['attribute'] and \
979                        isinstance(a['attribute'], basestring)\
980                        and not tbparam[tb].has_key(a['attribute'].lower()):
981                    tbparam[tb][a['attribute'].lower()] = a['value']
982            except KeyError:
983                self.log.error("Bad attribute in response: %s" % a)
984
985    def release_access(self, tb, aid, tbmap=None, uri=None):
986        """
987        Release access to testbed through fedd
988        """
989
990        if not uri and tbmap:
991            uri = tbmap.get(tb, None)
992        if not uri:
993            raise service_error(service_error.server_config, 
994                    "Unknown testbed: %s" % tb)
995
996        if self.local_access.has_key(uri):
997            resp = self.local_access[uri].ReleaseAccess(\
998                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
999                    fedid(file=self.cert_file))
1000            resp = { 'ReleaseAccessResponseBody': resp } 
1001        else:
1002            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1003                    self.cert_file, self.cert_pwd, self.trusted_certs)
1004
1005        # better error coding
1006
1007    def remote_ns2topdl(self, uri, desc):
1008
1009        req = {
1010                'description' : { 'ns2description': desc },
1011            }
1012
1013        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
1014                self.trusted_certs)
1015
1016        if r.has_key('Ns2TopdlResponseBody'):
1017            r = r['Ns2TopdlResponseBody']
1018            ed = r.get('experimentdescription', None)
1019            if ed.has_key('topdldescription'):
1020                return topdl.Topology(**ed['topdldescription'])
1021            else:
1022                raise service_error(service_error.protocol, 
1023                        "Bad splitter response (no output)")
1024        else:
1025            raise service_error(service_error.protocol, "Bad splitter response")
1026
1027    class start_segment:
1028        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1029                cert_pwd=None, trusted_certs=None, caller=None,
1030                log_collector=None):
1031            self.log = log
1032            self.debug = debug
1033            self.cert_file = cert_file
1034            self.cert_pwd = cert_pwd
1035            self.trusted_certs = None
1036            self.caller = caller
1037            self.testbed = testbed
1038            self.log_collector = log_collector
1039            self.response = None
1040            self.node = { }
1041
1042        def make_map(self, resp):
1043            for e in resp.get('embedding', []):
1044                if 'toponame' in e and 'physname' in e:
1045                    self.node[e['toponame']] = e['physname'][0]
1046
1047        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1048            req = {
1049                    'allocID': { 'fedid' : aid }, 
1050                    'segmentdescription': { 
1051                        'topdldescription': topo.to_dict(),
1052                    },
1053                }
1054
1055            if connInfo:
1056                req['connection'] = connInfo
1057
1058            import_svcs = [ s for m in masters.values() \
1059                    for s in m if self.testbed in s.importers]
1060
1061            if import_svcs or self.testbed in masters:
1062                req['service'] = []
1063
1064            for s in import_svcs:
1065                for r in s.reqs:
1066                    sr = copy.deepcopy(r)
1067                    sr['visibility'] = 'import';
1068                    req['service'].append(sr)
1069
1070            for s in masters.get(self.testbed, []):
1071                for r in s.reqs:
1072                    sr = copy.deepcopy(r)
1073                    sr['visibility'] = 'export';
1074                    req['service'].append(sr)
1075
1076            if attrs:
1077                req['fedAttr'] = attrs
1078
1079            try:
1080                self.log.debug("Calling StartSegment at %s " % uri)
1081                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1082                        self.trusted_certs)
1083                if r.has_key('StartSegmentResponseBody'):
1084                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1085                            None)
1086                    if lval and self.log_collector:
1087                        for line in  lval.splitlines(True):
1088                            self.log_collector.write(line)
1089                    self.make_map(r['StartSegmentResponseBody'])
1090                    self.response = r
1091                else:
1092                    raise service_error(service_error.internal, 
1093                            "Bad response!?: %s" %r)
1094                return True
1095            except service_error, e:
1096                self.log.error("Start segment failed on %s: %s" % \
1097                        (self.testbed, e))
1098                return False
1099
1100
1101
1102    class terminate_segment:
1103        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1104                cert_pwd=None, trusted_certs=None, caller=None):
1105            self.log = log
1106            self.debug = debug
1107            self.cert_file = cert_file
1108            self.cert_pwd = cert_pwd
1109            self.trusted_certs = None
1110            self.caller = caller
1111            self.testbed = testbed
1112
1113        def __call__(self, uri, aid ):
1114            req = {
1115                    'allocID': aid , 
1116                }
1117            try:
1118                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1119                        self.trusted_certs)
1120                return True
1121            except service_error, e:
1122                self.log.error("Terminate segment failed on %s: %s" % \
1123                        (self.testbed, e))
1124                return False
1125   
1126
1127    def allocate_resources(self, allocated, masters, eid, expid, 
1128            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1129            attrs=None, connInfo={}, tbmap=None):
1130
1131        started = { }           # Testbeds where a sub-experiment started
1132                                # successfully
1133
1134        # XXX
1135        fail_soft = False
1136
1137        if tbmap is None: tbmap = { }
1138
1139        log = alloc_log or self.log
1140
1141        thread_pool = self.thread_pool(self.nthreads)
1142        threads = [ ]
1143        starters = [ ]
1144
1145        for tb in allocated.keys():
1146            # Create and start a thread to start the segment, and save it
1147            # to get the return value later
1148            tb_attrs = copy.copy(attrs)
1149            thread_pool.wait_for_slot()
1150            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
1151            base, suffix = split_testbed(tb)
1152            if suffix:
1153                tb_attrs.append({'attribute': 'experiment_name', 
1154                    'value': "%s-%s" % (eid, suffix)})
1155            else:
1156                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1157            if not uri:
1158                raise service_error(service_error.internal, 
1159                        "Unknown testbed %s !?" % tb)
1160
1161            if tbparams[tb].has_key('allocID') and \
1162                    tbparams[tb]['allocID'].has_key('fedid'):
1163                aid = tbparams[tb]['allocID']['fedid']
1164            else:
1165                raise service_error(service_error.internal, 
1166                        "No alloc id for testbed %s !?" % tb)
1167
1168            s = self.start_segment(log=log, debug=self.debug,
1169                    testbed=tb, cert_file=self.cert_file,
1170                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1171                    caller=self.call_StartSegment,
1172                    log_collector=log_collector)
1173            starters.append(s)
1174            t  = self.pooled_thread(\
1175                    target=s, name=tb,
1176                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1177                    pdata=thread_pool, trace_file=self.trace_file)
1178            threads.append(t)
1179            t.start()
1180
1181        # Wait until all finish (keep pinging the log, though)
1182        mins = 0
1183        revoked = False
1184        while not thread_pool.wait_for_all_done(60.0):
1185            mins += 1
1186            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1187                    % mins)
1188            if not revoked and \
1189                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1190                # a testbed has failed.  Revoke this experiment's
1191                # synchronizarion values so that sub experiments will not
1192                # deadlock waiting for synchronization that will never happen
1193                self.log.info("A subexperiment has failed to swap in, " + \
1194                        "revoking synch keys")
1195                var_key = "fedid:%s" % expid
1196                for k in self.synch_store.all_keys():
1197                    if len(k) > 45 and k[0:46] == var_key:
1198                        self.synch_store.revoke_key(k)
1199                revoked = True
1200
1201        failed = [ t.getName() for t in threads if not t.rv ]
1202        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1203
1204        # If one failed clean up, unless fail_soft is set
1205        if failed:
1206            if not fail_soft:
1207                thread_pool.clear()
1208                for tb in succeeded:
1209                    # Create and start a thread to stop the segment
1210                    thread_pool.wait_for_slot()
1211                    uri = tbparams[tb]['uri']
1212                    t  = self.pooled_thread(\
1213                            target=self.terminate_segment(log=log,
1214                                testbed=tb,
1215                                cert_file=self.cert_file, 
1216                                cert_pwd=self.cert_pwd,
1217                                trusted_certs=self.trusted_certs,
1218                                caller=self.call_TerminateSegment),
1219                            args=(uri, tbparams[tb]['federant']['allocID']),
1220                            name=tb,
1221                            pdata=thread_pool, trace_file=self.trace_file)
1222                    t.start()
1223                # Wait until all finish (if any are being stopped)
1224                if succeeded:
1225                    thread_pool.wait_for_all_done()
1226
1227                # release the allocations
1228                for tb in tbparams.keys():
1229                    self.release_access(tb, tbparams[tb]['allocID'], 
1230                            tbmap=tbmap, uri=tbparams[tb].get('uri', None))
1231                # Remove the placeholder
1232                self.state_lock.acquire()
1233                self.state[eid]['experimentStatus'] = 'failed'
1234                if self.state_filename: self.write_state()
1235                self.state_lock.release()
1236                # Remove the repo dir
1237                self.remove_dirs("%s/%s" %(self.repodir, expid))
1238                # Walk up tmpdir, deleting as we go
1239                if self.cleanup:
1240                    self.remove_dirs(tmpdir)
1241                else:
1242                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1243
1244
1245                log.error("Swap in failed on %s" % ",".join(failed))
1246                return
1247        else:
1248            # Walk through the successes and gather the virtual to physical
1249            # mapping.
1250            embedding = [ ]
1251            for s in starters:
1252                for k, v in s.node.items():
1253                    embedding.append({
1254                        'toponame': k, 
1255                        'physname': [ v],
1256                        'testbed': s.testbed
1257                        })
1258            log.info("[start_segment]: Experiment %s active" % eid)
1259
1260
1261        # Walk up tmpdir, deleting as we go
1262        if self.cleanup:
1263            self.remove_dirs(tmpdir)
1264        else:
1265            log.debug("[start_experiment]: not removing %s" % tmpdir)
1266
1267        # Insert the experiment into our state and update the disk copy.
1268        self.state_lock.acquire()
1269        self.state[expid]['experimentStatus'] = 'active'
1270        self.state[eid] = self.state[expid]
1271        self.state[eid]['experimentdescription']['topdldescription'] = \
1272                top.to_dict()
1273        self.state[eid]['embedding'] = embedding
1274        if self.state_filename: self.write_state()
1275        self.state_lock.release()
1276        return
1277
1278
1279    def add_kit(self, e, kit):
1280        """
1281        Add a Software object created from the list of (install, location)
1282        tuples passed as kit  to the software attribute of an object e.  We
1283        do this enough to break out the code, but it's kind of a hack to
1284        avoid changing the old tuple rep.
1285        """
1286
1287        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1288
1289        if isinstance(e.software, list): e.software.extend(s)
1290        else: e.software = s
1291
1292
1293    def create_experiment_state(self, fid, req, expid, expcert,
1294            state='starting'):
1295        """
1296        Create the initial entry in the experiment's state.  The expid and
1297        expcert are the experiment's fedid and certifacte that represents that
1298        ID, which are installed in the experiment state.  If the request
1299        includes a suggested local name that is used if possible.  If the local
1300        name is already taken by an experiment owned by this user that has
1301        failed, it is overwritten.  Otherwise new letters are added until a
1302        valid localname is found.  The generated local name is returned.
1303        """
1304
1305        if req.has_key('experimentID') and \
1306                req['experimentID'].has_key('localname'):
1307            overwrite = False
1308            eid = req['experimentID']['localname']
1309            # If there's an old failed experiment here with the same local name
1310            # and accessible by this user, we'll overwrite it, otherwise we'll
1311            # fall through and do the collision avoidance.
1312            old_expid = self.get_experiment_fedid(eid)
1313            if old_expid and self.check_experiment_access(fid, old_expid):
1314                self.state_lock.acquire()
1315                status = self.state[eid].get('experimentStatus', None)
1316                if status and status == 'failed':
1317                    # remove the old access attribute
1318                    self.auth.unset_attribute(fid, old_expid)
1319                    self.auth.save()
1320                    overwrite = True
1321                    del self.state[eid]
1322                    del self.state[old_expid]
1323                self.state_lock.release()
1324            self.state_lock.acquire()
1325            while (self.state.has_key(eid) and not overwrite):
1326                eid += random.choice(string.ascii_letters)
1327            # Initial state
1328            self.state[eid] = {
1329                    'experimentID' : \
1330                            [ { 'localname' : eid }, {'fedid': expid } ],
1331                    'experimentStatus': state,
1332                    'experimentAccess': { 'X509' : expcert },
1333                    'owner': fid,
1334                    'log' : [],
1335                }
1336            self.state[expid] = self.state[eid]
1337            if self.state_filename: self.write_state()
1338            self.state_lock.release()
1339        else:
1340            eid = self.exp_stem
1341            for i in range(0,5):
1342                eid += random.choice(string.ascii_letters)
1343            self.state_lock.acquire()
1344            while (self.state.has_key(eid)):
1345                eid = self.exp_stem
1346                for i in range(0,5):
1347                    eid += random.choice(string.ascii_letters)
1348            # Initial state
1349            self.state[eid] = {
1350                    'experimentID' : \
1351                            [ { 'localname' : eid }, {'fedid': expid } ],
1352                    'experimentStatus': state,
1353                    'experimentAccess': { 'X509' : expcert },
1354                    'owner': fid,
1355                    'log' : [],
1356                }
1357            self.state[expid] = self.state[eid]
1358            if self.state_filename: self.write_state()
1359            self.state_lock.release()
1360
1361        return eid
1362
1363
1364    def allocate_ips_to_topo(self, top):
1365        """
1366        Add an ip4_address attribute to all the hosts in the topology, based on
1367        the shared substrates on which they sit.  An /etc/hosts file is also
1368        created and returned as a list of hostfiles entries.  We also return
1369        the allocator, because we may need to allocate IPs to portals
1370        (specifically DRAGON portals).
1371        """
1372        subs = sorted(top.substrates, 
1373                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1374                reverse=True)
1375        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1376        ifs = { }
1377        hosts = [ ]
1378
1379        for idx, s in enumerate(subs):
1380            net_size = len(s.interfaces)+2
1381
1382            a = ips.allocate(net_size)
1383            if a :
1384                base, num = a
1385                if num < net_size: 
1386                    raise service_error(service_error.internal,
1387                            "Allocator returned wrong number of IPs??")
1388            else:
1389                raise service_error(service_error.req, 
1390                        "Cannot allocate IP addresses")
1391            mask = ips.min_alloc
1392            while mask < net_size:
1393                mask *= 2
1394
1395            netmask = ((2**32-1) ^ (mask-1))
1396
1397            base += 1
1398            for i in s.interfaces:
1399                i.attribute.append(
1400                        topdl.Attribute('ip4_address', 
1401                            "%s" % ip_addr(base)))
1402                i.attribute.append(
1403                        topdl.Attribute('ip4_netmask', 
1404                            "%s" % ip_addr(int(netmask))))
1405
1406                hname = i.element.name
1407                if ifs.has_key(hname):
1408                    hosts.append("%s\t%s-%s %s-%d" % \
1409                            (ip_addr(base), hname, s.name, hname,
1410                                ifs[hname]))
1411                else:
1412                    ifs[hname] = 0
1413                    hosts.append("%s\t%s-%s %s-%d %s" % \
1414                            (ip_addr(base), hname, s.name, hname,
1415                                ifs[hname], hname))
1416
1417                ifs[hname] += 1
1418                base += 1
1419        return hosts, ips
1420
1421    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1422            tbparams, masters, tbmap):
1423        """
1424        Request access to the various testbeds required for this instantiation
1425        (passed in as testbeds).  User, access_user, expoert_project and master
1426        are used to construct the correct requests.  Per-testbed parameters are
1427        returned in tbparams.
1428        """
1429        for tb in testbeds:
1430            self.get_access(tb, None, tbparams, access_user, masters, tbmap)
1431            allocated[tb] = 1
1432
1433    def split_topology(self, top, topo, testbeds):
1434        """
1435        Create the sub-topologies that are needed for experiment instantiation.
1436        """
1437        for tb in testbeds:
1438            topo[tb] = top.clone()
1439            # copy in for loop allows deletions from the original
1440            for e in [ e for e in topo[tb].elements]:
1441                etb = e.get_attribute('testbed')
1442                # NB: elements without a testbed attribute won't appear in any
1443                # sub topologies. 
1444                if not etb or etb != tb:
1445                    for i in e.interface:
1446                        for s in i.subs:
1447                            try:
1448                                s.interfaces.remove(i)
1449                            except ValueError:
1450                                raise service_error(service_error.internal,
1451                                        "Can't remove interface??")
1452                    topo[tb].elements.remove(e)
1453            topo[tb].make_indices()
1454
1455    def wrangle_software(self, expid, top, topo, tbparams):
1456        """
1457        Copy software out to the repository directory, allocate permissions and
1458        rewrite the segment topologies to look for the software in local
1459        places.
1460        """
1461
1462        # Copy the rpms and tarfiles to a distribution directory from
1463        # which the federants can retrieve them
1464        linkpath = "%s/software" %  expid
1465        softdir ="%s/%s" % ( self.repodir, linkpath)
1466        softmap = { }
1467        # These are in a list of tuples format (each kit).  This comprehension
1468        # unwraps them into a single list of tuples that initilaizes the set of
1469        # tuples.
1470        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1471                for p, t in l ])
1472        pkgs.update([x.location for e in top.elements \
1473                for x in e.software])
1474        try:
1475            os.makedirs(softdir)
1476        except EnvironmentError, e:
1477            raise service_error(
1478                    "Cannot create software directory: %s" % e)
1479        # The actual copying.  Everything's converted into a url for copying.
1480        for pkg in pkgs:
1481            loc = pkg
1482
1483            scheme, host, path = urlparse(loc)[0:3]
1484            dest = os.path.basename(path)
1485            if not scheme:
1486                if not loc.startswith('/'):
1487                    loc = "/%s" % loc
1488                loc = "file://%s" %loc
1489            try:
1490                u = urlopen(loc)
1491            except Exception, e:
1492                raise service_error(service_error.req, 
1493                        "Cannot open %s: %s" % (loc, e))
1494            try:
1495                f = open("%s/%s" % (softdir, dest) , "w")
1496                self.log.debug("Writing %s/%s" % (softdir,dest) )
1497                data = u.read(4096)
1498                while data:
1499                    f.write(data)
1500                    data = u.read(4096)
1501                f.close()
1502                u.close()
1503            except Exception, e:
1504                raise service_error(service_error.internal,
1505                        "Could not copy %s: %s" % (loc, e))
1506            path = re.sub("/tmp", "", linkpath)
1507            # XXX
1508            softmap[pkg] = \
1509                    "%s/%s/%s" %\
1510                    ( self.repo_url, path, dest)
1511
1512            # Allow the individual segments to access the software.
1513            for tb in tbparams.keys():
1514                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1515                        "/%s/%s" % ( path, dest))
1516            self.auth.save()
1517
1518        # Convert the software locations in the segments into the local
1519        # copies on this host
1520        for soft in [ s for tb in topo.values() \
1521                for e in tb.elements \
1522                    if getattr(e, 'software', False) \
1523                        for s in e.software ]:
1524            if softmap.has_key(soft.location):
1525                soft.location = softmap[soft.location]
1526
1527
1528    def new_experiment(self, req, fid):
1529        """
1530        The external interface to empty initial experiment creation called from
1531        the dispatcher.
1532
1533        Creates a working directory, splits the incoming description using the
1534        splitter script and parses out the avrious subsections using the
1535        lcasses above.  Once each sub-experiment is created, use pooled threads
1536        to instantiate them and start it all up.
1537        """
1538        req = req.get('NewRequestBody', None)
1539        if not req:
1540            raise service_error(service_error.req,
1541                    "Bad request format (no NewRequestBody)")
1542
1543        if self.auth.import_credentials(data_list=req.get('credential', [])):
1544            self.auth.save()
1545       
1546        if not self.auth.check_attribute(fid, 'new'):
1547            raise service_error(service_error.access, "New access denied")
1548
1549        try:
1550            tmpdir = tempfile.mkdtemp(prefix="split-")
1551        except EnvironmentError:
1552            raise service_error(service_error.internal, "Cannot create tmp dir")
1553
1554        try:
1555            access_user = self.accessdb[fid]
1556        except KeyError:
1557            raise service_error(service_error.internal,
1558                    "Access map and authorizer out of sync in " + \
1559                            "new_experiment for fedid %s"  % fid)
1560
1561        pid = "dummy"
1562        gid = "dummy"
1563
1564        # Generate an ID for the experiment (slice) and a certificate that the
1565        # allocator can use to prove they own it.  We'll ship it back through
1566        # the encrypted connection.  If the requester supplied one, use it.
1567        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1568            expcert = req['experimentAccess']['X509']
1569            tf = tempfile.NamedTemporaryFile()
1570            tf.write(expcert)
1571            tf.flush()
1572            expid = fedid(file=tf.name)
1573            tf.close()
1574            self.state_lock.acquire()
1575            if expid in self.state:
1576                self.state_lock.release()
1577                raise service_error(service_error.req, 
1578                        'fedid %s identifies an existing experiment' % expid)
1579            self.state_lock.release()
1580        else:
1581            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1582
1583        #now we're done with the tmpdir, and it should be empty
1584        if self.cleanup:
1585            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1586            os.rmdir(tmpdir)
1587        else:
1588            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1589
1590        eid = self.create_experiment_state(fid, req, expid, expcert, 
1591                state='empty')
1592
1593        # Let users touch the state
1594        self.auth.set_attribute(fid, expid)
1595        self.auth.set_attribute(expid, expid)
1596        # Override fedids can manipulate state as well
1597        for o in self.overrides:
1598            self.auth.set_attribute(o, expid)
1599        self.auth.save()
1600
1601        rv = {
1602                'experimentID': [
1603                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1604                ],
1605                'experimentStatus': 'empty',
1606                'experimentAccess': { 'X509' : expcert }
1607            }
1608
1609        return rv
1610
1611    def create_experiment(self, req, fid):
1612        """
1613        The external interface to experiment creation called from the
1614        dispatcher.
1615
1616        Creates a working directory, splits the incoming description using the
1617        splitter script and parses out the various subsections using the
1618        classes above.  Once each sub-experiment is created, use pooled threads
1619        to instantiate them and start it all up.
1620        """
1621
1622        req = req.get('CreateRequestBody', None)
1623        if not req:
1624            raise service_error(service_error.req,
1625                    "Bad request format (no CreateRequestBody)")
1626
1627        # Get the experiment access
1628        exp = req.get('experimentID', None)
1629        if exp:
1630            if exp.has_key('fedid'):
1631                key = exp['fedid']
1632                expid = key
1633                eid = None
1634            elif exp.has_key('localname'):
1635                key = exp['localname']
1636                eid = key
1637                expid = None
1638            else:
1639                raise service_error(service_error.req, "Unknown lookup type")
1640        else:
1641            raise service_error(service_error.req, "No request?")
1642
1643        self.check_experiment_access(fid, key)
1644
1645        # Install the testbed map entries supplied with the request into a copy
1646        # of the testbed map.
1647        tbmap = dict(self.tbmap)
1648        for m in req.get('testbedmap', []):
1649            if 'testbed' in m and 'uri' in m:
1650                tbmap[m['testbed']] = m['uri']
1651
1652        try:
1653            tmpdir = tempfile.mkdtemp(prefix="split-")
1654            os.mkdir(tmpdir+"/keys")
1655        except EnvironmentError:
1656            raise service_error(service_error.internal, "Cannot create tmp dir")
1657
1658        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1659        gw_secretkey_base = "fed.%s" % self.ssh_type
1660        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1661        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1662        tclfile = tmpdir + "/experiment.tcl"
1663        tbparams = { }
1664        try:
1665            access_user = self.accessdb[fid]
1666        except KeyError:
1667            raise service_error(service_error.internal,
1668                    "Access map and authorizer out of sync in " + \
1669                            "create_experiment for fedid %s"  % fid)
1670
1671        pid = "dummy"
1672        gid = "dummy"
1673
1674        # The tcl parser needs to read a file so put the content into that file
1675        descr=req.get('experimentdescription', None)
1676        if descr:
1677            file_content=descr.get('ns2description', None)
1678            if file_content:
1679                try:
1680                    f = open(tclfile, 'w')
1681                    f.write(file_content)
1682                    f.close()
1683                except EnvironmentError:
1684                    raise service_error(service_error.internal,
1685                            "Cannot write temp experiment description")
1686            else:
1687                raise service_error(service_error.req, 
1688                        "Only ns2descriptions supported")
1689        else:
1690            raise service_error(service_error.req, "No experiment description")
1691
1692        self.state_lock.acquire()
1693        if self.state.has_key(key):
1694            self.state[key]['experimentStatus'] = "starting"
1695            for e in self.state[key].get('experimentID',[]):
1696                if not expid and e.has_key('fedid'):
1697                    expid = e['fedid']
1698                elif not eid and e.has_key('localname'):
1699                    eid = e['localname']
1700        self.state_lock.release()
1701
1702        if not (eid and expid):
1703            raise service_error(service_error.internal, 
1704                    "Cannot find local experiment info!?")
1705
1706        try: 
1707            # This catches exceptions to clear the placeholder if necessary
1708            try:
1709                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1710            except ValueError:
1711                raise service_error(service_error.server_config, 
1712                        "Bad key type (%s)" % self.ssh_type)
1713
1714            # Copy the service request
1715            tb_services = [ s for s in req.get('service',[]) ]
1716            # Translate to topdl
1717            if self.splitter_url:
1718                self.log.debug("Calling remote topdl translator at %s" % \
1719                        self.splitter_url)
1720                top = self.remote_ns2topdl(self.splitter_url, file_content)
1721            else:
1722                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1723                    str(self.muxmax), '-m', 'dummy']
1724
1725                tclcmd.extend([pid, gid, eid, tclfile])
1726
1727                self.log.debug("running local splitter %s", " ".join(tclcmd))
1728                # This is just fantastic.  As a side effect the parser copies
1729                # tb_compat.tcl into the current directory, so that directory
1730                # must be writable by the fedd user.  Doing this in the
1731                # temporary subdir ensures this is the case.
1732                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1733                        cwd=tmpdir)
1734                split_data = tclparser.stdout
1735
1736                top = topdl.topology_from_xml(file=split_data, top="experiment")
1737
1738            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1739            # Find the testbeds to look up
1740            testbeds = set([ a.value for e in top.elements \
1741                    for a in e.attribute \
1742                        if a.attribute == 'testbed'])
1743
1744            tb_hosts = { }
1745            for tb in testbeds:
1746                tb_hosts[tb] = [ e.name for e in top.elements \
1747                        if isinstance(e, topdl.Computer) and \
1748                            e.get_attribute('testbed') and \
1749                            e.get_attribute('testbed') == tb]
1750
1751            masters = { }           # testbeds exporting services
1752            pmasters = { }          # Testbeds exporting services that
1753                                    # need portals
1754            for s in tb_services:
1755                # If this is a service request with the importall field
1756                # set, fill it out.
1757
1758                if s.get('importall', False):
1759                    s['import'] = [ tb for tb in testbeds \
1760                            if tb not in s.get('export',[])]
1761                    del s['importall']
1762
1763                # Add the service to masters
1764                for tb in s.get('export', []):
1765                    if s.get('name', None):
1766                        if tb not in masters:
1767                            masters[tb] = [ ]
1768
1769                        params = { }
1770                        if 'fedAttr' in s:
1771                            for a in s['fedAttr']:
1772                                params[a.get('attribute', '')] = \
1773                                        a.get('value','')
1774
1775                        fser = federated_service(name=s['name'],
1776                                exporter=tb, importers=s.get('import',[]),
1777                                params=params)
1778                        if fser.name == 'hide_hosts' \
1779                                and 'hosts' not in fser.params:
1780                            fser.params['hosts'] = \
1781                                    ",".join(tb_hosts.get(fser.exporter, []))
1782                        masters[tb].append(fser)
1783
1784                        if fser.portal:
1785                            if tb not in pmasters: pmasters[tb] = [ fser ]
1786                            else: pmasters[tb].append(fser)
1787                    else:
1788                        self.log.error('Testbed service does not have name " + \
1789                                "and importers')
1790
1791
1792            allocated = { }         # Testbeds we can access
1793            topo ={ }               # Sub topologies
1794            connInfo = { }          # Connection information
1795
1796            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1797                    tbparams, masters, tbmap)
1798
1799            self.split_topology(top, topo, testbeds)
1800
1801            # Copy configuration files into the remote file store
1802            # The config urlpath
1803            configpath = "/%s/config" % expid
1804            # The config file system location
1805            configdir ="%s%s" % ( self.repodir, configpath)
1806            try:
1807                os.makedirs(configdir)
1808            except EnvironmentError, e:
1809                raise service_error(service_error.internal,
1810                        "Cannot create config directory: %s" % e)
1811            try:
1812                f = open("%s/hosts" % configdir, "w")
1813                f.write('\n'.join(hosts))
1814                f.close()
1815            except EnvironmentError, e:
1816                raise service_error(service_error.internal, 
1817                        "Cannot write hosts file: %s" % e)
1818            try:
1819                copy_file("%s" % gw_pubkey, "%s/%s" % \
1820                        (configdir, gw_pubkey_base))
1821                copy_file("%s" % gw_secretkey, "%s/%s" % \
1822                        (configdir, gw_secretkey_base))
1823            except EnvironmentError, e:
1824                raise service_error(service_error.internal, 
1825                        "Cannot copy keyfiles: %s" % e)
1826
1827            # Allow the individual testbeds to access the configuration files.
1828            for tb in tbparams.keys():
1829                asignee = tbparams[tb]['allocID']['fedid']
1830                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1831                    self.auth.set_attribute(asignee, "%s/%s" % \
1832                            (configpath, f))
1833
1834            part = experiment_partition(self.auth, self.store_url, tbmap,
1835                    self.muxmax, self.direct_transit)
1836            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1837                    connInfo, expid)
1838            # Now get access to the dynamic testbeds (those added above)
1839            for tb in [ t for t in topo if t not in allocated]:
1840                self.get_access(tb, None, tbparams, access_user, masters)
1841                allocated[tb] = 1
1842                store_keys = topo[tb].get_attribute('store_keys')
1843                # Give the testbed access to keys it exports or imports
1844                if store_keys:
1845                    for sk in store_keys.split(" "):
1846                        self.auth.set_attribute(\
1847                                tbparams[tb]['allocID']['fedid'], sk)
1848            self.auth.save()
1849
1850            self.wrangle_software(expid, top, topo, tbparams)
1851
1852            vtopo = topdl.topology_to_vtopo(top)
1853            vis = self.genviz(vtopo)
1854
1855            # save federant information
1856            for k in allocated.keys():
1857                tbparams[k]['federant'] = {
1858                        'name': [ { 'localname' : eid} ],
1859                        'allocID' : tbparams[k]['allocID'],
1860                        'uri': tbparams[k]['uri'],
1861                    }
1862
1863            self.state_lock.acquire()
1864            self.state[eid]['vtopo'] = vtopo
1865            self.state[eid]['vis'] = vis
1866            self.state[eid]['experimentdescription'] = \
1867                    { 'topdldescription': top.to_dict() }
1868            self.state[eid]['federant'] = \
1869                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1870                        if tbparams[tb].has_key('federant') ]
1871            if self.state_filename: 
1872                self.write_state()
1873            self.state_lock.release()
1874        except service_error, e:
1875            # If something goes wrong in the parse (usually an access error)
1876            # clear the placeholder state.  From here on out the code delays
1877            # exceptions.  Failing at this point returns a fault to the remote
1878            # caller.
1879
1880            self.state_lock.acquire()
1881            del self.state[eid]
1882            del self.state[expid]
1883            if self.state_filename: self.write_state()
1884            self.state_lock.release()
1885            raise e
1886
1887
1888        # Start the background swapper and return the starting state.  From
1889        # here on out, the state will stick around a while.
1890
1891        # Let users touch the state
1892        self.auth.set_attribute(fid, expid)
1893        self.auth.set_attribute(expid, expid)
1894        # Override fedids can manipulate state as well
1895        for o in self.overrides:
1896            self.auth.set_attribute(o, expid)
1897        self.auth.save()
1898
1899        # Create a logger that logs to the experiment's state object as well as
1900        # to the main log file.
1901        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1902        alloc_collector = self.list_log(self.state[eid]['log'])
1903        h = logging.StreamHandler(alloc_collector)
1904        # XXX: there should be a global one of these rather than repeating the
1905        # code.
1906        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1907                    '%d %b %y %H:%M:%S'))
1908        alloc_log.addHandler(h)
1909       
1910        attrs = [ 
1911                {
1912                    'attribute': 'ssh_pubkey', 
1913                    'value': '%s/%s/config/%s' % \
1914                            (self.repo_url, expid, gw_pubkey_base)
1915                },
1916                {
1917                    'attribute': 'ssh_secretkey', 
1918                    'value': '%s/%s/config/%s' % \
1919                            (self.repo_url, expid, gw_secretkey_base)
1920                },
1921                {
1922                    'attribute': 'hosts', 
1923                    'value': '%s/%s/config/hosts' % \
1924                            (self.repo_url, expid)
1925                },
1926            ]
1927
1928        # transit and disconnected testbeds may not have a connInfo entry.
1929        # Fill in the blanks.
1930        for t in allocated.keys():
1931            if not connInfo.has_key(t):
1932                connInfo[t] = { }
1933
1934        # Start a thread to do the resource allocation
1935        t  = Thread(target=self.allocate_resources,
1936                args=(allocated, masters, eid, expid, tbparams, 
1937                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1938                    connInfo, tbmap),
1939                name=eid)
1940        t.start()
1941
1942        rv = {
1943                'experimentID': [
1944                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1945                ],
1946                'experimentStatus': 'starting',
1947            }
1948
1949        return rv
1950   
1951    def get_experiment_fedid(self, key):
1952        """
1953        find the fedid associated with the localname key in the state database.
1954        """
1955
1956        rv = None
1957        self.state_lock.acquire()
1958        if self.state.has_key(key):
1959            if isinstance(self.state[key], dict):
1960                try:
1961                    kl = [ f['fedid'] for f in \
1962                            self.state[key]['experimentID']\
1963                                if f.has_key('fedid') ]
1964                except KeyError:
1965                    self.state_lock.release()
1966                    raise service_error(service_error.internal, 
1967                            "No fedid for experiment %s when getting "+\
1968                                    "fedid(!?)" % key)
1969                if len(kl) == 1:
1970                    rv = kl[0]
1971                else:
1972                    self.state_lock.release()
1973                    raise service_error(service_error.internal, 
1974                            "multiple fedids for experiment %s when " +\
1975                                    "getting fedid(!?)" % key)
1976            else:
1977                self.state_lock.release()
1978                raise service_error(service_error.internal, 
1979                        "Unexpected state for %s" % key)
1980        self.state_lock.release()
1981        return rv
1982
1983    def check_experiment_access(self, fid, key):
1984        """
1985        Confirm that the fid has access to the experiment.  Though a request
1986        may be made in terms of a local name, the access attribute is always
1987        the experiment's fedid.
1988        """
1989        if not isinstance(key, fedid):
1990            key = self.get_experiment_fedid(key)
1991
1992        if self.auth.check_attribute(fid, key):
1993            return True
1994        else:
1995            raise service_error(service_error.access, "Access Denied")
1996
1997
1998    def get_handler(self, path, fid):
1999        self.log.info("Get handler %s %s" % (path, fid))
2000        if self.auth.check_attribute(fid, path):
2001            return ("%s/%s" % (self.repodir, path), "application/binary")
2002        else:
2003            return (None, None)
2004
2005    def get_vtopo(self, req, fid):
2006        """
2007        Return the stored virtual topology for this experiment
2008        """
2009        rv = None
2010        state = None
2011
2012        req = req.get('VtopoRequestBody', None)
2013        if not req:
2014            raise service_error(service_error.req,
2015                    "Bad request format (no VtopoRequestBody)")
2016        exp = req.get('experiment', None)
2017        if exp:
2018            if exp.has_key('fedid'):
2019                key = exp['fedid']
2020                keytype = "fedid"
2021            elif exp.has_key('localname'):
2022                key = exp['localname']
2023                keytype = "localname"
2024            else:
2025                raise service_error(service_error.req, "Unknown lookup type")
2026        else:
2027            raise service_error(service_error.req, "No request?")
2028
2029        self.check_experiment_access(fid, key)
2030
2031        self.state_lock.acquire()
2032        if self.state.has_key(key):
2033            if self.state[key].has_key('vtopo'):
2034                rv = { 'experiment' : {keytype: key },\
2035                        'vtopo': self.state[key]['vtopo'],\
2036                    }
2037            else:
2038                state = self.state[key]['experimentStatus']
2039        self.state_lock.release()
2040
2041        if rv: return rv
2042        else: 
2043            if state:
2044                raise service_error(service_error.partial, 
2045                        "Not ready: %s" % state)
2046            else:
2047                raise service_error(service_error.req, "No such experiment")
2048
2049    def get_vis(self, req, fid):
2050        """
2051        Return the stored visualization for this experiment
2052        """
2053        rv = None
2054        state = None
2055
2056        req = req.get('VisRequestBody', None)
2057        if not req:
2058            raise service_error(service_error.req,
2059                    "Bad request format (no VisRequestBody)")
2060        exp = req.get('experiment', None)
2061        if exp:
2062            if exp.has_key('fedid'):
2063                key = exp['fedid']
2064                keytype = "fedid"
2065            elif exp.has_key('localname'):
2066                key = exp['localname']
2067                keytype = "localname"
2068            else:
2069                raise service_error(service_error.req, "Unknown lookup type")
2070        else:
2071            raise service_error(service_error.req, "No request?")
2072
2073        self.check_experiment_access(fid, key)
2074
2075        self.state_lock.acquire()
2076        if self.state.has_key(key):
2077            if self.state[key].has_key('vis'):
2078                rv =  { 'experiment' : {keytype: key },\
2079                        'vis': self.state[key]['vis'],\
2080                        }
2081            else:
2082                state = self.state[key]['experimentStatus']
2083        self.state_lock.release()
2084
2085        if rv: return rv
2086        else:
2087            if state:
2088                raise service_error(service_error.partial, 
2089                        "Not ready: %s" % state)
2090            else:
2091                raise service_error(service_error.req, "No such experiment")
2092
2093    def clean_info_response(self, rv):
2094        """
2095        Remove the information in the experiment's state object that is not in
2096        the info response.
2097        """
2098        # Remove the owner info (should always be there, but...)
2099        if rv.has_key('owner'): del rv['owner']
2100
2101        # Convert the log into the allocationLog parameter and remove the
2102        # log entry (with defensive programming)
2103        if rv.has_key('log'):
2104            rv['allocationLog'] = "".join(rv['log'])
2105            del rv['log']
2106        else:
2107            rv['allocationLog'] = ""
2108
2109        if rv['experimentStatus'] != 'active':
2110            if rv.has_key('federant'): del rv['federant']
2111        else:
2112            # remove the allocationID and uri info from each federant
2113            for f in rv.get('federant', []):
2114                if f.has_key('allocID'): del f['allocID']
2115                if f.has_key('uri'): del f['uri']
2116
2117        return rv
2118
2119    def get_info(self, req, fid):
2120        """
2121        Return all the stored info about this experiment
2122        """
2123        rv = None
2124
2125        req = req.get('InfoRequestBody', None)
2126        if not req:
2127            raise service_error(service_error.req,
2128                    "Bad request format (no InfoRequestBody)")
2129        exp = req.get('experiment', None)
2130        if exp:
2131            if exp.has_key('fedid'):
2132                key = exp['fedid']
2133                keytype = "fedid"
2134            elif exp.has_key('localname'):
2135                key = exp['localname']
2136                keytype = "localname"
2137            else:
2138                raise service_error(service_error.req, "Unknown lookup type")
2139        else:
2140            raise service_error(service_error.req, "No request?")
2141
2142        self.check_experiment_access(fid, key)
2143
2144        # The state may be massaged by the service function that called
2145        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2146        # state.
2147        self.state_lock.acquire()
2148        if self.state.has_key(key):
2149            rv = copy.deepcopy(self.state[key])
2150        self.state_lock.release()
2151
2152        if rv:
2153            return self.clean_info_response(rv)
2154        else:
2155            raise service_error(service_error.req, "No such experiment")
2156
2157    def get_multi_info(self, req, fid):
2158        """
2159        Return all the stored info that this fedid can access
2160        """
2161        rv = { 'info': [ ] }
2162
2163        self.state_lock.acquire()
2164        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2165            try:
2166                self.check_experiment_access(fid, key)
2167            except service_error, e:
2168                if e.code == service_error.access:
2169                    continue
2170                else:
2171                    self.state_lock.release()
2172                    raise e
2173
2174            if self.state.has_key(key):
2175                e = copy.deepcopy(self.state[key])
2176                e = self.clean_info_response(e)
2177                rv['info'].append(e)
2178        self.state_lock.release()
2179        return rv
2180
2181    def remove_dirs(self, dir):
2182        """
2183        Remove the directory tree and all files rooted at dir.  Log any errors,
2184        but continue.
2185        """
2186        self.log.debug("[removedirs]: removing %s" % dir)
2187        try:
2188            for path, dirs, files in os.walk(dir, topdown=False):
2189                for f in files:
2190                    os.remove(os.path.join(path, f))
2191                for d in dirs:
2192                    os.rmdir(os.path.join(path, d))
2193            os.rmdir(dir)
2194        except EnvironmentError, e:
2195            self.log.error("Error deleting directory tree in %s" % e);
2196
2197    def terminate_experiment(self, req, fid):
2198        """
2199        Swap this experiment out on the federants and delete the shared
2200        information
2201        """
2202        tbparams = { }
2203        req = req.get('TerminateRequestBody', None)
2204        if not req:
2205            raise service_error(service_error.req,
2206                    "Bad request format (no TerminateRequestBody)")
2207        force = req.get('force', False)
2208        exp = req.get('experiment', None)
2209        if exp:
2210            if exp.has_key('fedid'):
2211                key = exp['fedid']
2212                keytype = "fedid"
2213            elif exp.has_key('localname'):
2214                key = exp['localname']
2215                keytype = "localname"
2216            else:
2217                raise service_error(service_error.req, "Unknown lookup type")
2218        else:
2219            raise service_error(service_error.req, "No request?")
2220
2221        self.check_experiment_access(fid, key)
2222
2223        dealloc_list = [ ]
2224
2225
2226        # Create a logger that logs to the dealloc_list as well as to the main
2227        # log file.
2228        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2229        h = logging.StreamHandler(self.list_log(dealloc_list))
2230        # XXX: there should be a global one of these rather than repeating the
2231        # code.
2232        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2233                    '%d %b %y %H:%M:%S'))
2234        dealloc_log.addHandler(h)
2235
2236        self.state_lock.acquire()
2237        fed_exp = self.state.get(key, None)
2238        repo = None
2239
2240        if fed_exp:
2241            # This branch of the conditional holds the lock to generate a
2242            # consistent temporary tbparams variable to deallocate experiments.
2243            # It releases the lock to do the deallocations and reacquires it to
2244            # remove the experiment state when the termination is complete.
2245
2246            # First make sure that the experiment creation is complete.
2247            status = fed_exp.get('experimentStatus', None)
2248
2249            if status:
2250                if status in ('starting', 'terminating'):
2251                    if not force:
2252                        self.state_lock.release()
2253                        raise service_error(service_error.partial, 
2254                                'Experiment still being created or destroyed')
2255                    else:
2256                        self.log.warning('Experiment in %s state ' % status + \
2257                                'being terminated by force.')
2258            else:
2259                # No status??? trouble
2260                self.state_lock.release()
2261                raise service_error(service_error.internal,
2262                        "Experiment has no status!?")
2263
2264            ids = []
2265            #  experimentID is a list of dicts that are self-describing
2266            #  identifiers.  This finds all the fedids and localnames - the
2267            #  keys of self.state - and puts them into ids.
2268            for id in fed_exp.get('experimentID', []):
2269                if id.has_key('fedid'): 
2270                    ids.append(id['fedid'])
2271                    repo = "%s" % id['fedid']
2272                if id.has_key('localname'): ids.append(id['localname'])
2273
2274            # Collect the allocation/segment ids into a dict keyed by the fedid
2275            # of the allocation (or a monotonically increasing integer) that
2276            # contains a tuple of uri, aid (which is a dict...)
2277            for i, fed in enumerate(fed_exp.get('federant', [])):
2278                try:
2279                    uri = fed['uri']
2280                    aid = fed['allocID']
2281                    k = fed['allocID'].get('fedid', i)
2282                except KeyError, e:
2283                    continue
2284                tbparams[k] = (uri, aid)
2285            fed_exp['experimentStatus'] = 'terminating'
2286            if self.state_filename: self.write_state()
2287            self.state_lock.release()
2288
2289            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2290            # then completes, so we can't wait if nothing starts.  So, no
2291            # tbparams, no start.
2292            if len(tbparams) > 0:
2293                thread_pool = self.thread_pool(self.nthreads)
2294                for k in tbparams.keys():
2295                    # Create and start a thread to stop the segment
2296                    thread_pool.wait_for_slot()
2297                    uri, aid = tbparams[k]
2298                    t  = self.pooled_thread(\
2299                            target=self.terminate_segment(log=dealloc_log,
2300                                testbed=uri,
2301                                cert_file=self.cert_file, 
2302                                cert_pwd=self.cert_pwd,
2303                                trusted_certs=self.trusted_certs,
2304                                caller=self.call_TerminateSegment),
2305                            args=(uri, aid), name=k,
2306                            pdata=thread_pool, trace_file=self.trace_file)
2307                    t.start()
2308                # Wait for completions
2309                thread_pool.wait_for_all_done()
2310
2311            # release the allocations (failed experiments have done this
2312            # already, and starting experiments may be in odd states, so we
2313            # ignore errors releasing those allocations
2314            try: 
2315                for k in tbparams.keys():
2316                    # This releases access by uri
2317                    uri, aid = tbparams[k]
2318                    self.release_access(None, aid, uri=uri)
2319            except service_error, e:
2320                if status != 'failed' and not force:
2321                    raise e
2322
2323            # Remove the terminated experiment
2324            self.state_lock.acquire()
2325            for id in ids:
2326                if self.state.has_key(id): del self.state[id]
2327
2328            if self.state_filename: self.write_state()
2329            self.state_lock.release()
2330
2331            # Delete any synch points associated with this experiment.  All
2332            # synch points begin with the fedid of the experiment.
2333            fedid_keys = set(["fedid:%s" % f for f in ids \
2334                    if isinstance(f, fedid)])
2335            for k in self.synch_store.all_keys():
2336                try:
2337                    if len(k) > 45 and k[0:46] in fedid_keys:
2338                        self.synch_store.del_value(k)
2339                except synch_store.BadDeletionError:
2340                    pass
2341            self.write_store()
2342
2343            # Remove software and other cached stuff from the filesystem.
2344            if repo:
2345                self.remove_dirs("%s/%s" % (self.repodir, repo))
2346               
2347            return { 
2348                    'experiment': exp , 
2349                    'deallocationLog': "".join(dealloc_list),
2350                    }
2351        else:
2352            # Don't forget to release the lock
2353            self.state_lock.release()
2354            raise service_error(service_error.req, "No saved state")
2355
2356
2357    def GetValue(self, req, fid):
2358        """
2359        Get a value from the synchronized store
2360        """
2361        req = req.get('GetValueRequestBody', None)
2362        if not req:
2363            raise service_error(service_error.req,
2364                    "Bad request format (no GetValueRequestBody)")
2365       
2366        name = req['name']
2367        wait = req['wait']
2368        rv = { 'name': name }
2369
2370        if self.auth.check_attribute(fid, name):
2371            self.log.debug("[GetValue] asking for %s " % name)
2372            try:
2373                v = self.synch_store.get_value(name, wait)
2374            except synch_store.RevokedKeyError:
2375                # No more synch on this key
2376                raise service_error(service_error.federant, 
2377                        "Synch key %s revoked" % name)
2378            if v is not None:
2379                rv['value'] = v
2380            self.log.debug("[GetValue] got %s from %s" % (v, name))
2381            return rv
2382        else:
2383            raise service_error(service_error.access, "Access Denied")
2384       
2385
2386    def SetValue(self, req, fid):
2387        """
2388        Set a value in the synchronized store
2389        """
2390        req = req.get('SetValueRequestBody', None)
2391        if not req:
2392            raise service_error(service_error.req,
2393                    "Bad request format (no SetValueRequestBody)")
2394       
2395        name = req['name']
2396        v = req['value']
2397
2398        if self.auth.check_attribute(fid, name):
2399            try:
2400                self.synch_store.set_value(name, v)
2401                self.write_store()
2402                self.log.debug("[SetValue] set %s to %s" % (name, v))
2403            except synch_store.CollisionError:
2404                # Translate into a service_error
2405                raise service_error(service_error.req,
2406                        "Value already set: %s" %name)
2407            except synch_store.RevokedKeyError:
2408                # No more synch on this key
2409                raise service_error(service_error.federant, 
2410                        "Synch key %s revoked" % name)
2411            return { 'name': name, 'value': v }
2412        else:
2413            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.