author     Daniel Dunbar <daniel@zuster.org>   2013-08-29 00:54:23 +0000
committer  Daniel Dunbar <daniel@zuster.org>   2013-08-29 00:54:23 +0000
commit     4ac723b53f2eb69e604891853ca87d1e2b3ee788 (patch)
tree       5f8cf19a5417efd106aecbfb6a6e903f83363db2 /utils
parent     df44de6d918255eb51f3d042681e006f33948f80 (diff)
[lit] Add support for multiprocessing, under --use-processes for now.
git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@189556 91177308-0d34-0410-b5e6-96231b3b80d8
Diffstat (limited to 'utils')
-rw-r--r--   utils/lit/TODO          |   6
-rwxr-xr-x   utils/lit/lit/main.py   |   9
-rw-r--r--   utils/lit/lit/run.py    | 139
3 files changed, 115 insertions, 39 deletions
diff --git a/utils/lit/TODO b/utils/lit/TODO
index e6aeb3d933..c1a60c6f4f 100644
--- a/utils/lit/TODO
+++ b/utils/lit/TODO
@@ -113,8 +113,8 @@ Infrastructure
module. This is currently blocked on:
* The external execution mode is faster in some situations, because it avoids
- being bottlenecked on the GIL. We could fix this by moving to a good
- multiprocessing model.
+ being bottlenecked on the GIL. This can hopefully be obviated simply by
+ using --use-processes.
* Some tests in LLVM/Clang are explicitly disabled with the internal shell
(because they use features specific to bash). We would need to rewrite these
@@ -158,8 +158,6 @@ Miscellaneous
* Add --show-unsupported, don't show by default?
-* Optionally use multiprocessing.
-
* Support valgrind in all configs, and LLVM style valgrind.
* Support a timeout / ulimit.
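
For context on the TODO change above: pure-Python CPU-bound work gains nothing from extra threads under the GIL, but it does scale with processes. The sketch below is not part of this patch; burn and timed are illustrative names, and the only thing it borrows from run.py is the idea of swapping threading.Thread for multiprocessing.Process behind a single task_impl parameter.

import multiprocessing
import threading
import time

def burn(n=2000000):
    # Pure-Python CPU-bound loop; threads serialize on the GIL here.
    total = 0
    for i in range(n):
        total += i * i

def timed(task_impl, label):
    workers = [task_impl(target=burn) for _ in range(4)]
    start = time.time()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print('%-9s %.2fs' % (label, time.time() - start))

if __name__ == '__main__':
    timed(threading.Thread, 'threads')           # bottlenecked on the GIL
    timed(multiprocessing.Process, 'processes')  # spreads across cores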
diff --git a/utils/lit/lit/main.py b/utils/lit/lit/main.py
index 5c40f1ca53..50c9a66c8d 100755
--- a/utils/lit/lit/main.py
+++ b/utils/lit/lit/main.py
@@ -142,6 +142,12 @@ def main(builtinParameters = {}):
group.add_option("", "--show-tests", dest="showTests",
help="Show all discovered tests",
action="store_true", default=False)
+ group.add_option("", "--use-processes", dest="useProcesses",
+ help="Run tests in parallel with processes (not threads)",
+ action="store_true", default=False)
+ group.add_option("", "--use-threads", dest="useProcesses",
+ help="Run tests in parallel with threads (not processes)",
+ action="store_false", default=False)
parser.add_option_group(group)
(opts, args) = parser.parse_args()
@@ -264,7 +270,8 @@ def main(builtinParameters = {}):
startTime = time.time()
display = TestingProgressDisplay(opts, len(run.tests), progressBar)
try:
- run.execute_tests(display, opts.numThreads, opts.maxTime)
+ run.execute_tests(display, opts.numThreads, opts.maxTime,
+ opts.useProcesses)
except KeyboardInterrupt:
sys.exit(2)
display.finish()
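
Both new options above share dest="useProcesses", so the effective default stays False (threads) and whichever of --use-processes / --use-threads appears last on the command line wins. A self-contained optparse sketch of that paired-flag pattern, trimmed down from the hunk above (the parse_args input is just an example):

from optparse import OptionParser, OptionGroup

parser = OptionParser()
group = OptionGroup(parser, "Test Execution")
group.add_option("", "--use-processes", dest="useProcesses",
                 help="Run tests in parallel with processes (not threads)",
                 action="store_true", default=False)
group.add_option("", "--use-threads", dest="useProcesses",
                 help="Run tests in parallel with threads (not processes)",
                 action="store_false", default=False)
parser.add_option_group(group)

(opts, args) = parser.parse_args(["--use-processes"])
print(opts.useProcesses)   # True; with neither flag it stays False (threads)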
diff --git a/utils/lit/lit/run.py b/utils/lit/lit/run.py
index 617c3b988f..8642ff1892 100644
--- a/utils/lit/lit/run.py
+++ b/utils/lit/lit/run.py
@@ -2,42 +2,68 @@ import os
import threading
import time
import traceback
+try:
+ import Queue as queue
+except ImportError:
+ import queue
try:
import win32api
except ImportError:
win32api = None
+try:
+ import multiprocessing
+except ImportError:
+ multiprocessing = None
+
import lit.Test
###
# Test Execution Implementation
-class TestProvider(object):
- def __init__(self, tests):
- self.iter = iter(range(len(tests)))
+class LockedValue(object):
+ def __init__(self, value):
self.lock = threading.Lock()
- self.canceled = False
+ self._value = value
- def cancel(self):
+ def _get_value(self):
self.lock.acquire()
- self.canceled = True
- self.lock.release()
+ try:
+ return self._value
+ finally:
+ self.lock.release()
- def get(self):
- # Check if we are cancelled.
+ def _set_value(self, value):
self.lock.acquire()
- if self.canceled:
- self.lock.release()
+ try:
+ self._value = value
+ finally:
+ self.lock.release()
+
+ value = property(_get_value, _set_value)
+
+class TestProvider(object):
+ def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
+ self.canceled_flag = canceled_flag
+
+ # Create a shared queue to provide the test indices.
+ self.queue = queue_impl()
+ for i in range(len(tests)):
+ self.queue.put(i)
+ for i in range(num_jobs):
+ self.queue.put(None)
+
+ def cancel(self):
+ self.canceled_flag.value = 1
+
+ def get(self):
+ # Check if we are canceled.
+ if self.canceled_flag.value:
return None
# Otherwise take the next test.
- for item in self.iter:
- break
- else:
- item = None
- self.lock.release()
- return item
+ return self.queue.get()
class Tester(object):
def __init__(self, run_instance, provider, consumer):
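
The rewritten TestProvider above swaps the old iterator-plus-lock bookkeeping for a pre-filled queue: every test index is enqueued once, followed by one None sentinel per job so each worker can exit its loop cleanly. A minimal standalone sketch of that protocol, with worker, NUM_TESTS, and NUM_JOBS as illustrative names of my own:

import threading
try:
    import Queue as queue   # Python 2
except ImportError:
    import queue            # Python 3

NUM_TESTS, NUM_JOBS = 6, 2

# Pre-load the shared queue with test indices, then one sentinel per worker
# so every worker eventually receives a None and stops.
work = queue.Queue()
for i in range(NUM_TESTS):
    work.put(i)
for _ in range(NUM_JOBS):
    work.put(None)

def worker(wid):
    while True:
        item = work.get()
        if item is None:
            break
        print('worker %d: would run test %d' % (wid, item))

threads = [threading.Thread(target=worker, args=(w,)) for w in range(NUM_JOBS)]
for t in threads:
    t.start()
for t in threads:
    t.join()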
@@ -46,7 +72,7 @@ class Tester(object):
self.consumer = consumer
def run(self):
- while 1:
+ while True:
item = self.provider.get()
if item is None:
break
@@ -82,6 +108,42 @@ class ThreadResultsConsumer(object):
def handle_results(self):
pass
+class MultiprocessResultsConsumer(object):
+ def __init__(self, run, display, num_jobs):
+ self.run = run
+ self.display = display
+ self.num_jobs = num_jobs
+ self.queue = multiprocessing.Queue()
+
+ def update(self, test_index, test):
+ # This method is called in the child processes, and communicates the
+ # results to the actual display implementation via an output queue.
+ self.queue.put((test_index, test.result))
+
+ def task_finished(self):
+ # This method is called in the child processes, and communicates that
+ # individual tasks are complete.
+ self.queue.put(None)
+
+ def handle_results(self):
+ # This method is called in the parent, and consumes the results from the
+ # output queue and dispatches to the actual display. The method will
+ # complete after each of num_jobs tasks has signalled completion.
+ completed = 0
+ while completed != self.num_jobs:
+ # Wait for a result item.
+ item = self.queue.get()
+ if item is None:
+ completed += 1
+ continue
+
+ # Update the test result in the parent process.
+ index,result = item
+ test = self.run.tests[index]
+ test.result = result
+
+ self.display.update(test)
+
def run_one_tester(run, provider, display):
tester = Tester(run, provider, display)
tester.run()
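
MultiprocessResultsConsumer above funnels results from child processes back to the parent over one shared multiprocessing.Queue: children put (index, result) pairs plus a final None, and the parent drains the queue until it has seen one sentinel per job. A runnable sketch of that protocol, where child and the hard-coded 'PASS' results are placeholders, not lit code:

import multiprocessing

NUM_JOBS = 2

def child(job_id, results):
    # Each child reports per-test results, then a sentinel when it is done.
    for index in range(job_id * 3, job_id * 3 + 3):
        results.put((index, 'PASS'))
    results.put(None)

if __name__ == '__main__':
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=child, args=(j, results))
             for j in range(NUM_JOBS)]
    for p in procs:
        p.start()

    # Parent: drain results until every child has signalled completion.
    completed = 0
    while completed != NUM_JOBS:
        item = results.get()
        if item is None:
            completed += 1
            continue
        index, result = item
        print('test %d -> %s' % (index, result))

    for p in procs:
        p.join()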
@@ -123,7 +185,8 @@ class Run(object):
test.setResult(result)
- def execute_tests(self, display, jobs, max_time=None):
+ def execute_tests(self, display, jobs, max_time=None,
+ use_processes=False):
"""
execute_tests(display, jobs, [max_time])
@@ -145,8 +208,20 @@ class Run(object):
be given an UNRESOLVED result.
"""
- # Create the test provider object.
- provider = TestProvider(self.tests)
+ # Choose the appropriate parallel execution implementation.
+ if jobs == 1 or not use_processes or multiprocessing is None:
+ task_impl = threading.Thread
+ queue_impl = queue.Queue
+ canceled_flag = LockedValue(0)
+ consumer = ThreadResultsConsumer(display)
+ else:
+ task_impl = multiprocessing.Process
+ queue_impl = multiprocessing.Queue
+ canceled_flag = multiprocessing.Value('i', 0)
+ consumer = MultiprocessResultsConsumer(self, display, jobs)
+
+ # Create the test provider.
+ provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)
# Install a console-control signal handler on Windows.
if win32api is not None:
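
The branch above falls back to plain threads whenever only one job is requested, --use-processes was not passed, or the multiprocessing module failed to import. Pulled out as a standalone helper for illustration (choose_execution_impl is my name, not the patch's):

import threading
try:
    import Queue as queue        # Python 2
except ImportError:
    import queue                 # Python 3
try:
    import multiprocessing
except ImportError:
    multiprocessing = None

def choose_execution_impl(jobs, use_processes):
    # Threads are the safe default; processes are only used when explicitly
    # requested, useful (jobs > 1), and available on this Python.
    if jobs == 1 or not use_processes or multiprocessing is None:
        return threading.Thread, queue.Queue
    return multiprocessing.Process, multiprocessing.Queue

print(choose_execution_impl(jobs=4, use_processes=True))
print(choose_execution_impl(jobs=1, use_processes=True))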
@@ -162,8 +237,12 @@ class Run(object):
timeout_timer = threading.Timer(max_time, timeout_handler)
timeout_timer.start()
- # Actually execute the tests.
- self._execute_tests_with_provider(provider, display, jobs)
+ # If not using multiple tasks, just run the tests directly.
+ if jobs == 1:
+ run_one_tester(self, provider, consumer)
+ else:
+ # Otherwise, execute the tests in parallel
+ self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)
# Cancel the timeout handler.
if max_time is not None:
@@ -174,18 +253,10 @@ class Run(object):
if test.result is None:
test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
- def _execute_tests_with_provider(self, provider, display, jobs):
- consumer = ThreadResultsConsumer(display)
-
- # If only using one testing thread, don't use tasks at all; this lets us
- # profile, among other things.
- if jobs == 1:
- run_one_tester(self, provider, consumer)
- return
-
+ def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
# Start all of the tasks.
- tasks = [threading.Thread(target=run_one_tester,
- args=(self, provider, consumer))
+ tasks = [task_impl(target=run_one_tester,
+ args=(self, provider, consumer))
for i in range(jobs)]
for t in tasks:
t.start()
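
The hunk ends right after the workers are started, so the rest of _execute_tests_in_parallel is not visible here. A plausible continuation, assuming the parent drains the consumer and then joins the workers (an assumption on my part, not the committed code; ThreadResultsConsumer.handle_results() is a no-op above, so calling it unconditionally would be harmless):

# Hypothetical tail of _execute_tests_in_parallel (an assumption, not the
# committed code). It relies on the surrounding run.py context: the Run
# instance, run_one_tester(), and a consumer whose handle_results() is a
# no-op for threads and a queue-drain for processes.
def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
    # Start all of the tasks (threads or processes, depending on task_impl).
    tasks = [task_impl(target=run_one_tester,
                       args=(self, provider, consumer))
             for i in range(jobs)]
    for t in tasks:
        t.start()

    # Let the parent forward results to the display.
    consumer.handle_results()

    # Wait for all of the workers to finish.
    for t in tasks:
        t.join()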