source: fedd/federation/experiment_control.py @ 962ea25

axis_examplecompt_changesinfo-ops
Last change on this file since 962ea25 was 962ea25, checked in by Ted Faber <faber@…>, 14 years ago

Move that gross fedid generation code into the fedid class

  • Property mode set to 100644
File size: 83.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32from authorizer import abac_authorizer
33
34import topdl
35import list_log
36from ip_allocator import ip_allocator
37from ip_addr import ip_addr
38
39
40class nullHandler(logging.Handler):
41    def emit(self, record): pass
42
43fl = logging.getLogger("fedd.experiment_control")
44fl.addHandler(nullHandler())
45
46
47# Right now, no support for composition.
48class federated_service:
49    def __init__(self, name, exporter=None, importers=None, params=None, 
50            reqs=None, portal=None):
51        self.name=name
52        self.exporter=exporter
53        if importers is None: self.importers = []
54        else: self.importers=importers
55        if params is None: self.params = { }
56        else: self.params = params
57        if reqs is None: self.reqs = []
58        else: self.reqs = reqs
59
60        if portal is not None:
61            self.portal = portal
62        else:
63            self.portal = (name in federated_service.needs_portal)
64
65    def __str__(self):
66        return "name %s export %s import %s params %s reqs %s" % \
67                (self.name, self.exporter, self.importers, self.params,
68                        [ (r['name'], r['visibility']) for r in self.reqs] )
69
70    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
71
72class experiment_control_local:
73    """
74    Control of experiments that this system can directly access.
75
76    Includes experiment creation, termination and information dissemination.
77    Thred safe.
78    """
79
80    class ssh_cmd_timeout(RuntimeError): pass
81   
82    class thread_pool:
83        """
84        A class to keep track of a set of threads all invoked for the same
85        task.  Manages the mutual exclusion of the states.
86        """
87        def __init__(self, nthreads):
88            """
89            Start a pool.
90            """
91            self.changed = Condition()
92            self.started = 0
93            self.terminated = 0
94            self.nthreads = nthreads
95
96        def acquire(self):
97            """
98            Get the pool's lock.
99            """
100            self.changed.acquire()
101
102        def release(self):
103            """
104            Release the pool's lock.
105            """
106            self.changed.release()
107
108        def wait(self, timeout = None):
109            """
110            Wait for a pool thread to start or stop.
111            """
112            self.changed.wait(timeout)
113
114        def start(self):
115            """
116            Called by a pool thread to report starting.
117            """
118            self.changed.acquire()
119            self.started += 1
120            self.changed.notifyAll()
121            self.changed.release()
122
123        def terminate(self):
124            """
125            Called by a pool thread to report finishing.
126            """
127            self.changed.acquire()
128            self.terminated += 1
129            self.changed.notifyAll()
130            self.changed.release()
131
132        def clear(self):
133            """
134            Clear all pool data.
135            """
136            self.changed.acquire()
137            self.started = 0
138            self.terminated =0
139            self.changed.notifyAll()
140            self.changed.release()
141
142        def wait_for_slot(self):
143            """
144            Wait until we have a free slot to start another pooled thread
145            """
146            self.acquire()
147            while self.started - self.terminated >= self.nthreads:
148                self.wait()
149            self.release()
150
151        def wait_for_all_done(self, timeout=None):
152            """
153            Wait until all active threads finish (and at least one has
154            started).  If a timeout is given, return after waiting that long
155            for termination.  If all threads are done (and one has started in
156            the since the last clear()) return True, otherwise False.
157            """
158            if timeout:
159                deadline = time.time() + timeout
160            self.acquire()
161            while self.started == 0 or self.started > self.terminated:
162                self.wait(timeout)
163                if timeout:
164                    if time.time() > deadline:
165                        break
166                    timeout = deadline - time.time()
167            self.release()
168            return not (self.started == 0 or self.started > self.terminated)
169
170    class pooled_thread(Thread):
171        """
172        One of a set of threads dedicated to a specific task.  Uses the
173        thread_pool class above for coordination.
174        """
175        def __init__(self, group=None, target=None, name=None, args=(), 
176                kwargs={}, pdata=None, trace_file=None):
177            Thread.__init__(self, group, target, name, args, kwargs)
178            self.rv = None          # Return value of the ops in this thread
179            self.exception = None   # Exception that terminated this thread
180            self.target=target      # Target function to run on start()
181            self.args = args        # Args to pass to target
182            self.kwargs = kwargs    # Additional kw args
183            self.pdata = pdata      # thread_pool for this class
184            # Logger for this thread
185            self.log = logging.getLogger("fedd.experiment_control")
186       
187        def run(self):
188            """
189            Emulate Thread.run, except add pool data manipulation and error
190            logging.
191            """
192            if self.pdata:
193                self.pdata.start()
194
195            if self.target:
196                try:
197                    self.rv = self.target(*self.args, **self.kwargs)
198                except service_error, s:
199                    self.exception = s
200                    self.log.error("Thread exception: %s %s" % \
201                            (s.code_string(), s.desc))
202                except:
203                    self.exception = sys.exc_info()[1]
204                    self.log.error(("Unexpected thread exception: %s" +\
205                            "Trace %s") % (self.exception,\
206                                traceback.format_exc()))
207            if self.pdata:
208                self.pdata.terminate()
209
210    call_RequestAccess = service_caller('RequestAccess')
211    call_ReleaseAccess = service_caller('ReleaseAccess')
212    call_StartSegment = service_caller('StartSegment')
213    call_TerminateSegment = service_caller('TerminateSegment')
214    call_Ns2Topdl = service_caller('Ns2Topdl')
215
216    def __init__(self, config=None, auth=None):
217        """
218        Intialize the various attributes, most from the config object
219        """
220
221        def parse_tarfile_list(tf):
222            """
223            Parse a tarfile list from the configuration.  This is a set of
224            paths and tarfiles separated by spaces.
225            """
226            rv = [ ]
227            if tf is not None:
228                tl = tf.split()
229                while len(tl) > 1:
230                    p, t = tl[0:2]
231                    del tl[0:2]
232                    rv.append((p, t))
233            return rv
234
235        self.thread_with_rv = experiment_control_local.pooled_thread
236        self.thread_pool = experiment_control_local.thread_pool
237        self.list_log = list_log.list_log
238
239        self.cert_file = config.get("experiment_control", "cert_file")
240        if self.cert_file:
241            self.cert_pwd = config.get("experiment_control", "cert_pwd")
242        else:
243            self.cert_file = config.get("globals", "cert_file")
244            self.cert_pwd = config.get("globals", "cert_pwd")
245
246        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
247                or config.get("globals", "trusted_certs")
248
249        self.repodir = config.get("experiment_control", "repodir")
250        self.repo_url = config.get("experiment_control", "repo_url", 
251                "https://users.isi.deterlab.net:23235");
252
253        self.exp_stem = "fed-stem"
254        self.log = logging.getLogger("fedd.experiment_control")
255        set_log_level(config, "experiment_control", self.log)
256        self.muxmax = 2
257        self.nthreads = 10
258        self.randomize_experiments = False
259
260        self.splitter = None
261        self.ssh_keygen = "/usr/bin/ssh-keygen"
262        self.ssh_identity_file = None
263
264
265        self.debug = config.getboolean("experiment_control", "create_debug")
266        self.cleanup = not config.getboolean("experiment_control", 
267                "leave_tmpfiles")
268        self.state_filename = config.get("experiment_control", 
269                "experiment_state")
270        self.store_filename = config.get("experiment_control", 
271                "synch_store")
272        self.store_url = config.get("experiment_control", "store_url")
273        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
274        self.fedkit = parse_tarfile_list(\
275                config.get("experiment_control", "fedkit"))
276        self.gatewaykit = parse_tarfile_list(\
277                config.get("experiment_control", "gatewaykit"))
278        accessdb_file = config.get("experiment_control", "accessdb")
279
280        self.ssh_pubkey_file = config.get("experiment_control", 
281                "ssh_pubkey_file")
282        self.ssh_privkey_file = config.get("experiment_control",
283                "ssh_privkey_file")
284        dt = config.get("experiment_control", "direct_transit")
285        self.auth_type = config.get('experiment_control', 'auth_type') \
286                or 'legacy'
287        self.auth_dir = config.get('experiment_control', 'auth_dir')
288        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
289        else: self.direct_transit = [ ]
290        # NB for internal master/slave ops, not experiment setup
291        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
292
293        self.overrides = set([])
294        ovr = config.get('experiment_control', 'overrides')
295        if ovr:
296            for o in ovr.split(","):
297                o = o.strip()
298                if o.startswith('fedid:'): o = o[len('fedid:'):]
299                self.overrides.add(fedid(hexstr=o))
300
301        self.state = { }
302        self.state_lock = Lock()
303        self.tclsh = "/usr/local/bin/otclsh"
304        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
305                config.get("experiment_control", "tcl_splitter",
306                        "/usr/testbed/lib/ns2ir/parse.tcl")
307        mapdb_file = config.get("experiment_control", "mapdb")
308        self.trace_file = sys.stderr
309
310        self.def_expstart = \
311                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
312                "/tmp/federate";
313        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
314                "FEDDIR/hosts";
315        self.def_gwstart = \
316                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
317                "/tmp/bridge.log";
318        self.def_mgwstart = \
319                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
320                "/tmp/bridge.log";
321        self.def_gwimage = "FBSD61-TUNNEL2";
322        self.def_gwtype = "pc";
323        self.local_access = { }
324
325        if self.auth_type == 'legacy':
326            if auth:
327                self.auth = auth
328            else:
329                self.log.error( "[access]: No authorizer initialized, " +\
330                        "creating local one.")
331                auth = authorizer()
332        elif self.auth_type == 'abac':
333            self.auth = abac_authorizer(load=self.auth_dir)
334        else:
335            raise service_error(service_error.internal, 
336                    "Unknown auth_type: %s" % self.auth_type)
337
338
339        if self.ssh_pubkey_file:
340            try:
341                f = open(self.ssh_pubkey_file, 'r')
342                self.ssh_pubkey = f.read()
343                f.close()
344            except EnvironmentError:
345                raise service_error(service_error.internal,
346                        "Cannot read sshpubkey")
347        else:
348            raise service_error(service_error.internal, 
349                    "No SSH public key file?")
350
351        if not self.ssh_privkey_file:
352            raise service_error(service_error.internal, 
353                    "No SSH public key file?")
354
355
356        if mapdb_file:
357            self.read_mapdb(mapdb_file)
358        else:
359            self.log.warn("[experiment_control] No testbed map, using defaults")
360            self.tbmap = { 
361                    'deter':'https://users.isi.deterlab.net:23235',
362                    'emulab':'https://users.isi.deterlab.net:23236',
363                    'ucb':'https://users.isi.deterlab.net:23237',
364                    }
365
366        if accessdb_file:
367                self.read_accessdb(accessdb_file)
368        else:
369            raise service_error(service_error.internal,
370                    "No accessdb specified in config")
371
372        # Grab saved state.  OK to do this w/o locking because it's read only
373        # and only one thread should be in existence that can see self.state at
374        # this point.
375        if self.state_filename:
376            self.read_state()
377
378        if self.store_filename:
379            self.read_store()
380        else:
381            self.log.warning("No saved synch store")
382            self.synch_store = synch_store
383
384        # Dispatch tables
385        self.soap_services = {\
386                'New': soap_handler('New', self.new_experiment),
387                'Create': soap_handler('Create', self.create_experiment),
388                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
389                'Vis': soap_handler('Vis', self.get_vis),
390                'Info': soap_handler('Info', self.get_info),
391                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
392                'Terminate': soap_handler('Terminate', 
393                    self.terminate_experiment),
394                'GetValue': soap_handler('GetValue', self.GetValue),
395                'SetValue': soap_handler('SetValue', self.SetValue),
396        }
397
398        self.xmlrpc_services = {\
399                'New': xmlrpc_handler('New', self.new_experiment),
400                'Create': xmlrpc_handler('Create', self.create_experiment),
401                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
402                'Vis': xmlrpc_handler('Vis', self.get_vis),
403                'Info': xmlrpc_handler('Info', self.get_info),
404                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
405                'Terminate': xmlrpc_handler('Terminate',
406                    self.terminate_experiment),
407                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
408                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
409        }
410
411    # Call while holding self.state_lock
412    def write_state(self):
413        """
414        Write a new copy of experiment state after copying the existing state
415        to a backup.
416
417        State format is a simple pickling of the state dictionary.
418        """
419        if os.access(self.state_filename, os.W_OK):
420            copy_file(self.state_filename, \
421                    "%s.bak" % self.state_filename)
422        try:
423            f = open(self.state_filename, 'w')
424            pickle.dump(self.state, f)
425        except EnvironmentError, e:
426            self.log.error("Can't write file %s: %s" % \
427                    (self.state_filename, e))
428        except pickle.PicklingError, e:
429            self.log.error("Pickling problem: %s" % e)
430        except TypeError, e:
431            self.log.error("Pickling problem (TypeError): %s" % e)
432
433    @staticmethod
434    def get_alloc_ids(state):
435        """
436        Pull the fedids of the identifiers of each allocation from the
437        state.  Again, a dict dive that's best isolated.
438
439        Used by read_store and read state
440        """
441
442        return [ f['allocID']['fedid'] 
443                for f in state.get('federant',[]) \
444                    if f.has_key('allocID') and \
445                        f['allocID'].has_key('fedid')]
446
447    # Call while holding self.state_lock
448    def read_state(self):
449        """
450        Read a new copy of experiment state.  Old state is overwritten.
451
452        State format is a simple pickling of the state dictionary.
453        """
454       
455        def get_experiment_id(state):
456            """
457            Pull the fedid experimentID out of the saved state.  This is kind
458            of a gross walk through the dict.
459            """
460
461            if state.has_key('experimentID'):
462                for e in state['experimentID']:
463                    if e.has_key('fedid'):
464                        return e['fedid']
465                else:
466                    return None
467            else:
468                return None
469
470        try:
471            f = open(self.state_filename, "r")
472            self.state = pickle.load(f)
473            self.log.debug("[read_state]: Read state from %s" % \
474                    self.state_filename)
475        except EnvironmentError, e:
476            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
477                    % (self.state_filename, e))
478        except pickle.UnpicklingError, e:
479            self.log.warning(("[read_state]: No saved state: " + \
480                    "Unpickling failed: %s") % e)
481       
482        for s in self.state.values():
483            try:
484
485                eid = get_experiment_id(s)
486                if eid : 
487                    if self.auth_type == 'legacy':
488                        # XXX: legacy
489                        # Give the owner rights to the experiment
490                        self.auth.set_attribute(s['owner'], eid)
491                        # And holders of the eid as well
492                        self.auth.set_attribute(eid, eid)
493                        # allow overrides to control experiments as well
494                        for o in self.overrides:
495                            self.auth.set_attribute(o, eid)
496                        # Set permissions to allow reading of the software
497                        # repo, if any, as well.
498                        for a in self.get_alloc_ids(s):
499                            self.auth.set_attribute(a, 'repo/%s' % eid)
500                else:
501                    raise KeyError("No experiment id")
502            except KeyError, e:
503                self.log.warning("[read_state]: State ownership or identity " +\
504                        "misformatted in %s: %s" % (self.state_filename, e))
505
506
507    def read_accessdb(self, accessdb_file):
508        """
509        Read the mapping from fedids that can create experiments to their name
510        in the 3-level access namespace.  All will be asserted from this
511        testbed and can include the local username and porject that will be
512        asserted on their behalf by this fedd.  Each fedid is also added to the
513        authorization system with the "create" attribute.
514        """
515        self.accessdb = {}
516        # These are the regexps for parsing the db
517        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
518        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
519                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
520        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
521                "\s*->\s*(" + name_expr + ")\s*$")
522        lineno = 0
523
524        # Parse the mappings and store in self.authdb, a dict of
525        # fedid -> (proj, user)
526        try:
527            f = open(accessdb_file, "r")
528            for line in f:
529                lineno += 1
530                line = line.strip()
531                if len(line) == 0 or line.startswith('#'):
532                    continue
533                m = project_line.match(line)
534                if m:
535                    fid = fedid(hexstr=m.group(1))
536                    project, user = m.group(2,3)
537                    if not self.accessdb.has_key(fid):
538                        self.accessdb[fid] = []
539                    self.accessdb[fid].append((project, user))
540                    continue
541
542                m = user_line.match(line)
543                if m:
544                    fid = fedid(hexstr=m.group(1))
545                    project = None
546                    user = m.group(2)
547                    if not self.accessdb.has_key(fid):
548                        self.accessdb[fid] = []
549                    self.accessdb[fid].append((project, user))
550                    continue
551                self.log.warn("[experiment_control] Error parsing access " +\
552                        "db %s at line %d" %  (accessdb_file, lineno))
553        except EnvironmentError:
554            raise service_error(service_error.internal,
555                    ("Error opening/reading %s as experiment " +\
556                            "control accessdb") %  accessdb_file)
557        f.close()
558
559        # Initialize the authorization attributes
560        # XXX: legacy
561        if self.auth_type == 'legacy':
562            for fid in self.accessdb.keys():
563                self.auth.set_attribute(fid, 'create')
564                self.auth.set_attribute(fid, 'new')
565
566    def read_mapdb(self, file):
567        """
568        Read a simple colon separated list of mappings for the
569        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
570        """
571
572        self.tbmap = { }
573        lineno =0
574        try:
575            f = open(file, "r")
576            for line in f:
577                lineno += 1
578                line = line.strip()
579                if line.startswith('#') or len(line) == 0:
580                    continue
581                try:
582                    label, url = line.split(':', 1)
583                    self.tbmap[label] = url
584                except ValueError, e:
585                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
586                            "map db: %s %s" % (lineno, line, e))
587        except EnvironmentError, e:
588            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
589                    "open %s: %s" % (file, e))
590        f.close()
591
592    def read_store(self):
593        try:
594            self.synch_store = synch_store()
595            self.synch_store.load(self.store_filename)
596            self.log.debug("[read_store]: Read store from %s" % \
597                    self.store_filename)
598        except EnvironmentError, e:
599            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
600                    % (self.state_filename, e))
601            self.synch_store = synch_store()
602
603        # Set the initial permissions on data in the store.  XXX: This ad hoc
604        # authorization attribute initialization is getting out of hand.
605        # XXX: legacy
606        if self.auth_type == 'legacy':
607            for k in self.synch_store.all_keys():
608                try:
609                    if k.startswith('fedid:'):
610                        fid = fedid(hexstr=k[6:46])
611                        if self.state.has_key(fid):
612                            for a in self.get_alloc_ids(self.state[fid]):
613                                self.auth.set_attribute(a, k)
614                except ValueError, e:
615                    self.log.warn("Cannot deduce permissions for %s" % k)
616
617
618    def write_store(self):
619        """
620        Write a new copy of synch_store after writing current state
621        to a backup.  We use the internal synch_store pickle method to avoid
622        incinsistent data.
623
624        State format is a simple pickling of the store.
625        """
626        if os.access(self.store_filename, os.W_OK):
627            copy_file(self.store_filename, \
628                    "%s.bak" % self.store_filename)
629        try:
630            self.synch_store.save(self.store_filename)
631        except EnvironmentError, e:
632            self.log.error("Can't write file %s: %s" % \
633                    (self.store_filename, e))
634        except TypeError, e:
635            self.log.error("Pickling problem (TypeError): %s" % e)
636
637       
638    def generate_ssh_keys(self, dest, type="rsa" ):
639        """
640        Generate a set of keys for the gateways to use to talk.
641
642        Keys are of type type and are stored in the required dest file.
643        """
644        valid_types = ("rsa", "dsa")
645        t = type.lower();
646        if t not in valid_types: raise ValueError
647        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
648
649        try:
650            trace = open("/dev/null", "w")
651        except EnvironmentError:
652            raise service_error(service_error.internal,
653                    "Cannot open /dev/null??");
654
655        # May raise CalledProcessError
656        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
657        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
658        if rv != 0:
659            raise service_error(service_error.internal, 
660                    "Cannot generate nonce ssh keys.  %s return code %d" \
661                            % (self.ssh_keygen, rv))
662
663    def gentopo(self, str):
664        """
665        Generate the topology dtat structure from the splitter's XML
666        representation of it.
667
668        The topology XML looks like:
669            <experiment>
670                <nodes>
671                    <node><vname></vname><ips>ip1:ip2</ips></node>
672                </nodes>
673                <lans>
674                    <lan>
675                        <vname></vname><vnode></vnode><ip></ip>
676                        <bandwidth></bandwidth><member>node:port</member>
677                    </lan>
678                </lans>
679        """
680        class topo_parse:
681            """
682            Parse the topology XML and create the dats structure.
683            """
684            def __init__(self):
685                # Typing of the subelements for data conversion
686                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
687                self.int_subelements = ( 'bandwidth',)
688                self.float_subelements = ( 'delay',)
689                # The final data structure
690                self.nodes = [ ]
691                self.lans =  [ ]
692                self.topo = { \
693                        'node': self.nodes,\
694                        'lan' : self.lans,\
695                    }
696                self.element = { }  # Current element being created
697                self.chars = ""     # Last text seen
698
699            def end_element(self, name):
700                # After each sub element the contents is added to the current
701                # element or to the appropriate list.
702                if name == 'node':
703                    self.nodes.append(self.element)
704                    self.element = { }
705                elif name == 'lan':
706                    self.lans.append(self.element)
707                    self.element = { }
708                elif name in self.str_subelements:
709                    self.element[name] = self.chars
710                    self.chars = ""
711                elif name in self.int_subelements:
712                    self.element[name] = int(self.chars)
713                    self.chars = ""
714                elif name in self.float_subelements:
715                    self.element[name] = float(self.chars)
716                    self.chars = ""
717
718            def found_chars(self, data):
719                self.chars += data.rstrip()
720
721
722        tp = topo_parse();
723        parser = xml.parsers.expat.ParserCreate()
724        parser.EndElementHandler = tp.end_element
725        parser.CharacterDataHandler = tp.found_chars
726
727        parser.Parse(str)
728
729        return tp.topo
730       
731
732    def genviz(self, topo):
733        """
734        Generate the visualization the virtual topology
735        """
736
737        neato = "/usr/local/bin/neato"
738        # These are used to parse neato output and to create the visualization
739        # file.
740        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
741        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
742                "%s</type></node>"
743
744        try:
745            # Node names
746            nodes = [ n['vname'] for n in topo['node'] ]
747            topo_lans = topo['lan']
748        except KeyError, e:
749            raise service_error(service_error.internal, "Bad topology: %s" %e)
750
751        lans = { }
752        links = { }
753
754        # Walk through the virtual topology, organizing the connections into
755        # 2-node connections (links) and more-than-2-node connections (lans).
756        # When a lan is created, it's added to the list of nodes (there's a
757        # node in the visualization for the lan).
758        for l in topo_lans:
759            if links.has_key(l['vname']):
760                if len(links[l['vname']]) < 2:
761                    links[l['vname']].append(l['vnode'])
762                else:
763                    nodes.append(l['vname'])
764                    lans[l['vname']] = links[l['vname']]
765                    del links[l['vname']]
766                    lans[l['vname']].append(l['vnode'])
767            elif lans.has_key(l['vname']):
768                lans[l['vname']].append(l['vnode'])
769            else:
770                links[l['vname']] = [ l['vnode'] ]
771
772
773        # Open up a temporary file for dot to turn into a visualization
774        try:
775            df, dotname = tempfile.mkstemp()
776            dotfile = os.fdopen(df, 'w')
777        except EnvironmentError:
778            raise service_error(service_error.internal,
779                    "Failed to open file in genviz")
780
781        try:
782            dnull = open('/dev/null', 'w')
783        except EnvironmentError:
784            service_error(service_error.internal,
785                    "Failed to open /dev/null in genviz")
786
787        # Generate a dot/neato input file from the links, nodes and lans
788        try:
789            print >>dotfile, "graph G {"
790            for n in nodes:
791                print >>dotfile, '\t"%s"' % n
792            for l in links.keys():
793                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
794            for l in lans.keys():
795                for n in lans[l]:
796                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
797            print >>dotfile, "}"
798            dotfile.close()
799        except TypeError:
800            raise service_error(service_error.internal,
801                    "Single endpoint link in vtopo")
802        except EnvironmentError:
803            raise service_error(service_error.internal, "Cannot write dot file")
804
805        # Use dot to create a visualization
806        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
807                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
808                close_fds=True)
809        dnull.close()
810
811        # Translate dot to vis format
812        vis_nodes = [ ]
813        vis = { 'node': vis_nodes }
814        for line in dot.stdout:
815            m = vis_re.match(line)
816            if m:
817                vn = m.group(1)
818                vis_node = {'name': vn, \
819                        'x': float(m.group(2)),\
820                        'y' : float(m.group(3)),\
821                    }
822                if vn in links.keys() or vn in lans.keys():
823                    vis_node['type'] = 'lan'
824                else:
825                    vis_node['type'] = 'node'
826                vis_nodes.append(vis_node)
827        rv = dot.wait()
828
829        os.remove(dotname)
830        if rv == 0 : return vis
831        else: return None
832
833    def get_access(self, tb, nodes, tbparam, access_user, masters, tbmap):
834        """
835        Get access to testbed through fedd and set the parameters for that tb
836        """
837        def get_export_project(svcs):
838            """
839            Look through for the list of federated_service for this testbed
840            objects for a project_export service, and extract the project
841            parameter.
842            """
843
844            pe = [s for s in svcs if s.name=='project_export']
845            if len(pe) == 1:
846                return pe[0].params.get('project', None)
847            elif len(pe) == 0:
848                return None
849            else:
850                raise service_error(service_error.req,
851                        "More than one project export is not supported")
852
853        uri = tbmap.get(testbed_base(tb), None)
854        if not uri:
855            raise service_error(service_error.server_config, 
856                    "Unknown testbed: %s" % tb)
857
858        export_svcs = masters.get(tb,[])
859        import_svcs = [ s for m in masters.values() \
860                for s in m \
861                    if tb in s.importers ]
862
863        export_project = get_export_project(export_svcs)
864
865        # Tweak search order so that if there are entries in access_user that
866        # have a project matching the export project, we try them first
867        if export_project: 
868            access_sequence = [ (p, u) for p, u in access_user \
869                    if p == export_project] 
870            access_sequence.extend([(p, u) for p, u in access_user \
871                    if p != export_project]) 
872        else: 
873            access_sequence = access_user
874
875        for p, u in access_sequence: 
876            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
877                    "to %s") %  ((p or "None"), u, uri))
878
879            if p:
880                # Request with user and project specified
881                req = {\
882                        'credential': [ "project: %s" % p, "user: %s"  % u],
883                    }
884            else:
885                # Request with only user specified
886                req = {\
887                        'credential': [ 'user: %s' % u ],
888                    }
889
890            # Make the service request from the services we're importing and
891            # exporting.  Keep track of the export request ids so we can
892            # collect the resulting info from the access response.
893            e_keys = { }
894            if import_svcs or export_svcs:
895                req['service'] = [ ]
896
897                for i, s in enumerate(import_svcs):
898                    idx = 'import%d' % i
899                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
900                    if s.params:
901                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
902                                for k, v in s.params.items()]
903                    req['service'].append(sr)
904
905                for i, s in enumerate(export_svcs):
906                    idx = 'export%d' % i
907                    e_keys[idx] = s
908                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
909                    if s.params:
910                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
911                                for k, v in s.params.items()]
912                    req['service'].append(sr)
913
914            # node resources if any
915            if nodes != None and len(nodes) > 0:
916                rnodes = [ ]
917                for n in nodes:
918                    rn = { }
919                    image, hw, count = n.split(":")
920                    if image: rn['image'] = [ image ]
921                    if hw: rn['hardware'] = [ hw ]
922                    if count and int(count) >0 : rn['count'] = int(count)
923                    rnodes.append(rn)
924                req['resources']= { }
925                req['resources']['node'] = rnodes
926
927            try:
928                if self.local_access.has_key(uri):
929                    # Local access call
930                    req = { 'RequestAccessRequestBody' : req }
931                    r = self.local_access[uri].RequestAccess(req, 
932                            fedid(file=self.cert_file))
933                    r = { 'RequestAccessResponseBody' : r }
934                else:
935                    r = self.call_RequestAccess(uri, req, 
936                            self.cert_file, self.cert_pwd, self.trusted_certs)
937            except service_error, e:
938                if e.code == service_error.access:
939                    self.log.debug("[get_access] Access denied")
940                    r = None
941                    continue
942                else:
943                    raise e
944
945            if r.has_key('RequestAccessResponseBody'):
946                # Through to here we have a valid response, not a fault.
947                # Access denied is a fault, so something better or worse than
948                # access denied has happened.
949                r = r['RequestAccessResponseBody']
950                self.log.debug("[get_access] Access granted")
951                break
952            else:
953                raise service_error(service_error.protocol,
954                        "Bad proxy response")
955       
956        if not r:
957            raise service_error(service_error.access, 
958                    "Access denied by %s (%s)" % (tb, uri))
959
960        tbparam[tb] = { 
961                "allocID" : r['allocID'],
962                "uri": uri,
963                }
964
965        # Collect the responses corresponding to the services this testbed
966        # exports.  These will be the service requests that we will include in
967        # the start segment requests (with appropriate visibility values) to
968        # import and export the segments.
969        for s in r.get('service', []):
970            id = s.get('id', None)
971            if id and id in e_keys:
972                e_keys[id].reqs.append(s)
973
974        # Add attributes to parameter space.  We don't allow attributes to
975        # overlay any parameters already installed.
976        for a in r.get('fedAttr', []):
977            try:
978                if a['attribute'] and \
979                        isinstance(a['attribute'], basestring)\
980                        and not tbparam[tb].has_key(a['attribute'].lower()):
981                    tbparam[tb][a['attribute'].lower()] = a['value']
982            except KeyError:
983                self.log.error("Bad attribute in response: %s" % a)
984
985    def release_access(self, tb, aid, tbmap=None, uri=None):
986        """
987        Release access to testbed through fedd
988        """
989
990        if not uri and tbmap:
991            uri = tbmap.get(tb, None)
992        if not uri:
993            raise service_error(service_error.server_config, 
994                    "Unknown testbed: %s" % tb)
995
996        if self.local_access.has_key(uri):
997            resp = self.local_access[uri].ReleaseAccess(\
998                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
999                    fedid(file=self.cert_file))
1000            resp = { 'ReleaseAccessResponseBody': resp } 
1001        else:
1002            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1003                    self.cert_file, self.cert_pwd, self.trusted_certs)
1004
1005        # better error coding
1006
1007    def remote_ns2topdl(self, uri, desc):
1008
1009        req = {
1010                'description' : { 'ns2description': desc },
1011            }
1012
1013        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
1014                self.trusted_certs)
1015
1016        if r.has_key('Ns2TopdlResponseBody'):
1017            r = r['Ns2TopdlResponseBody']
1018            ed = r.get('experimentdescription', None)
1019            if ed.has_key('topdldescription'):
1020                return topdl.Topology(**ed['topdldescription'])
1021            else:
1022                raise service_error(service_error.protocol, 
1023                        "Bad splitter response (no output)")
1024        else:
1025            raise service_error(service_error.protocol, "Bad splitter response")
1026
1027    class start_segment:
1028        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1029                cert_pwd=None, trusted_certs=None, caller=None,
1030                log_collector=None):
1031            self.log = log
1032            self.debug = debug
1033            self.cert_file = cert_file
1034            self.cert_pwd = cert_pwd
1035            self.trusted_certs = None
1036            self.caller = caller
1037            self.testbed = testbed
1038            self.log_collector = log_collector
1039            self.response = None
1040            self.node = { }
1041
1042        def make_map(self, resp):
1043            for e in resp.get('embedding', []):
1044                if 'toponame' in e and 'physname' in e:
1045                    self.node[e['toponame']] = e['physname'][0]
1046
1047        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1048            req = {
1049                    'allocID': { 'fedid' : aid }, 
1050                    'segmentdescription': { 
1051                        'topdldescription': topo.to_dict(),
1052                    },
1053                }
1054
1055            if connInfo:
1056                req['connection'] = connInfo
1057
1058            import_svcs = [ s for m in masters.values() \
1059                    for s in m if self.testbed in s.importers]
1060
1061            if import_svcs or self.testbed in masters:
1062                req['service'] = []
1063
1064            for s in import_svcs:
1065                for r in s.reqs:
1066                    sr = copy.deepcopy(r)
1067                    sr['visibility'] = 'import';
1068                    req['service'].append(sr)
1069
1070            for s in masters.get(self.testbed, []):
1071                for r in s.reqs:
1072                    sr = copy.deepcopy(r)
1073                    sr['visibility'] = 'export';
1074                    req['service'].append(sr)
1075
1076            if attrs:
1077                req['fedAttr'] = attrs
1078
1079            try:
1080                self.log.debug("Calling StartSegment at %s " % uri)
1081                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1082                        self.trusted_certs)
1083                if r.has_key('StartSegmentResponseBody'):
1084                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1085                            None)
1086                    if lval and self.log_collector:
1087                        for line in  lval.splitlines(True):
1088                            self.log_collector.write(line)
1089                    self.make_map(r['StartSegmentResponseBody'])
1090                    self.response = r
1091                else:
1092                    raise service_error(service_error.internal, 
1093                            "Bad response!?: %s" %r)
1094                return True
1095            except service_error, e:
1096                self.log.error("Start segment failed on %s: %s" % \
1097                        (self.testbed, e))
1098                return False
1099
1100
1101
1102    class terminate_segment:
1103        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1104                cert_pwd=None, trusted_certs=None, caller=None):
1105            self.log = log
1106            self.debug = debug
1107            self.cert_file = cert_file
1108            self.cert_pwd = cert_pwd
1109            self.trusted_certs = None
1110            self.caller = caller
1111            self.testbed = testbed
1112
1113        def __call__(self, uri, aid ):
1114            req = {
1115                    'allocID': aid , 
1116                }
1117            try:
1118                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1119                        self.trusted_certs)
1120                return True
1121            except service_error, e:
1122                self.log.error("Terminate segment failed on %s: %s" % \
1123                        (self.testbed, e))
1124                return False
1125   
1126
1127    def allocate_resources(self, allocated, masters, eid, expid, 
1128            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1129            attrs=None, connInfo={}, tbmap=None):
1130
1131        started = { }           # Testbeds where a sub-experiment started
1132                                # successfully
1133
1134        # XXX
1135        fail_soft = False
1136
1137        if tbmap is None: tbmap = { }
1138
1139        log = alloc_log or self.log
1140
1141        thread_pool = self.thread_pool(self.nthreads)
1142        threads = [ ]
1143        starters = [ ]
1144
1145        for tb in allocated.keys():
1146            # Create and start a thread to start the segment, and save it
1147            # to get the return value later
1148            tb_attrs = copy.copy(attrs)
1149            thread_pool.wait_for_slot()
1150            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
1151            base, suffix = split_testbed(tb)
1152            if suffix:
1153                tb_attrs.append({'attribute': 'experiment_name', 
1154                    'value': "%s-%s" % (eid, suffix)})
1155            else:
1156                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1157            if not uri:
1158                raise service_error(service_error.internal, 
1159                        "Unknown testbed %s !?" % tb)
1160
1161            if tbparams[tb].has_key('allocID') and \
1162                    tbparams[tb]['allocID'].has_key('fedid'):
1163                aid = tbparams[tb]['allocID']['fedid']
1164            else:
1165                raise service_error(service_error.internal, 
1166                        "No alloc id for testbed %s !?" % tb)
1167
1168            s = self.start_segment(log=log, debug=self.debug,
1169                    testbed=tb, cert_file=self.cert_file,
1170                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1171                    caller=self.call_StartSegment,
1172                    log_collector=log_collector)
1173            starters.append(s)
1174            t  = self.pooled_thread(\
1175                    target=s, name=tb,
1176                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1177                    pdata=thread_pool, trace_file=self.trace_file)
1178            threads.append(t)
1179            t.start()
1180
1181        # Wait until all finish (keep pinging the log, though)
1182        mins = 0
1183        revoked = False
1184        while not thread_pool.wait_for_all_done(60.0):
1185            mins += 1
1186            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1187                    % mins)
1188            if not revoked and \
1189                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1190                # a testbed has failed.  Revoke this experiment's
1191                # synchronizarion values so that sub experiments will not
1192                # deadlock waiting for synchronization that will never happen
1193                self.log.info("A subexperiment has failed to swap in, " + \
1194                        "revoking synch keys")
1195                var_key = "fedid:%s" % expid
1196                for k in self.synch_store.all_keys():
1197                    if len(k) > 45 and k[0:46] == var_key:
1198                        self.synch_store.revoke_key(k)
1199                revoked = True
1200
1201        failed = [ t.getName() for t in threads if not t.rv ]
1202        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1203
1204        # If one failed clean up, unless fail_soft is set
1205        if failed:
1206            if not fail_soft:
1207                thread_pool.clear()
1208                for tb in succeeded:
1209                    # Create and start a thread to stop the segment
1210                    thread_pool.wait_for_slot()
1211                    uri = tbparams[tb]['uri']
1212                    t  = self.pooled_thread(\
1213                            target=self.terminate_segment(log=log,
1214                                testbed=tb,
1215                                cert_file=self.cert_file, 
1216                                cert_pwd=self.cert_pwd,
1217                                trusted_certs=self.trusted_certs,
1218                                caller=self.call_TerminateSegment),
1219                            args=(uri, tbparams[tb]['federant']['allocID']),
1220                            name=tb,
1221                            pdata=thread_pool, trace_file=self.trace_file)
1222                    t.start()
1223                # Wait until all finish (if any are being stopped)
1224                if succeeded:
1225                    thread_pool.wait_for_all_done()
1226
1227                # release the allocations
1228                for tb in tbparams.keys():
1229                    self.release_access(tb, tbparams[tb]['allocID'], 
1230                            tbmap=tbmap, uri=tbparams[tb].get('uri', None))
1231                # Remove the placeholder
1232                self.state_lock.acquire()
1233                self.state[eid]['experimentStatus'] = 'failed'
1234                if self.state_filename: self.write_state()
1235                self.state_lock.release()
1236                # Remove the repo dir
1237                self.remove_dirs("%s/%s" %(self.repodir, expid))
1238                # Walk up tmpdir, deleting as we go
1239                if self.cleanup:
1240                    self.remove_dirs(tmpdir)
1241                else:
1242                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1243
1244
1245                log.error("Swap in failed on %s" % ",".join(failed))
1246                return
1247        else:
1248            # Walk through the successes and gather the virtual to physical
1249            # mapping.
1250            embedding = [ ]
1251            for s in starters:
1252                for k, v in s.node.items():
1253                    embedding.append({
1254                        'toponame': k, 
1255                        'physname': [ v],
1256                        'testbed': s.testbed
1257                        })
1258            log.info("[start_segment]: Experiment %s active" % eid)
1259
1260
1261        # Walk up tmpdir, deleting as we go
1262        if self.cleanup:
1263            self.remove_dirs(tmpdir)
1264        else:
1265            log.debug("[start_experiment]: not removing %s" % tmpdir)
1266
1267        # Insert the experiment into our state and update the disk copy.
1268        self.state_lock.acquire()
1269        self.state[expid]['experimentStatus'] = 'active'
1270        self.state[eid] = self.state[expid]
1271        self.state[eid]['experimentdescription']['topdldescription'] = \
1272                top.to_dict()
1273        self.state[eid]['embedding'] = embedding
1274        if self.state_filename: self.write_state()
1275        self.state_lock.release()
1276        return
1277
1278
1279    def add_kit(self, e, kit):
1280        """
1281        Add a Software object created from the list of (install, location)
1282        tuples passed as kit  to the software attribute of an object e.  We
1283        do this enough to break out the code, but it's kind of a hack to
1284        avoid changing the old tuple rep.
1285        """
1286
1287        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1288
1289        if isinstance(e.software, list): e.software.extend(s)
1290        else: e.software = s
1291
1292
1293    def create_experiment_state(self, fid, req, expid, expcert,
1294            state='starting'):
1295        """
1296        Create the initial entry in the experiment's state.  The expid and
1297        expcert are the experiment's fedid and certifacte that represents that
1298        ID, which are installed in the experiment state.  If the request
1299        includes a suggested local name that is used if possible.  If the local
1300        name is already taken by an experiment owned by this user that has
1301        failed, it is overwritten.  Otherwise new letters are added until a
1302        valid localname is found.  The generated local name is returned.
1303        """
1304
1305        if req.has_key('experimentID') and \
1306                req['experimentID'].has_key('localname'):
1307            overwrite = False
1308            eid = req['experimentID']['localname']
1309            # If there's an old failed experiment here with the same local name
1310            # and accessible by this user, we'll overwrite it, otherwise we'll
1311            # fall through and do the collision avoidance.
1312            old_expid = self.get_experiment_fedid(eid)
1313            if old_expid and self.check_experiment_access(fid, old_expid):
1314                self.state_lock.acquire()
1315                status = self.state[eid].get('experimentStatus', None)
1316                if status and status == 'failed':
1317                    # remove the old access attribute
1318                    self.auth.unset_attribute(fid, old_expid)
1319                    self.auth.save()
1320                    overwrite = True
1321                    del self.state[eid]
1322                    del self.state[old_expid]
1323                self.state_lock.release()
1324            self.state_lock.acquire()
1325            while (self.state.has_key(eid) and not overwrite):
1326                eid += random.choice(string.ascii_letters)
1327            # Initial state
1328            self.state[eid] = {
1329                    'experimentID' : \
1330                            [ { 'localname' : eid }, {'fedid': expid } ],
1331                    'experimentStatus': state,
1332                    'experimentAccess': { 'X509' : expcert },
1333                    'owner': fid,
1334                    'log' : [],
1335                }
1336            self.state[expid] = self.state[eid]
1337            if self.state_filename: self.write_state()
1338            self.state_lock.release()
1339        else:
1340            eid = self.exp_stem
1341            for i in range(0,5):
1342                eid += random.choice(string.ascii_letters)
1343            self.state_lock.acquire()
1344            while (self.state.has_key(eid)):
1345                eid = self.exp_stem
1346                for i in range(0,5):
1347                    eid += random.choice(string.ascii_letters)
1348            # Initial state
1349            self.state[eid] = {
1350                    'experimentID' : \
1351                            [ { 'localname' : eid }, {'fedid': expid } ],
1352                    'experimentStatus': state,
1353                    'experimentAccess': { 'X509' : expcert },
1354                    'owner': fid,
1355                    'log' : [],
1356                }
1357            self.state[expid] = self.state[eid]
1358            if self.state_filename: self.write_state()
1359            self.state_lock.release()
1360
1361        return eid
1362
1363
1364    def allocate_ips_to_topo(self, top):
1365        """
1366        Add an ip4_address attribute to all the hosts in the topology, based on
1367        the shared substrates on which they sit.  An /etc/hosts file is also
1368        created and returned as a list of hostfiles entries.  We also return
1369        the allocator, because we may need to allocate IPs to portals
1370        (specifically DRAGON portals).
1371        """
1372        subs = sorted(top.substrates, 
1373                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1374                reverse=True)
1375        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1376        ifs = { }
1377        hosts = [ ]
1378
1379        for idx, s in enumerate(subs):
1380            net_size = len(s.interfaces)+2
1381
1382            a = ips.allocate(net_size)
1383            if a :
1384                base, num = a
1385                if num < net_size: 
1386                    raise service_error(service_error.internal,
1387                            "Allocator returned wrong number of IPs??")
1388            else:
1389                raise service_error(service_error.req, 
1390                        "Cannot allocate IP addresses")
1391            mask = ips.min_alloc
1392            while mask < net_size:
1393                mask *= 2
1394
1395            netmask = ((2**32-1) ^ (mask-1))
1396
1397            base += 1
1398            for i in s.interfaces:
1399                i.attribute.append(
1400                        topdl.Attribute('ip4_address', 
1401                            "%s" % ip_addr(base)))
1402                i.attribute.append(
1403                        topdl.Attribute('ip4_netmask', 
1404                            "%s" % ip_addr(int(netmask))))
1405
1406                hname = i.element.name
1407                if ifs.has_key(hname):
1408                    hosts.append("%s\t%s-%s %s-%d" % \
1409                            (ip_addr(base), hname, s.name, hname,
1410                                ifs[hname]))
1411                else:
1412                    ifs[hname] = 0
1413                    hosts.append("%s\t%s-%s %s-%d %s" % \
1414                            (ip_addr(base), hname, s.name, hname,
1415                                ifs[hname], hname))
1416
1417                ifs[hname] += 1
1418                base += 1
1419        return hosts, ips
1420
1421    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1422            tbparams, masters, tbmap):
1423        """
1424        Request access to the various testbeds required for this instantiation
1425        (passed in as testbeds).  User, access_user, expoert_project and master
1426        are used to construct the correct requests.  Per-testbed parameters are
1427        returned in tbparams.
1428        """
1429        for tb in testbeds:
1430            self.get_access(tb, None, tbparams, access_user, masters, tbmap)
1431            allocated[tb] = 1
1432
1433    def split_topology(self, top, topo, testbeds):
1434        """
1435        Create the sub-topologies that are needed for experiment instantiation.
1436        """
1437        for tb in testbeds:
1438            topo[tb] = top.clone()
1439            # copy in for loop allows deletions from the original
1440            for e in [ e for e in topo[tb].elements]:
1441                etb = e.get_attribute('testbed')
1442                # NB: elements without a testbed attribute won't appear in any
1443                # sub topologies. 
1444                if not etb or etb != tb:
1445                    for i in e.interface:
1446                        for s in i.subs:
1447                            try:
1448                                s.interfaces.remove(i)
1449                            except ValueError:
1450                                raise service_error(service_error.internal,
1451                                        "Can't remove interface??")
1452                    topo[tb].elements.remove(e)
1453            topo[tb].make_indices()
1454
1455    def wrangle_software(self, expid, top, topo, tbparams):
1456        """
1457        Copy software out to the repository directory, allocate permissions and
1458        rewrite the segment topologies to look for the software in local
1459        places.
1460        """
1461
1462        # Copy the rpms and tarfiles to a distribution directory from
1463        # which the federants can retrieve them
1464        linkpath = "%s/software" %  expid
1465        softdir ="%s/%s" % ( self.repodir, linkpath)
1466        softmap = { }
1467        # These are in a list of tuples format (each kit).  This comprehension
1468        # unwraps them into a single list of tuples that initilaizes the set of
1469        # tuples.
1470        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1471                for p, t in l ])
1472        pkgs.update([x.location for e in top.elements \
1473                for x in e.software])
1474        try:
1475            os.makedirs(softdir)
1476        except EnvironmentError, e:
1477            raise service_error(
1478                    "Cannot create software directory: %s" % e)
1479        # The actual copying.  Everything's converted into a url for copying.
1480        for pkg in pkgs:
1481            loc = pkg
1482
1483            scheme, host, path = urlparse(loc)[0:3]
1484            dest = os.path.basename(path)
1485            if not scheme:
1486                if not loc.startswith('/'):
1487                    loc = "/%s" % loc
1488                loc = "file://%s" %loc
1489            try:
1490                u = urlopen(loc)
1491            except Exception, e:
1492                raise service_error(service_error.req, 
1493                        "Cannot open %s: %s" % (loc, e))
1494            try:
1495                f = open("%s/%s" % (softdir, dest) , "w")
1496                self.log.debug("Writing %s/%s" % (softdir,dest) )
1497                data = u.read(4096)
1498                while data:
1499                    f.write(data)
1500                    data = u.read(4096)
1501                f.close()
1502                u.close()
1503            except Exception, e:
1504                raise service_error(service_error.internal,
1505                        "Could not copy %s: %s" % (loc, e))
1506            path = re.sub("/tmp", "", linkpath)
1507            # XXX
1508            softmap[pkg] = \
1509                    "%s/%s/%s" %\
1510                    ( self.repo_url, path, dest)
1511
1512            # Allow the individual segments to access the software.
1513            for tb in tbparams.keys():
1514                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1515                        "/%s/%s" % ( path, dest))
1516            self.auth.save()
1517
1518        # Convert the software locations in the segments into the local
1519        # copies on this host
1520        for soft in [ s for tb in topo.values() \
1521                for e in tb.elements \
1522                    if getattr(e, 'software', False) \
1523                        for s in e.software ]:
1524            if softmap.has_key(soft.location):
1525                soft.location = softmap[soft.location]
1526
1527
1528    def new_experiment(self, req, fid):
1529        """
1530        The external interface to empty initial experiment creation called from
1531        the dispatcher.
1532
1533        Creates a working directory, splits the incoming description using the
1534        splitter script and parses out the avrious subsections using the
1535        lcasses above.  Once each sub-experiment is created, use pooled threads
1536        to instantiate them and start it all up.
1537        """
1538        req = req.get('NewRequestBody', None)
1539        if not req:
1540            raise service_error(service_error.req,
1541                    "Bad request format (no NewRequestBody)")
1542
1543        if self.auth.import_credentials(data_list=req.get('credential', [])):
1544            self.auth.save()
1545       
1546        if not self.auth.check_attribute(fid, 'new'):
1547            raise service_error(service_error.access, "New access denied")
1548
1549        try:
1550            tmpdir = tempfile.mkdtemp(prefix="split-")
1551        except EnvironmentError:
1552            raise service_error(service_error.internal, "Cannot create tmp dir")
1553
1554        try:
1555            access_user = self.accessdb[fid]
1556        except KeyError:
1557            raise service_error(service_error.internal,
1558                    "Access map and authorizer out of sync in " + \
1559                            "new_experiment for fedid %s"  % fid)
1560
1561        pid = "dummy"
1562        gid = "dummy"
1563
1564        # Generate an ID for the experiment (slice) and a certificate that the
1565        # allocator can use to prove they own it.  We'll ship it back through
1566        # the encrypted connection.  If the requester supplied one, use it.
1567        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1568            expcert = req['experimentAccess']['X509']
1569            expid = fedid(certstr=expcert)
1570            self.state_lock.acquire()
1571            if expid in self.state:
1572                self.state_lock.release()
1573                raise service_error(service_error.req, 
1574                        'fedid %s identifies an existing experiment' % expid)
1575            self.state_lock.release()
1576        else:
1577            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1578
1579        #now we're done with the tmpdir, and it should be empty
1580        if self.cleanup:
1581            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1582            os.rmdir(tmpdir)
1583        else:
1584            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1585
1586        eid = self.create_experiment_state(fid, req, expid, expcert, 
1587                state='empty')
1588
1589        # Let users touch the state
1590        self.auth.set_attribute(fid, expid)
1591        self.auth.set_attribute(expid, expid)
1592        # Override fedids can manipulate state as well
1593        for o in self.overrides:
1594            self.auth.set_attribute(o, expid)
1595        self.auth.save()
1596
1597        rv = {
1598                'experimentID': [
1599                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1600                ],
1601                'experimentStatus': 'empty',
1602                'experimentAccess': { 'X509' : expcert }
1603            }
1604
1605        return rv
1606
1607    def create_experiment(self, req, fid):
1608        """
1609        The external interface to experiment creation called from the
1610        dispatcher.
1611
1612        Creates a working directory, splits the incoming description using the
1613        splitter script and parses out the various subsections using the
1614        classes above.  Once each sub-experiment is created, use pooled threads
1615        to instantiate them and start it all up.
1616        """
1617
1618        req = req.get('CreateRequestBody', None)
1619        if not req:
1620            raise service_error(service_error.req,
1621                    "Bad request format (no CreateRequestBody)")
1622
1623        # Get the experiment access
1624        exp = req.get('experimentID', None)
1625        if exp:
1626            if exp.has_key('fedid'):
1627                key = exp['fedid']
1628                expid = key
1629                eid = None
1630            elif exp.has_key('localname'):
1631                key = exp['localname']
1632                eid = key
1633                expid = None
1634            else:
1635                raise service_error(service_error.req, "Unknown lookup type")
1636        else:
1637            raise service_error(service_error.req, "No request?")
1638
1639        self.check_experiment_access(fid, key)
1640
1641        # Install the testbed map entries supplied with the request into a copy
1642        # of the testbed map.
1643        tbmap = dict(self.tbmap)
1644        for m in req.get('testbedmap', []):
1645            if 'testbed' in m and 'uri' in m:
1646                tbmap[m['testbed']] = m['uri']
1647
1648        try:
1649            tmpdir = tempfile.mkdtemp(prefix="split-")
1650            os.mkdir(tmpdir+"/keys")
1651        except EnvironmentError:
1652            raise service_error(service_error.internal, "Cannot create tmp dir")
1653
1654        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1655        gw_secretkey_base = "fed.%s" % self.ssh_type
1656        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1657        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1658        tclfile = tmpdir + "/experiment.tcl"
1659        tbparams = { }
1660        try:
1661            access_user = self.accessdb[fid]
1662        except KeyError:
1663            raise service_error(service_error.internal,
1664                    "Access map and authorizer out of sync in " + \
1665                            "create_experiment for fedid %s"  % fid)
1666
1667        pid = "dummy"
1668        gid = "dummy"
1669
1670        # The tcl parser needs to read a file so put the content into that file
1671        descr=req.get('experimentdescription', None)
1672        if descr:
1673            file_content=descr.get('ns2description', None)
1674            if file_content:
1675                try:
1676                    f = open(tclfile, 'w')
1677                    f.write(file_content)
1678                    f.close()
1679                except EnvironmentError:
1680                    raise service_error(service_error.internal,
1681                            "Cannot write temp experiment description")
1682            else:
1683                raise service_error(service_error.req, 
1684                        "Only ns2descriptions supported")
1685        else:
1686            raise service_error(service_error.req, "No experiment description")
1687
1688        self.state_lock.acquire()
1689        if self.state.has_key(key):
1690            self.state[key]['experimentStatus'] = "starting"
1691            for e in self.state[key].get('experimentID',[]):
1692                if not expid and e.has_key('fedid'):
1693                    expid = e['fedid']
1694                elif not eid and e.has_key('localname'):
1695                    eid = e['localname']
1696        self.state_lock.release()
1697
1698        if not (eid and expid):
1699            raise service_error(service_error.internal, 
1700                    "Cannot find local experiment info!?")
1701
1702        try: 
1703            # This catches exceptions to clear the placeholder if necessary
1704            try:
1705                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1706            except ValueError:
1707                raise service_error(service_error.server_config, 
1708                        "Bad key type (%s)" % self.ssh_type)
1709
1710            # Copy the service request
1711            tb_services = [ s for s in req.get('service',[]) ]
1712            # Translate to topdl
1713            if self.splitter_url:
1714                self.log.debug("Calling remote topdl translator at %s" % \
1715                        self.splitter_url)
1716                top = self.remote_ns2topdl(self.splitter_url, file_content)
1717            else:
1718                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1719                    str(self.muxmax), '-m', 'dummy']
1720
1721                tclcmd.extend([pid, gid, eid, tclfile])
1722
1723                self.log.debug("running local splitter %s", " ".join(tclcmd))
1724                # This is just fantastic.  As a side effect the parser copies
1725                # tb_compat.tcl into the current directory, so that directory
1726                # must be writable by the fedd user.  Doing this in the
1727                # temporary subdir ensures this is the case.
1728                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1729                        cwd=tmpdir)
1730                split_data = tclparser.stdout
1731
1732                top = topdl.topology_from_xml(file=split_data, top="experiment")
1733
1734            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1735            # Find the testbeds to look up
1736            testbeds = set([ a.value for e in top.elements \
1737                    for a in e.attribute \
1738                        if a.attribute == 'testbed'])
1739
1740            tb_hosts = { }
1741            for tb in testbeds:
1742                tb_hosts[tb] = [ e.name for e in top.elements \
1743                        if isinstance(e, topdl.Computer) and \
1744                            e.get_attribute('testbed') and \
1745                            e.get_attribute('testbed') == tb]
1746
1747            masters = { }           # testbeds exporting services
1748            pmasters = { }          # Testbeds exporting services that
1749                                    # need portals
1750            for s in tb_services:
1751                # If this is a service request with the importall field
1752                # set, fill it out.
1753
1754                if s.get('importall', False):
1755                    s['import'] = [ tb for tb in testbeds \
1756                            if tb not in s.get('export',[])]
1757                    del s['importall']
1758
1759                # Add the service to masters
1760                for tb in s.get('export', []):
1761                    if s.get('name', None):
1762                        if tb not in masters:
1763                            masters[tb] = [ ]
1764
1765                        params = { }
1766                        if 'fedAttr' in s:
1767                            for a in s['fedAttr']:
1768                                params[a.get('attribute', '')] = \
1769                                        a.get('value','')
1770
1771                        fser = federated_service(name=s['name'],
1772                                exporter=tb, importers=s.get('import',[]),
1773                                params=params)
1774                        if fser.name == 'hide_hosts' \
1775                                and 'hosts' not in fser.params:
1776                            fser.params['hosts'] = \
1777                                    ",".join(tb_hosts.get(fser.exporter, []))
1778                        masters[tb].append(fser)
1779
1780                        if fser.portal:
1781                            if tb not in pmasters: pmasters[tb] = [ fser ]
1782                            else: pmasters[tb].append(fser)
1783                    else:
1784                        self.log.error('Testbed service does not have name " + \
1785                                "and importers')
1786
1787
1788            allocated = { }         # Testbeds we can access
1789            topo ={ }               # Sub topologies
1790            connInfo = { }          # Connection information
1791
1792            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1793                    tbparams, masters, tbmap)
1794
1795            self.split_topology(top, topo, testbeds)
1796
1797            # Copy configuration files into the remote file store
1798            # The config urlpath
1799            configpath = "/%s/config" % expid
1800            # The config file system location
1801            configdir ="%s%s" % ( self.repodir, configpath)
1802            try:
1803                os.makedirs(configdir)
1804            except EnvironmentError, e:
1805                raise service_error(service_error.internal,
1806                        "Cannot create config directory: %s" % e)
1807            try:
1808                f = open("%s/hosts" % configdir, "w")
1809                f.write('\n'.join(hosts))
1810                f.close()
1811            except EnvironmentError, e:
1812                raise service_error(service_error.internal, 
1813                        "Cannot write hosts file: %s" % e)
1814            try:
1815                copy_file("%s" % gw_pubkey, "%s/%s" % \
1816                        (configdir, gw_pubkey_base))
1817                copy_file("%s" % gw_secretkey, "%s/%s" % \
1818                        (configdir, gw_secretkey_base))
1819            except EnvironmentError, e:
1820                raise service_error(service_error.internal, 
1821                        "Cannot copy keyfiles: %s" % e)
1822
1823            # Allow the individual testbeds to access the configuration files.
1824            for tb in tbparams.keys():
1825                asignee = tbparams[tb]['allocID']['fedid']
1826                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1827                    self.auth.set_attribute(asignee, "%s/%s" % \
1828                            (configpath, f))
1829
1830            part = experiment_partition(self.auth, self.store_url, tbmap,
1831                    self.muxmax, self.direct_transit)
1832            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1833                    connInfo, expid)
1834            # Now get access to the dynamic testbeds (those added above)
1835            for tb in [ t for t in topo if t not in allocated]:
1836                self.get_access(tb, None, tbparams, access_user, masters)
1837                allocated[tb] = 1
1838                store_keys = topo[tb].get_attribute('store_keys')
1839                # Give the testbed access to keys it exports or imports
1840                if store_keys:
1841                    for sk in store_keys.split(" "):
1842                        self.auth.set_attribute(\
1843                                tbparams[tb]['allocID']['fedid'], sk)
1844            self.auth.save()
1845
1846            self.wrangle_software(expid, top, topo, tbparams)
1847
1848            vtopo = topdl.topology_to_vtopo(top)
1849            vis = self.genviz(vtopo)
1850
1851            # save federant information
1852            for k in allocated.keys():
1853                tbparams[k]['federant'] = {
1854                        'name': [ { 'localname' : eid} ],
1855                        'allocID' : tbparams[k]['allocID'],
1856                        'uri': tbparams[k]['uri'],
1857                    }
1858
1859            self.state_lock.acquire()
1860            self.state[eid]['vtopo'] = vtopo
1861            self.state[eid]['vis'] = vis
1862            self.state[eid]['experimentdescription'] = \
1863                    { 'topdldescription': top.to_dict() }
1864            self.state[eid]['federant'] = \
1865                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1866                        if tbparams[tb].has_key('federant') ]
1867            if self.state_filename: 
1868                self.write_state()
1869            self.state_lock.release()
1870        except service_error, e:
1871            # If something goes wrong in the parse (usually an access error)
1872            # clear the placeholder state.  From here on out the code delays
1873            # exceptions.  Failing at this point returns a fault to the remote
1874            # caller.
1875
1876            self.state_lock.acquire()
1877            del self.state[eid]
1878            del self.state[expid]
1879            if self.state_filename: self.write_state()
1880            self.state_lock.release()
1881            raise e
1882
1883
1884        # Start the background swapper and return the starting state.  From
1885        # here on out, the state will stick around a while.
1886
1887        # Let users touch the state
1888        self.auth.set_attribute(fid, expid)
1889        self.auth.set_attribute(expid, expid)
1890        # Override fedids can manipulate state as well
1891        for o in self.overrides:
1892            self.auth.set_attribute(o, expid)
1893        self.auth.save()
1894
1895        # Create a logger that logs to the experiment's state object as well as
1896        # to the main log file.
1897        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1898        alloc_collector = self.list_log(self.state[eid]['log'])
1899        h = logging.StreamHandler(alloc_collector)
1900        # XXX: there should be a global one of these rather than repeating the
1901        # code.
1902        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1903                    '%d %b %y %H:%M:%S'))
1904        alloc_log.addHandler(h)
1905       
1906        attrs = [ 
1907                {
1908                    'attribute': 'ssh_pubkey', 
1909                    'value': '%s/%s/config/%s' % \
1910                            (self.repo_url, expid, gw_pubkey_base)
1911                },
1912                {
1913                    'attribute': 'ssh_secretkey', 
1914                    'value': '%s/%s/config/%s' % \
1915                            (self.repo_url, expid, gw_secretkey_base)
1916                },
1917                {
1918                    'attribute': 'hosts', 
1919                    'value': '%s/%s/config/hosts' % \
1920                            (self.repo_url, expid)
1921                },
1922            ]
1923
1924        # transit and disconnected testbeds may not have a connInfo entry.
1925        # Fill in the blanks.
1926        for t in allocated.keys():
1927            if not connInfo.has_key(t):
1928                connInfo[t] = { }
1929
1930        # Start a thread to do the resource allocation
1931        t  = Thread(target=self.allocate_resources,
1932                args=(allocated, masters, eid, expid, tbparams, 
1933                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1934                    connInfo, tbmap),
1935                name=eid)
1936        t.start()
1937
1938        rv = {
1939                'experimentID': [
1940                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1941                ],
1942                'experimentStatus': 'starting',
1943            }
1944
1945        return rv
1946   
1947    def get_experiment_fedid(self, key):
1948        """
1949        find the fedid associated with the localname key in the state database.
1950        """
1951
1952        rv = None
1953        self.state_lock.acquire()
1954        if self.state.has_key(key):
1955            if isinstance(self.state[key], dict):
1956                try:
1957                    kl = [ f['fedid'] for f in \
1958                            self.state[key]['experimentID']\
1959                                if f.has_key('fedid') ]
1960                except KeyError:
1961                    self.state_lock.release()
1962                    raise service_error(service_error.internal, 
1963                            "No fedid for experiment %s when getting "+\
1964                                    "fedid(!?)" % key)
1965                if len(kl) == 1:
1966                    rv = kl[0]
1967                else:
1968                    self.state_lock.release()
1969                    raise service_error(service_error.internal, 
1970                            "multiple fedids for experiment %s when " +\
1971                                    "getting fedid(!?)" % key)
1972            else:
1973                self.state_lock.release()
1974                raise service_error(service_error.internal, 
1975                        "Unexpected state for %s" % key)
1976        self.state_lock.release()
1977        return rv
1978
1979    def check_experiment_access(self, fid, key):
1980        """
1981        Confirm that the fid has access to the experiment.  Though a request
1982        may be made in terms of a local name, the access attribute is always
1983        the experiment's fedid.
1984        """
1985        if not isinstance(key, fedid):
1986            key = self.get_experiment_fedid(key)
1987
1988        if self.auth.check_attribute(fid, key):
1989            return True
1990        else:
1991            raise service_error(service_error.access, "Access Denied")
1992
1993
1994    def get_handler(self, path, fid):
1995        self.log.info("Get handler %s %s" % (path, fid))
1996        if self.auth.check_attribute(fid, path):
1997            return ("%s/%s" % (self.repodir, path), "application/binary")
1998        else:
1999            return (None, None)
2000
2001    def get_vtopo(self, req, fid):
2002        """
2003        Return the stored virtual topology for this experiment
2004        """
2005        rv = None
2006        state = None
2007
2008        req = req.get('VtopoRequestBody', None)
2009        if not req:
2010            raise service_error(service_error.req,
2011                    "Bad request format (no VtopoRequestBody)")
2012        exp = req.get('experiment', None)
2013        if exp:
2014            if exp.has_key('fedid'):
2015                key = exp['fedid']
2016                keytype = "fedid"
2017            elif exp.has_key('localname'):
2018                key = exp['localname']
2019                keytype = "localname"
2020            else:
2021                raise service_error(service_error.req, "Unknown lookup type")
2022        else:
2023            raise service_error(service_error.req, "No request?")
2024
2025        self.check_experiment_access(fid, key)
2026
2027        self.state_lock.acquire()
2028        if self.state.has_key(key):
2029            if self.state[key].has_key('vtopo'):
2030                rv = { 'experiment' : {keytype: key },\
2031                        'vtopo': self.state[key]['vtopo'],\
2032                    }
2033            else:
2034                state = self.state[key]['experimentStatus']
2035        self.state_lock.release()
2036
2037        if rv: return rv
2038        else: 
2039            if state:
2040                raise service_error(service_error.partial, 
2041                        "Not ready: %s" % state)
2042            else:
2043                raise service_error(service_error.req, "No such experiment")
2044
2045    def get_vis(self, req, fid):
2046        """
2047        Return the stored visualization for this experiment
2048        """
2049        rv = None
2050        state = None
2051
2052        req = req.get('VisRequestBody', None)
2053        if not req:
2054            raise service_error(service_error.req,
2055                    "Bad request format (no VisRequestBody)")
2056        exp = req.get('experiment', None)
2057        if exp:
2058            if exp.has_key('fedid'):
2059                key = exp['fedid']
2060                keytype = "fedid"
2061            elif exp.has_key('localname'):
2062                key = exp['localname']
2063                keytype = "localname"
2064            else:
2065                raise service_error(service_error.req, "Unknown lookup type")
2066        else:
2067            raise service_error(service_error.req, "No request?")
2068
2069        self.check_experiment_access(fid, key)
2070
2071        self.state_lock.acquire()
2072        if self.state.has_key(key):
2073            if self.state[key].has_key('vis'):
2074                rv =  { 'experiment' : {keytype: key },\
2075                        'vis': self.state[key]['vis'],\
2076                        }
2077            else:
2078                state = self.state[key]['experimentStatus']
2079        self.state_lock.release()
2080
2081        if rv: return rv
2082        else:
2083            if state:
2084                raise service_error(service_error.partial, 
2085                        "Not ready: %s" % state)
2086            else:
2087                raise service_error(service_error.req, "No such experiment")
2088
2089    def clean_info_response(self, rv):
2090        """
2091        Remove the information in the experiment's state object that is not in
2092        the info response.
2093        """
2094        # Remove the owner info (should always be there, but...)
2095        if rv.has_key('owner'): del rv['owner']
2096
2097        # Convert the log into the allocationLog parameter and remove the
2098        # log entry (with defensive programming)
2099        if rv.has_key('log'):
2100            rv['allocationLog'] = "".join(rv['log'])
2101            del rv['log']
2102        else:
2103            rv['allocationLog'] = ""
2104
2105        if rv['experimentStatus'] != 'active':
2106            if rv.has_key('federant'): del rv['federant']
2107        else:
2108            # remove the allocationID and uri info from each federant
2109            for f in rv.get('federant', []):
2110                if f.has_key('allocID'): del f['allocID']
2111                if f.has_key('uri'): del f['uri']
2112
2113        return rv
2114
2115    def get_info(self, req, fid):
2116        """
2117        Return all the stored info about this experiment
2118        """
2119        rv = None
2120
2121        req = req.get('InfoRequestBody', None)
2122        if not req:
2123            raise service_error(service_error.req,
2124                    "Bad request format (no InfoRequestBody)")
2125        exp = req.get('experiment', None)
2126        if exp:
2127            if exp.has_key('fedid'):
2128                key = exp['fedid']
2129                keytype = "fedid"
2130            elif exp.has_key('localname'):
2131                key = exp['localname']
2132                keytype = "localname"
2133            else:
2134                raise service_error(service_error.req, "Unknown lookup type")
2135        else:
2136            raise service_error(service_error.req, "No request?")
2137
2138        self.check_experiment_access(fid, key)
2139
2140        # The state may be massaged by the service function that called
2141        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2142        # state.
2143        self.state_lock.acquire()
2144        if self.state.has_key(key):
2145            rv = copy.deepcopy(self.state[key])
2146        self.state_lock.release()
2147
2148        if rv:
2149            return self.clean_info_response(rv)
2150        else:
2151            raise service_error(service_error.req, "No such experiment")
2152
2153    def get_multi_info(self, req, fid):
2154        """
2155        Return all the stored info that this fedid can access
2156        """
2157        rv = { 'info': [ ] }
2158
2159        self.state_lock.acquire()
2160        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2161            try:
2162                self.check_experiment_access(fid, key)
2163            except service_error, e:
2164                if e.code == service_error.access:
2165                    continue
2166                else:
2167                    self.state_lock.release()
2168                    raise e
2169
2170            if self.state.has_key(key):
2171                e = copy.deepcopy(self.state[key])
2172                e = self.clean_info_response(e)
2173                rv['info'].append(e)
2174        self.state_lock.release()
2175        return rv
2176
2177    def remove_dirs(self, dir):
2178        """
2179        Remove the directory tree and all files rooted at dir.  Log any errors,
2180        but continue.
2181        """
2182        self.log.debug("[removedirs]: removing %s" % dir)
2183        try:
2184            for path, dirs, files in os.walk(dir, topdown=False):
2185                for f in files:
2186                    os.remove(os.path.join(path, f))
2187                for d in dirs:
2188                    os.rmdir(os.path.join(path, d))
2189            os.rmdir(dir)
2190        except EnvironmentError, e:
2191            self.log.error("Error deleting directory tree in %s" % e);
2192
2193    def terminate_experiment(self, req, fid):
2194        """
2195        Swap this experiment out on the federants and delete the shared
2196        information
2197        """
2198        tbparams = { }
2199        req = req.get('TerminateRequestBody', None)
2200        if not req:
2201            raise service_error(service_error.req,
2202                    "Bad request format (no TerminateRequestBody)")
2203        force = req.get('force', False)
2204        exp = req.get('experiment', None)
2205        if exp:
2206            if exp.has_key('fedid'):
2207                key = exp['fedid']
2208                keytype = "fedid"
2209            elif exp.has_key('localname'):
2210                key = exp['localname']
2211                keytype = "localname"
2212            else:
2213                raise service_error(service_error.req, "Unknown lookup type")
2214        else:
2215            raise service_error(service_error.req, "No request?")
2216
2217        self.check_experiment_access(fid, key)
2218
2219        dealloc_list = [ ]
2220
2221
2222        # Create a logger that logs to the dealloc_list as well as to the main
2223        # log file.
2224        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2225        h = logging.StreamHandler(self.list_log(dealloc_list))
2226        # XXX: there should be a global one of these rather than repeating the
2227        # code.
2228        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2229                    '%d %b %y %H:%M:%S'))
2230        dealloc_log.addHandler(h)
2231
2232        self.state_lock.acquire()
2233        fed_exp = self.state.get(key, None)
2234        repo = None
2235
2236        if fed_exp:
2237            # This branch of the conditional holds the lock to generate a
2238            # consistent temporary tbparams variable to deallocate experiments.
2239            # It releases the lock to do the deallocations and reacquires it to
2240            # remove the experiment state when the termination is complete.
2241
2242            # First make sure that the experiment creation is complete.
2243            status = fed_exp.get('experimentStatus', None)
2244
2245            if status:
2246                if status in ('starting', 'terminating'):
2247                    if not force:
2248                        self.state_lock.release()
2249                        raise service_error(service_error.partial, 
2250                                'Experiment still being created or destroyed')
2251                    else:
2252                        self.log.warning('Experiment in %s state ' % status + \
2253                                'being terminated by force.')
2254            else:
2255                # No status??? trouble
2256                self.state_lock.release()
2257                raise service_error(service_error.internal,
2258                        "Experiment has no status!?")
2259
2260            ids = []
2261            #  experimentID is a list of dicts that are self-describing
2262            #  identifiers.  This finds all the fedids and localnames - the
2263            #  keys of self.state - and puts them into ids.
2264            for id in fed_exp.get('experimentID', []):
2265                if id.has_key('fedid'): 
2266                    ids.append(id['fedid'])
2267                    repo = "%s" % id['fedid']
2268                if id.has_key('localname'): ids.append(id['localname'])
2269
2270            # Collect the allocation/segment ids into a dict keyed by the fedid
2271            # of the allocation (or a monotonically increasing integer) that
2272            # contains a tuple of uri, aid (which is a dict...)
2273            for i, fed in enumerate(fed_exp.get('federant', [])):
2274                try:
2275                    uri = fed['uri']
2276                    aid = fed['allocID']
2277                    k = fed['allocID'].get('fedid', i)
2278                except KeyError, e:
2279                    continue
2280                tbparams[k] = (uri, aid)
2281            fed_exp['experimentStatus'] = 'terminating'
2282            if self.state_filename: self.write_state()
2283            self.state_lock.release()
2284
2285            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2286            # then completes, so we can't wait if nothing starts.  So, no
2287            # tbparams, no start.
2288            if len(tbparams) > 0:
2289                thread_pool = self.thread_pool(self.nthreads)
2290                for k in tbparams.keys():
2291                    # Create and start a thread to stop the segment
2292                    thread_pool.wait_for_slot()
2293                    uri, aid = tbparams[k]
2294                    t  = self.pooled_thread(\
2295                            target=self.terminate_segment(log=dealloc_log,
2296                                testbed=uri,
2297                                cert_file=self.cert_file, 
2298                                cert_pwd=self.cert_pwd,
2299                                trusted_certs=self.trusted_certs,
2300                                caller=self.call_TerminateSegment),
2301                            args=(uri, aid), name=k,
2302                            pdata=thread_pool, trace_file=self.trace_file)
2303                    t.start()
2304                # Wait for completions
2305                thread_pool.wait_for_all_done()
2306
2307            # release the allocations (failed experiments have done this
2308            # already, and starting experiments may be in odd states, so we
2309            # ignore errors releasing those allocations
2310            try: 
2311                for k in tbparams.keys():
2312                    # This releases access by uri
2313                    uri, aid = tbparams[k]
2314                    self.release_access(None, aid, uri=uri)
2315            except service_error, e:
2316                if status != 'failed' and not force:
2317                    raise e
2318
2319            # Remove the terminated experiment
2320            self.state_lock.acquire()
2321            for id in ids:
2322                if self.state.has_key(id): del self.state[id]
2323
2324            if self.state_filename: self.write_state()
2325            self.state_lock.release()
2326
2327            # Delete any synch points associated with this experiment.  All
2328            # synch points begin with the fedid of the experiment.
2329            fedid_keys = set(["fedid:%s" % f for f in ids \
2330                    if isinstance(f, fedid)])
2331            for k in self.synch_store.all_keys():
2332                try:
2333                    if len(k) > 45 and k[0:46] in fedid_keys:
2334                        self.synch_store.del_value(k)
2335                except synch_store.BadDeletionError:
2336                    pass
2337            self.write_store()
2338
2339            # Remove software and other cached stuff from the filesystem.
2340            if repo:
2341                self.remove_dirs("%s/%s" % (self.repodir, repo))
2342               
2343            return { 
2344                    'experiment': exp , 
2345                    'deallocationLog': "".join(dealloc_list),
2346                    }
2347        else:
2348            # Don't forget to release the lock
2349            self.state_lock.release()
2350            raise service_error(service_error.req, "No saved state")
2351
2352
2353    def GetValue(self, req, fid):
2354        """
2355        Get a value from the synchronized store
2356        """
2357        req = req.get('GetValueRequestBody', None)
2358        if not req:
2359            raise service_error(service_error.req,
2360                    "Bad request format (no GetValueRequestBody)")
2361       
2362        name = req['name']
2363        wait = req['wait']
2364        rv = { 'name': name }
2365
2366        if self.auth.check_attribute(fid, name):
2367            self.log.debug("[GetValue] asking for %s " % name)
2368            try:
2369                v = self.synch_store.get_value(name, wait)
2370            except synch_store.RevokedKeyError:
2371                # No more synch on this key
2372                raise service_error(service_error.federant, 
2373                        "Synch key %s revoked" % name)
2374            if v is not None:
2375                rv['value'] = v
2376            self.log.debug("[GetValue] got %s from %s" % (v, name))
2377            return rv
2378        else:
2379            raise service_error(service_error.access, "Access Denied")
2380       
2381
2382    def SetValue(self, req, fid):
2383        """
2384        Set a value in the synchronized store
2385        """
2386        req = req.get('SetValueRequestBody', None)
2387        if not req:
2388            raise service_error(service_error.req,
2389                    "Bad request format (no SetValueRequestBody)")
2390       
2391        name = req['name']
2392        v = req['value']
2393
2394        if self.auth.check_attribute(fid, name):
2395            try:
2396                self.synch_store.set_value(name, v)
2397                self.write_store()
2398                self.log.debug("[SetValue] set %s to %s" % (name, v))
2399            except synch_store.CollisionError:
2400                # Translate into a service_error
2401                raise service_error(service_error.req,
2402                        "Value already set: %s" %name)
2403            except synch_store.RevokedKeyError:
2404                # No more synch on this key
2405                raise service_error(service_error.federant, 
2406                        "Synch key %s revoked" % name)
2407            return { 'name': name, 'value': v }
2408        else:
2409            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.