//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <deque>
#include <future>
#include <thread>
#include <vector>

llvm::ThreadPoolStrategy llvm::parallel::strategy;

namespace llvm {
namespace parallel {
#if LLVM_ENABLE_THREADS

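// Each worker thread records its index here; UINT_MAX marks a thread that is
// not a worker of the default executor. TaskGroup (below) uses this to detect
// that it is being constructed on a worker thread and must not nest
// parallelism.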
#ifdef _WIN32
static thread_local unsigned threadIndex = UINT_MAX;

unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
thread_local unsigned threadIndex = UINT_MAX;
#endif

namespace detail {

namespace {

/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  virtual void add(std::function<void()> func, bool Sequential = false) = 0;
  virtual size_t getThreadCount() const = 0;

  static Executor *getDefaultExecutor();
};
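// Illustrative sketch (not part of the interface above): callers typically
// obtain the process-wide executor and enqueue closures on it, e.g.
//
//   Executor::getDefaultExecutor()->add([] { /* work */ });
//   Executor::getDefaultExecutor()->add([] { /* ordered work */ },
//                                       /*Sequential=*/true);
//
// Tasks added with Sequential = true are serialized with respect to each
// other; see ThreadPoolExecutor below.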

/// An implementation of an Executor that runs closures on a thread pool
/// in FILO (first-in, last-out) order.
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
    ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    // Use operator[] before creating the thread to avoid data race in .size()
    // in 'safe libc++' mode.
    auto &Thread0 = Threads[0];
    Thread0 = std::thread([this, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }

  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

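  // Creator/Deleter are used by the ManagedStatic in getDefaultExecutor()
  // below. Note that the Deleter only stops the pool; it deliberately does not
  // delete it (see the comment in getDefaultExecutor() for the rationale).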
  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  void add(std::function<void()> F, bool Sequential = false) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Sequential)
        WorkQueueSequential.emplace_front(std::move(F));
      else
        WorkQueue.emplace_back(std::move(F));
    }
    Cond.notify_one();
  }

  size_t getThreadCount() const override { return ThreadCount; }

private:
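  // A sequential task is only handed to a worker while no other sequential
  // task is running (SequentialQueueIsLocked), so sequential tasks execute one
  // at a time. Because they are pushed at the front of their queue and popped
  // from the back, they also run in the order they were added.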
  bool hasSequentialTasks() const {
    return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
  }

  bool hasGeneralTasks() const { return !WorkQueue.empty(); }

  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] {
        return Stop || hasGeneralTasks() || hasSequentialTasks();
      });
      if (Stop)
        break;
      bool Sequential = hasSequentialTasks();
      if (Sequential)
        SequentialQueueIsLocked = true;
      else
        assert(hasGeneralTasks());

      auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
      auto Task = std::move(Queue.back());
      Queue.pop_back();
      Lock.unlock();
      Task();
      if (Sequential)
        SequentialQueueIsLocked = false;
    }
  }

  std::atomic<bool> Stop{false};
  std::atomic<bool> SequentialQueueIsLocked{false};
  std::deque<std::function<void()>> WorkQueue;
  std::deque<std::function<void()>> WorkQueueSequential;
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
  unsigned ThreadCount;
};

Executor *Executor::getDefaultExecutor() {
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr
  // to it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
}
} // namespace
} // namespace detail

size_t getThreadCount() {
  return detail::Executor::getDefaultExecutor()->getThreadCount();
}
#endif

// Latch::sync() called by the dtor may cause one thread to block. It would be
// a deadlock if all threads in the default executor were blocked this way. To
// prevent that, only the root TaskGroup is allowed to run its tasks in
// parallel: in nested parallel_for_each() calls, only the outermost one runs
// in parallel.
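// Illustrative sketch: a parallelFor() body that itself calls parallelFor()
// constructs its inner TaskGroup on a worker thread, where threadIndex !=
// UINT_MAX, so the inner group falls back to running its tasks serially.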
TaskGroup::TaskGroup()
#if LLVM_ENABLE_THREADS
    : Parallel((parallel::strategy.ThreadsRequested != 1) &&
               (threadIndex == UINT_MAX)) {}
#else
    : Parallel(false) {}
#endif
TaskGroup::~TaskGroup() {
  // Wait for all spawned workloads to finish before the group (and its latch)
  // is destroyed.
  L.sync();
}

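// Illustrative usage sketch (`Items` and `process()` are hypothetical and not
// part of this file):
//
//   parallel::TaskGroup TG;
//   for (auto &I : Items)
//     TG.spawn([&] { process(I); });
//   // ~TaskGroup() blocks, via Latch::sync(), until every spawned task has
//   // completed.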
void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
#if LLVM_ENABLE_THREADS
  if (Parallel) {
    L.inc();
    detail::Executor::getDefaultExecutor()->add(
        [&, F = std::move(F)] {
          F();
          L.dec();
        },
        Sequential);
    return;
  }
#endif
  F();
}

} // namespace parallel
} // namespace llvm

void llvm::parallelFor(size_t Begin, size_t End,
                       llvm::function_ref<void(size_t)> Fn) {
#if LLVM_ENABLE_THREADS
  if (parallel::strategy.ThreadsRequested != 1) {
    auto NumItems = End - Begin;
    // Cap the number of tasks at MaxTasksPerGroup to keep job-scheduling
    // overhead low on large inputs.
    auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
    if (TaskSize == 0)
      TaskSize = 1;
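    // For example, assuming MaxTasksPerGroup were 512 (its actual value is
    // declared in Parallel.h), 1'000'000 items would give TaskSize = 1953,
    // i.e. roughly 512 spawned tasks of ~1953 items each; any range shorter
    // than MaxTasksPerGroup gets one task per item.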

    parallel::TaskGroup TG;
    for (; Begin + TaskSize < End; Begin += TaskSize) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
          Fn(I);
      });
    }
    if (Begin != End) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin; I != End; ++I)
          Fn(I);
      });
    }
    return;
  }
#endif

  for (; Begin != End; ++Begin)
    Fn(Begin);
}