diff options
Diffstat (limited to 'utils/lit/lit/run.py')
-rw-r--r-- | utils/lit/lit/run.py | 139 |
1 files changed, 105 insertions, 34 deletions
diff --git a/utils/lit/lit/run.py b/utils/lit/lit/run.py index 617c3b988f..8642ff1892 100644 --- a/utils/lit/lit/run.py +++ b/utils/lit/lit/run.py @@ -2,42 +2,68 @@ import os import threading import time import traceback +try: + import Queue as queue +except ImportError: + import queue try: import win32api except ImportError: win32api = None +try: + import multiprocessing +except ImportError: + multiprocessing = None + import lit.Test ### # Test Execution Implementation -class TestProvider(object): - def __init__(self, tests): - self.iter = iter(range(len(tests))) +class LockedValue(object): + def __init__(self, value): self.lock = threading.Lock() - self.canceled = False + self._value = value - def cancel(self): + def _get_value(self): self.lock.acquire() - self.canceled = True - self.lock.release() + try: + return self._value + finally: + self.lock.release() - def get(self): - # Check if we are cancelled. + def _set_value(self, value): self.lock.acquire() - if self.canceled: - self.lock.release() + try: + self._value = value + finally: + self.lock.release() + + value = property(_get_value, _set_value) + +class TestProvider(object): + def __init__(self, tests, num_jobs, queue_impl, canceled_flag): + self.canceled_flag = canceled_flag + + # Create a shared queue to provide the test indices. + self.queue = queue_impl() + for i in range(len(tests)): + self.queue.put(i) + for i in range(num_jobs): + self.queue.put(None) + + def cancel(self): + self.canceled_flag.value = 1 + + def get(self): + # Check if we are canceled. + if self.canceled_flag.value: return None # Otherwise take the next test. - for item in self.iter: - break - else: - item = None - self.lock.release() - return item + return self.queue.get() class Tester(object): def __init__(self, run_instance, provider, consumer): @@ -46,7 +72,7 @@ class Tester(object): self.consumer = consumer def run(self): - while 1: + while True: item = self.provider.get() if item is None: break @@ -82,6 +108,42 @@ class ThreadResultsConsumer(object): def handle_results(self): pass +class MultiprocessResultsConsumer(object): + def __init__(self, run, display, num_jobs): + self.run = run + self.display = display + self.num_jobs = num_jobs + self.queue = multiprocessing.Queue() + + def update(self, test_index, test): + # This method is called in the child processes, and communicates the + # results to the actual display implementation via an output queue. + self.queue.put((test_index, test.result)) + + def task_finished(self): + # This method is called in the child processes, and communicates that + # individual tasks are complete. + self.queue.put(None) + + def handle_results(self): + # This method is called in the parent, and consumes the results from the + # output queue and dispatches to the actual display. The method will + # complete after each of num_jobs tasks has signalled completion. + completed = 0 + while completed != self.num_jobs: + # Wait for a result item. + item = self.queue.get() + if item is None: + completed += 1 + continue + + # Update the test result in the parent process. + index,result = item + test = self.run.tests[index] + test.result = result + + self.display.update(test) + def run_one_tester(run, provider, display): tester = Tester(run, provider, display) tester.run() @@ -123,7 +185,8 @@ class Run(object): test.setResult(result) - def execute_tests(self, display, jobs, max_time=None): + def execute_tests(self, display, jobs, max_time=None, + use_processes=False): """ execute_tests(display, jobs, [max_time]) @@ -145,8 +208,20 @@ class Run(object): be given an UNRESOLVED result. """ - # Create the test provider object. - provider = TestProvider(self.tests) + # Choose the appropriate parallel execution implementation. + if jobs == 1 or not use_processes or multiprocessing is None: + task_impl = threading.Thread + queue_impl = queue.Queue + canceled_flag = LockedValue(0) + consumer = ThreadResultsConsumer(display) + else: + task_impl = multiprocessing.Process + queue_impl = multiprocessing.Queue + canceled_flag = multiprocessing.Value('i', 0) + consumer = MultiprocessResultsConsumer(self, display, jobs) + + # Create the test provider. + provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag) # Install a console-control signal handler on Windows. if win32api is not None: @@ -162,8 +237,12 @@ class Run(object): timeout_timer = threading.Timer(max_time, timeout_handler) timeout_timer.start() - # Actually execute the tests. - self._execute_tests_with_provider(provider, display, jobs) + # If not using multiple tasks, just run the tests directly. + if jobs == 1: + run_one_tester(self, provider, consumer) + else: + # Otherwise, execute the tests in parallel + self._execute_tests_in_parallel(task_impl, provider, consumer, jobs) # Cancel the timeout handler. if max_time is not None: @@ -174,18 +253,10 @@ class Run(object): if test.result is None: test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0)) - def _execute_tests_with_provider(self, provider, display, jobs): - consumer = ThreadResultsConsumer(display) - - # If only using one testing thread, don't use tasks at all; this lets us - # profile, among other things. - if jobs == 1: - run_one_tester(self, provider, consumer) - return - + def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs): # Start all of the tasks. - tasks = [threading.Thread(target=run_one_tester, - args=(self, provider, consumer)) + tasks = [task_impl(target=run_one_tester, + args=(self, provider, consumer)) for i in range(jobs)] for t in tasks: t.start() |