1//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
9#include "llvm/Support/Parallel.h"
10#include "llvm/Config/llvm-config.h"
11#include "llvm/Support/ManagedStatic.h"
12#include "llvm/Support/Threading.h"
13
14#include <atomic>
15#include <deque>
16#include <future>
17#include <thread>
18#include <vector>
19
20llvm::ThreadPoolStrategy llvm::parallel::strategy;
21
22namespace llvm {
23namespace parallel {
24#if LLVM_ENABLE_THREADS
25
// Per-thread index. It starts out as UINT_MAX; ThreadPoolExecutor::work()
// overwrites it with the worker's ID, and TaskGroup's constructor compares it
// against UINT_MAX when deciding whether the group may run in parallel.
#ifdef _WIN32
// On Windows the variable is file-local and is read through getThreadIndex().
static thread_local unsigned threadIndex = UINT_MAX;

unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
thread_local unsigned threadIndex = UINT_MAX;
#endif
33
34namespace detail {
35
36namespace {
37
/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;

  /// Hands \p func to the executor. \p Sequential selects the executor's
  /// sequential handling for the closure instead of the general one.
  virtual void add(std::function<void()> func, bool Sequential = false) = 0;

  /// Returns the number of threads this executor reports.
  virtual size_t getThreadCount() const = 0;

  /// Returns the shared, lazily created default executor.
  static Executor *getDefaultExecutor();
};
47
48/// An implementation of an Executor that runs closures on a thread pool
49/// in filo order.
50class ThreadPoolExecutor : public Executor {
51public:
52 explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
53 ThreadCount = S.compute_thread_count();
54 // Spawn all but one of the threads in another thread as spawning threads
55 // can take a while.
56 Threads.reserve(n: ThreadCount);
57 Threads.resize(new_size: 1);
58 std::lock_guard<std::mutex> Lock(Mutex);
59 // Use operator[] before creating the thread to avoid data race in .size()
60 // in 'safe libc++' mode.
61 auto &Thread0 = Threads[0];
62 Thread0 = std::thread([this, S] {
63 for (unsigned I = 1; I < ThreadCount; ++I) {
64 Threads.emplace_back(args: [=] { work(S, ThreadID: I); });
65 if (Stop)
66 break;
67 }
68 ThreadsCreated.set_value();
69 work(S, ThreadID: 0);
70 });
71 }
72
73 void stop() {
74 {
75 std::lock_guard<std::mutex> Lock(Mutex);
76 if (Stop)
77 return;
78 Stop = true;
79 }
80 Cond.notify_all();
81 ThreadsCreated.get_future().wait();
82 }
83
84 ~ThreadPoolExecutor() override {
85 stop();
86 std::thread::id CurrentThreadId = std::this_thread::get_id();
87 for (std::thread &T : Threads)
88 if (T.get_id() == CurrentThreadId)
89 T.detach();
90 else
91 T.join();
92 }
93
94 struct Creator {
95 static void *call() { return new ThreadPoolExecutor(strategy); }
96 };
97 struct Deleter {
98 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
99 };
100
101 void add(std::function<void()> F, bool Sequential = false) override {
102 {
103 std::lock_guard<std::mutex> Lock(Mutex);
104 if (Sequential)
105 WorkQueueSequential.emplace_front(args: std::move(F));
106 else
107 WorkQueue.emplace_back(args: std::move(F));
108 }
109 Cond.notify_one();
110 }
111
112 size_t getThreadCount() const override { return ThreadCount; }
113
114private:
115 bool hasSequentialTasks() const {
116 return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
117 }
118
119 bool hasGeneralTasks() const { return !WorkQueue.empty(); }
120
121 void work(ThreadPoolStrategy S, unsigned ThreadID) {
122 threadIndex = ThreadID;
123 S.apply_thread_strategy(ThreadPoolNum: ThreadID);
124 while (true) {
125 std::unique_lock<std::mutex> Lock(Mutex);
126 Cond.wait(lock&: Lock, p: [&] {
127 return Stop || hasGeneralTasks() || hasSequentialTasks();
128 });
129 if (Stop)
130 break;
131 bool Sequential = hasSequentialTasks();
132 if (Sequential)
133 SequentialQueueIsLocked = true;
134 else
135 assert(hasGeneralTasks());
136
137 auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
138 auto Task = std::move(Queue.back());
139 Queue.pop_back();
140 Lock.unlock();
141 Task();
142 if (Sequential)
143 SequentialQueueIsLocked = false;
144 }
145 }
146
147 std::atomic<bool> Stop{false};
148 std::atomic<bool> SequentialQueueIsLocked{false};
149 std::deque<std::function<void()>> WorkQueue;
150 std::deque<std::function<void()>> WorkQueueSequential;
151 std::mutex Mutex;
152 std::condition_variable Cond;
153 std::promise<void> ThreadsCreated;
154 std::vector<std::thread> Threads;
155 unsigned ThreadCount;
156};
157
158Executor *Executor::getDefaultExecutor() {
159 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
160 // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
161 // stops the thread pool and waits for any worker thread creation to complete
162 // but does not wait for the threads to finish. The wait for worker thread
163 // creation to complete is important as it prevents intermittent crashes on
164 // Windows due to a race condition between thread creation and process exit.
165 //
166 // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
167 // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
168 // destructor ensures it has been stopped and waits for worker threads to
169 // finish. The wait is important as it prevents intermittent crashes on
170 // Windows when the process is doing a full exit.
171 //
172 // The Windows crashes appear to only occur with the MSVC static runtimes and
173 // are more frequent with the debug static runtime.
174 //
175 // This also prevents intermittent deadlocks on exit with the MinGW runtime.
176
177 static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
178 ThreadPoolExecutor::Deleter>
179 ManagedExec;
180 static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
181 return Exec.get();
182}
183} // namespace
184} // namespace detail
185
186size_t getThreadCount() {
187 return detail::Executor::getDefaultExecutor()->getThreadCount();
188}
189#endif
190
191// Latch::sync() called by the dtor may cause one thread to block. If is a dead
192// lock if all threads in the default executor are blocked. To prevent the dead
193// lock, only allow the root TaskGroup to run tasks parallelly. In the scenario
194// of nested parallel_for_each(), only the outermost one runs parallelly.
195TaskGroup::TaskGroup()
196#if LLVM_ENABLE_THREADS
197 : Parallel((parallel::strategy.ThreadsRequested != 1) &&
198 (threadIndex == UINT_MAX)) {}
199#else
200 : Parallel(false) {}
201#endif
202TaskGroup::~TaskGroup() {
203 // We must ensure that all the workloads have finished before decrementing the
204 // instances count.
205 L.sync();
206}
207
208void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
209#if LLVM_ENABLE_THREADS
210 if (Parallel) {
211 L.inc();
212 detail::Executor::getDefaultExecutor()->add(
213 func: [&, F = std::move(F)] {
214 F();
215 L.dec();
216 },
217 Sequential);
218 return;
219 }
220#endif
221 F();
222}
223
224} // namespace parallel
225} // namespace llvm
226
227void llvm::parallelFor(size_t Begin, size_t End,
228 llvm::function_ref<void(size_t)> Fn) {
229#if LLVM_ENABLE_THREADS
230 if (parallel::strategy.ThreadsRequested != 1) {
231 auto NumItems = End - Begin;
232 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
233 // overhead on large inputs.
234 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
235 if (TaskSize == 0)
236 TaskSize = 1;
237
238 parallel::TaskGroup TG;
239 for (; Begin + TaskSize < End; Begin += TaskSize) {
240 TG.spawn(F: [=, &Fn] {
241 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
242 Fn(I);
243 });
244 }
245 if (Begin != End) {
246 TG.spawn(F: [=, &Fn] {
247 for (size_t I = Begin; I != End; ++I)
248 Fn(I);
249 });
250 }
251 return;
252 }
253#endif
254
255 for (; Begin != End; ++Begin)
256 Fn(Begin);
257}
258