1 | //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// |
2 | // |
3 | // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
4 | // See https://llvm.org/LICENSE.txt for license information. |
5 | // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | |
9 | #include "llvm/Support/Parallel.h" |
10 | #include "llvm/Config/llvm-config.h" |
11 | #include "llvm/Support/ManagedStatic.h" |
12 | #include "llvm/Support/Threading.h" |
13 | |
14 | #include <atomic> |
15 | #include <future> |
16 | #include <thread> |
17 | #include <vector> |
18 | |
19 | llvm::ThreadPoolStrategy llvm::parallel::strategy; |
20 | |
21 | namespace llvm { |
22 | namespace parallel { |
23 | #if LLVM_ENABLE_THREADS |
24 | |
// Per-thread index of the calling thread. It starts out as UINT_MAX and is
// overwritten with the worker id by ThreadPoolExecutor::work(), so a thread
// that still sees UINT_MAX is not one of the pool's workers.
#if defined(_WIN32)
// On Windows the variable is local to this file and is exposed through
// getThreadIndex(); the function body comes from the GET_THREAD_INDEX_IMPL
// macro, which is defined outside this file.
static thread_local unsigned threadIndex = UINT_MAX;

unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// Elsewhere the variable itself has external linkage.
thread_local unsigned threadIndex = UINT_MAX;
#endif
32 | |
33 | namespace detail { |
34 | |
35 | namespace { |
36 | |
/// Abstract interface for something that accepts closures and runs them
/// asynchronously.
class Executor {
public:
  /// Virtual so that implementations can be destroyed through the base class.
  virtual ~Executor() = default;

  /// Hands \p func over to the executor to be run.
  virtual void add(std::function<void()> func) = 0;

  /// Returns the number of threads the executor works with.
  virtual size_t getThreadCount() const = 0;

  /// Returns the executor shared by the parallel algorithms in this file.
  static Executor *getDefaultExecutor();
};
46 | |
47 | /// An implementation of an Executor that runs closures on a thread pool |
48 | /// in filo order. |
49 | class ThreadPoolExecutor : public Executor { |
50 | public: |
51 | explicit ThreadPoolExecutor(ThreadPoolStrategy S) { |
52 | ThreadCount = S.compute_thread_count(); |
53 | // Spawn all but one of the threads in another thread as spawning threads |
54 | // can take a while. |
55 | Threads.reserve(n: ThreadCount); |
56 | Threads.resize(new_size: 1); |
57 | std::lock_guard<std::mutex> Lock(Mutex); |
58 | // Use operator[] before creating the thread to avoid data race in .size() |
59 | // in 'safe libc++' mode. |
60 | auto &Thread0 = Threads[0]; |
61 | Thread0 = std::thread([this, S] { |
62 | for (unsigned I = 1; I < ThreadCount; ++I) { |
63 | Threads.emplace_back(args: [=] { work(S, ThreadID: I); }); |
64 | if (Stop) |
65 | break; |
66 | } |
67 | ThreadsCreated.set_value(); |
68 | work(S, ThreadID: 0); |
69 | }); |
70 | } |
71 | |
72 | void stop() { |
73 | { |
74 | std::lock_guard<std::mutex> Lock(Mutex); |
75 | if (Stop) |
76 | return; |
77 | Stop = true; |
78 | } |
79 | Cond.notify_all(); |
80 | ThreadsCreated.get_future().wait(); |
81 | } |
82 | |
83 | ~ThreadPoolExecutor() override { |
84 | stop(); |
85 | std::thread::id CurrentThreadId = std::this_thread::get_id(); |
86 | for (std::thread &T : Threads) |
87 | if (T.get_id() == CurrentThreadId) |
88 | T.detach(); |
89 | else |
90 | T.join(); |
91 | } |
92 | |
93 | struct Creator { |
94 | static void *call() { return new ThreadPoolExecutor(strategy); } |
95 | }; |
96 | struct Deleter { |
97 | static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } |
98 | }; |
99 | |
100 | void add(std::function<void()> F) override { |
101 | { |
102 | std::lock_guard<std::mutex> Lock(Mutex); |
103 | WorkStack.push_back(x: std::move(F)); |
104 | } |
105 | Cond.notify_one(); |
106 | } |
107 | |
108 | size_t getThreadCount() const override { return ThreadCount; } |
109 | |
110 | private: |
111 | void work(ThreadPoolStrategy S, unsigned ThreadID) { |
112 | threadIndex = ThreadID; |
113 | S.apply_thread_strategy(ThreadPoolNum: ThreadID); |
114 | while (true) { |
115 | std::unique_lock<std::mutex> Lock(Mutex); |
116 | Cond.wait(lock&: Lock, p: [&] { return Stop || !WorkStack.empty(); }); |
117 | if (Stop) |
118 | break; |
119 | auto Task = std::move(WorkStack.back()); |
120 | WorkStack.pop_back(); |
121 | Lock.unlock(); |
122 | Task(); |
123 | } |
124 | } |
125 | |
126 | std::atomic<bool> Stop{false}; |
127 | std::vector<std::function<void()>> WorkStack; |
128 | std::mutex Mutex; |
129 | std::condition_variable Cond; |
130 | std::promise<void> ThreadsCreated; |
131 | std::vector<std::thread> Threads; |
132 | unsigned ThreadCount; |
133 | }; |
134 | |
135 | Executor *Executor::getDefaultExecutor() { |
136 | #ifdef _WIN32 |
137 | // The ManagedStatic enables the ThreadPoolExecutor to be stopped via |
138 | // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This |
139 | // stops the thread pool and waits for any worker thread creation to complete |
140 | // but does not wait for the threads to finish. The wait for worker thread |
141 | // creation to complete is important as it prevents intermittent crashes on |
142 | // Windows due to a race condition between thread creation and process exit. |
143 | // |
144 | // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to |
145 | // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor |
146 | // destructor ensures it has been stopped and waits for worker threads to |
147 | // finish. The wait is important as it prevents intermittent crashes on |
148 | // Windows when the process is doing a full exit. |
149 | // |
150 | // The Windows crashes appear to only occur with the MSVC static runtimes and |
151 | // are more frequent with the debug static runtime. |
152 | // |
153 | // This also prevents intermittent deadlocks on exit with the MinGW runtime. |
154 | |
155 | static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator, |
156 | ThreadPoolExecutor::Deleter> |
157 | ManagedExec; |
158 | static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec)); |
159 | return Exec.get(); |
160 | #else |
161 | // ManagedStatic is not desired on other platforms. When `Exec` is destroyed |
162 | // by llvm_shutdown(), worker threads will clean up and invoke TLS |
163 | // destructors. This can lead to race conditions if other threads attempt to |
164 | // access TLS objects that have already been destroyed. |
165 | static ThreadPoolExecutor Exec(strategy); |
166 | return &Exec; |
167 | #endif |
168 | } |
169 | } // namespace |
170 | } // namespace detail |
171 | |
172 | size_t getThreadCount() { |
173 | return detail::Executor::getDefaultExecutor()->getThreadCount(); |
174 | } |
175 | #endif |
176 | |
177 | // Latch::sync() called by the dtor may cause one thread to block. If is a dead |
178 | // lock if all threads in the default executor are blocked. To prevent the dead |
179 | // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario |
180 | // of nested parallel_for_each(), only the outermost one runs parallelly. |
181 | TaskGroup::TaskGroup() |
182 | #if LLVM_ENABLE_THREADS |
183 | : Parallel((parallel::strategy.ThreadsRequested != 1) && |
184 | (threadIndex == UINT_MAX)) {} |
185 | #else |
186 | : Parallel(false) {} |
187 | #endif |
188 | TaskGroup::~TaskGroup() { |
189 | // We must ensure that all the workloads have finished before decrementing the |
190 | // instances count. |
191 | L.sync(); |
192 | } |
193 | |
194 | void TaskGroup::spawn(std::function<void()> F) { |
195 | #if LLVM_ENABLE_THREADS |
196 | if (Parallel) { |
197 | L.inc(); |
198 | detail::Executor::getDefaultExecutor()->add(func: [&, F = std::move(F)] { |
199 | F(); |
200 | L.dec(); |
201 | }); |
202 | return; |
203 | } |
204 | #endif |
205 | F(); |
206 | } |
207 | |
208 | } // namespace parallel |
209 | } // namespace llvm |
210 | |
211 | void llvm::parallelFor(size_t Begin, size_t End, |
212 | llvm::function_ref<void(size_t)> Fn) { |
213 | #if LLVM_ENABLE_THREADS |
214 | if (parallel::strategy.ThreadsRequested != 1) { |
215 | auto NumItems = End - Begin; |
216 | // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling |
217 | // overhead on large inputs. |
218 | auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; |
219 | if (TaskSize == 0) |
220 | TaskSize = 1; |
221 | |
222 | parallel::TaskGroup TG; |
223 | for (; Begin + TaskSize < End; Begin += TaskSize) { |
224 | TG.spawn(F: [=, &Fn] { |
225 | for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I) |
226 | Fn(I); |
227 | }); |
228 | } |
229 | if (Begin != End) { |
230 | TG.spawn(F: [=, &Fn] { |
231 | for (size_t I = Begin; I != End; ++I) |
232 | Fn(I); |
233 | }); |
234 | } |
235 | return; |
236 | } |
237 | #endif |
238 | |
239 | for (; Begin != End; ++Begin) |
240 | Fn(Begin); |
241 | } |
242 | |