summaryrefslogtreecommitdiff
path: root/utils/lit/lit/run.py
blob: 8642ff189270428e0e5ffe5b055a741b31c80a6a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
import os
import threading
import time
import traceback
try:
    import Queue as queue
except ImportError:
    import queue

try:
    import win32api
except ImportError:
    win32api = None

try:
    import multiprocessing
except ImportError:
    multiprocessing = None

import lit.Test

###
# Test Execution Implementation

class LockedValue(object):
    def __init__(self, value):
        self.lock = threading.Lock()
        self._value = value

    def _get_value(self):
        self.lock.acquire()
        try:
            return self._value
        finally:
            self.lock.release()

    def _set_value(self, value):
        self.lock.acquire()
        try:
            self._value = value
        finally:
            self.lock.release()

    value = property(_get_value, _set_value)

class TestProvider(object):
    def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
        self.canceled_flag = canceled_flag

        # Create a shared queue to provide the test indices.
        self.queue = queue_impl()
        for i in range(len(tests)):
            self.queue.put(i)
        for i in range(num_jobs):
            self.queue.put(None)

    def cancel(self):
        self.canceled_flag.value = 1

    def get(self):
        # Check if we are canceled.
        if self.canceled_flag.value:
          return None

        # Otherwise take the next test.
        return self.queue.get()

class Tester(object):
    def __init__(self, run_instance, provider, consumer):
        self.run_instance = run_instance
        self.provider = provider
        self.consumer = consumer

    def run(self):
        while True:
            item = self.provider.get()
            if item is None:
                break
            self.run_test(item)
        self.consumer.task_finished()

    def run_test(self, test_index):
        test = self.run_instance.tests[test_index]
        try:
            self.run_instance.execute_test(test)
        except KeyboardInterrupt:
            # This is a sad hack. Unfortunately subprocess goes
            # bonkers with ctrl-c and we start forking merrily.
            print('\nCtrl-C detected, goodbye.')
            os.kill(0,9)
        self.consumer.update(test_index, test)

class ThreadResultsConsumer(object):
    def __init__(self, display):
        self.display = display
        self.lock = threading.Lock()

    def update(self, test_index, test):
        self.lock.acquire()
        try:
            self.display.update(test)
        finally:
            self.lock.release()

    def task_finished(self):
        pass

    def handle_results(self):
        pass

class MultiprocessResultsConsumer(object):
    def __init__(self, run, display, num_jobs):
        self.run = run
        self.display = display
        self.num_jobs = num_jobs
        self.queue = multiprocessing.Queue()

    def update(self, test_index, test):
        # This method is called in the child processes, and communicates the
        # results to the actual display implementation via an output queue.
        self.queue.put((test_index, test.result))

    def task_finished(self):
        # This method is called in the child processes, and communicates that
        # individual tasks are complete.
        self.queue.put(None)

    def handle_results(self):
        # This method is called in the parent, and consumes the results from the
        # output queue and dispatches to the actual display. The method will
        # complete after each of num_jobs tasks has signalled completion.
        completed = 0
        while completed != self.num_jobs:
            # Wait for a result item.
            item = self.queue.get()
            if item is None:
                completed += 1
                continue

            # Update the test result in the parent process.
            index,result = item
            test = self.run.tests[index]
            test.result = result

            self.display.update(test)

def run_one_tester(run, provider, display):
    tester = Tester(run, provider, display)
    tester.run()

###

class Run(object):
    """
    This class represents a concrete, configured testing run.
    """

    def __init__(self, lit_config, tests):
        self.lit_config = lit_config
        self.tests = tests

    def execute_test(self, test):
        result = None
        start_time = time.time()
        try:
            result = test.config.test_format.execute(test, self.lit_config)

            # Support deprecated result from execute() which returned the result
            # code and additional output as a tuple.
            if isinstance(result, tuple):
                code, output = result
                result = lit.Test.Result(code, output)
            elif not isinstance(result, lit.Test.Result):
                raise ValueError("unexpected result from test execution")
        except KeyboardInterrupt:
            raise
        except:
            if self.lit_config.debug:
                raise
            output = 'Exception during script execution:\n'
            output += traceback.format_exc()
            output += '\n'
            result = lit.Test.Result(lit.Test.UNRESOLVED, output)
        result.elapsed = time.time() - start_time

        test.setResult(result)

    def execute_tests(self, display, jobs, max_time=None,
                      use_processes=False):
        """
        execute_tests(display, jobs, [max_time])

        Execute each of the tests in the run, using up to jobs number of
        parallel tasks, and inform the display of each individual result. The
        provided tests should be a subset of the tests available in this run
        object.

        If max_time is non-None, it should be a time in seconds after which to
        stop executing tests.

        The display object will have its update method called with each test as
        it is completed. The calls are guaranteed to be locked with respect to
        one another, but are *not* guaranteed to be called on the same thread as
        this method was invoked on.

        Upon completion, each test in the run will have its result
        computed. Tests which were not actually executed (for any reason) will
        be given an UNRESOLVED result.
        """

        # Choose the appropriate parallel execution implementation.
        if jobs == 1 or not use_processes or multiprocessing is None:
            task_impl = threading.Thread
            queue_impl = queue.Queue
            canceled_flag = LockedValue(0)
            consumer = ThreadResultsConsumer(display)
        else:
            task_impl = multiprocessing.Process
            queue_impl = multiprocessing.Queue
            canceled_flag =  multiprocessing.Value('i', 0)
            consumer = MultiprocessResultsConsumer(self, display, jobs)

        # Create the test provider.
        provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)

        # Install a console-control signal handler on Windows.
        if win32api is not None:
            def console_ctrl_handler(type):
                provider.cancel()
                return True
            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)

        # Install a timeout handler, if requested.
        if max_time is not None:
            def timeout_handler():
                provider.cancel()
            timeout_timer = threading.Timer(max_time, timeout_handler)
            timeout_timer.start()

        # If not using multiple tasks, just run the tests directly.
        if jobs == 1:
            run_one_tester(self, provider, consumer)
        else:
            # Otherwise, execute the tests in parallel
            self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)

        # Cancel the timeout handler.
        if max_time is not None:
            timeout_timer.cancel()

        # Update results for any tests which weren't run.
        for test in self.tests:
            if test.result is None:
                test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))

    def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
        # Start all of the tasks.
        tasks = [task_impl(target=run_one_tester,
                           args=(self, provider, consumer))
                 for i in range(jobs)]
        for t in tasks:
            t.start()

        # Allow the consumer to handle results, if necessary.
        consumer.handle_results()

        # Wait for all the tasks to complete.
        for t in tasks:
            t.join()