1//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
9#include "llvm/Support/Parallel.h"
10#include "llvm/ADT/ScopeExit.h"
11#include "llvm/Config/llvm-config.h"
12#include "llvm/Support/ExponentialBackoff.h"
13#include "llvm/Support/Jobserver.h"
14#include "llvm/Support/ManagedStatic.h"
15#include "llvm/Support/Threading.h"
16
17#include <atomic>
18#include <future>
19#include <memory>
20#include <mutex>
21#include <thread>
22#include <vector>
23
using namespace llvm;
using namespace llvm::parallel;

// Process-wide strategy used to size and configure the default thread pool
// (read by getDefaultExecutor() and TaskGroup below).
llvm::ThreadPoolStrategy parallel::strategy;

#if LLVM_ENABLE_THREADS

#ifdef _WIN32
// Per-thread index of the pool worker; UINT_MAX marks a thread that is not a
// pool worker. work() assigns the real index. On Windows the TLS variable is
// file-local and exposed through getThreadIndex(); GET_THREAD_INDEX_IMPL is
// presumably supplied by a project header (llvm/Support/Parallel.h or
// Threading.h) -- confirm there.
static thread_local unsigned threadIndex = UINT_MAX;

unsigned parallel::getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// On other platforms the TLS variable is declared in the header and simply
// defined here; UINT_MAX again means "not a pool worker thread".
thread_local unsigned parallel::threadIndex = UINT_MAX;
#endif
38
39namespace {
40
/// Runs closures on a thread pool in FILO order (the most recently added
/// closure is executed first).
42class ThreadPoolExecutor {
43public:
44 explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
45 if (S.UseJobserver)
46 TheJobserver = JobserverClient::getInstance();
47
48 ThreadCount = S.compute_thread_count();
49 // Spawn all but one of the threads in another thread as spawning threads
50 // can take a while.
51 Threads.reserve(n: ThreadCount);
52 Threads.resize(new_size: 1);
53 std::lock_guard<std::mutex> Lock(Mutex);
54 // Use operator[] before creating the thread to avoid data race in .size()
55 // in 'safe libc++' mode.
56 auto &Thread0 = Threads[0];
57 Thread0 = std::thread([this, S] {
58 for (unsigned I = 1; I < ThreadCount; ++I) {
59 Threads.emplace_back(args: [this, S, I] { work(S, ThreadID: I); });
60 if (Stop)
61 break;
62 }
63 ThreadsCreated.set_value();
64 work(S, ThreadID: 0);
65 });
66 }
67
68 // To make sure the thread pool executor can only be created with a parallel
69 // strategy.
70 ThreadPoolExecutor() = delete;
71
72 void stop() {
73 {
74 std::lock_guard<std::mutex> Lock(Mutex);
75 if (Stop)
76 return;
77 Stop = true;
78 }
79 Cond.notify_all();
80 ThreadsCreated.get_future().wait();
81
82 std::thread::id CurrentThreadId = std::this_thread::get_id();
83 for (std::thread &T : Threads)
84 if (T.get_id() == CurrentThreadId)
85 T.detach();
86 else
87 T.join();
88 }
89
90 ~ThreadPoolExecutor() { stop(); }
91
92 struct Creator {
93 static void *call() { return new ThreadPoolExecutor(strategy); }
94 };
95 struct Deleter {
96 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
97 };
98
99 struct WorkItem {
100 std::function<void()> F;
101 std::reference_wrapper<parallel::detail::Latch> L;
102 void operator()() {
103 F();
104 L.get().dec();
105 }
106 };
107
108 void add(std::function<void()> F, parallel::detail::Latch &L) {
109 {
110 std::lock_guard<std::mutex> Lock(Mutex);
111 WorkStack.push_back(x: {.F: std::move(F), .L: std::ref(t&: L)});
112 }
113 Cond.notify_one();
114 }
115
116 // Execute tasks from the work queue until the latch reaches zero.
117 // Used by nested TaskGroups (on worker threads) to prevent deadlock:
118 // instead of blocking in sync(), actively help drain the queue.
119 void helpSync(const parallel::detail::Latch &L) {
120 while (L.getCount() != 0) {
121 std::unique_lock<std::mutex> Lock(Mutex);
122 if (Stop || WorkStack.empty())
123 return;
124 popAndRun(Lock);
125 }
126 }
127
128 size_t getThreadCount() const { return ThreadCount; }
129
130private:
131 // Pop one task from the queue and run it. Must be called with Lock held;
132 // releases Lock before executing the task.
133 void popAndRun(std::unique_lock<std::mutex> &Lock) {
134 auto Item = std::move(WorkStack.back());
135 WorkStack.pop_back();
136 Lock.unlock();
137 Item();
138 }
139
140 void work(ThreadPoolStrategy S, unsigned ThreadID) {
141 threadIndex = ThreadID;
142 S.apply_thread_strategy(ThreadPoolNum: ThreadID);
143 // Note on jobserver deadlock avoidance:
144 // GNU Make grants each invoked process one implicit job slot. Our
145 // JobserverClient models this by returning an implicit JobSlot on the
146 // first successful tryAcquire() in a process. This guarantees forward
147 // progress without requiring a dedicated "always-on" thread here.
148
149 while (true) {
150 if (TheJobserver) {
151 // Jobserver-mode scheduling:
152 // - Acquire one job slot (with exponential backoff to avoid busy-wait).
153 // - While holding the slot, drain and run tasks from the local queue.
154 // - Release the slot when the queue is empty or when shutting down.
155 // Rationale: Holding a slot amortizes acquire/release overhead over
156 // multiple tasks and avoids requeue/yield churn, while still enforcing
157 // the jobserver’s global concurrency limit. With K available slots,
158 // up to K workers run tasks in parallel; within each worker tasks run
159 // sequentially until the local queue is empty.
160 ExponentialBackoff Backoff(std::chrono::hours(24));
161 JobSlot Slot;
162 do {
163 if (Stop)
164 return;
165 Slot = TheJobserver->tryAcquire();
166 if (Slot.isValid())
167 break;
168 } while (Backoff.waitForNextAttempt());
169
170 llvm::scope_exit SlotReleaser(
171 [&] { TheJobserver->release(Slot: std::move(Slot)); });
172
173 while (true) {
174 std::unique_lock<std::mutex> Lock(Mutex);
175 Cond.wait(lock&: Lock, p: [&] { return Stop || !WorkStack.empty(); });
176 if (Stop && WorkStack.empty())
177 return;
178 if (WorkStack.empty())
179 break;
180 popAndRun(Lock);
181 }
182 } else {
183 std::unique_lock<std::mutex> Lock(Mutex);
184 Cond.wait(lock&: Lock, p: [&] { return Stop || !WorkStack.empty(); });
185 if (Stop)
186 break;
187 popAndRun(Lock);
188 }
189 }
190 }
191
192 std::atomic<bool> Stop{false};
193 std::vector<WorkItem> WorkStack;
194 std::mutex Mutex;
195 std::condition_variable Cond;
196 std::promise<void> ThreadsCreated;
197 std::vector<std::thread> Threads;
198 unsigned ThreadCount;
199
200 JobserverClient *TheJobserver = nullptr;
201};
202} // namespace
203
// Returns the lazily-constructed, process-wide ThreadPoolExecutor configured
// from the global `strategy`.
static ThreadPoolExecutor *getDefaultExecutor() {
#ifdef _WIN32
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() on Windows. This is important to avoid various race
  // conditions at process exit that can cause crashes or deadlocks.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  // Dereferencing ManagedExec forces construction now. The ManagedStatic's
  // Deleter only calls stop() (it does not free the object); the unique_ptr
  // presumably takes care of actually destroying the executor during static
  // destruction -- confirm against ManagedStatic's teardown order.
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
#else
  // ManagedStatic is not desired on other platforms. When `Exec` is destroyed
  // by llvm_shutdown(), worker threads will clean up and invoke TLS
  // destructors. This can lead to race conditions if other threads attempt to
  // access TLS objects that have already been destroyed.
  static ThreadPoolExecutor Exec(strategy);
  return &Exec;
#endif
}
224
225size_t parallel::getThreadCount() {
226 return getDefaultExecutor()->getThreadCount();
227}
228#endif
229
// Returns true when the caller is itself a thread-pool worker (i.e. we are
// inside a task), false on the main/external threads.
static bool isNested() {
#if LLVM_ENABLE_THREADS
  // Workers overwrite threadIndex in work(); every other thread keeps the
  // UINT_MAX initializer.
  const bool OnWorkerThread = (threadIndex != UINT_MAX);
  return OnWorkerThread;
#else
  // Without thread support there are no workers, so nesting cannot occur.
  return false;
#endif
}
237
238TaskGroup::TaskGroup()
239 : Parallel(
240#if LLVM_ENABLE_THREADS
241 strategy.ThreadsRequested != 1
242#else
243 false
244#endif
245 ) {
246}
247
// Blocks until every task spawned into this group has finished. On a worker
// thread, first help execute queued work so that nested groups cannot
// deadlock waiting for tasks no free worker would ever pick up.
TaskGroup::~TaskGroup() {
#if LLVM_ENABLE_THREADS
  // In a nested TaskGroup (threadIndex != -1u), actively help drain the queue.
  if (Parallel && isNested())
    getDefaultExecutor()->helpSync(L);
#endif
  // Wait for the latch to reach zero (all spawned tasks have completed).
  L.sync();
}
256
257void TaskGroup::spawn(std::function<void()> F) {
258#if LLVM_ENABLE_THREADS
259 if (Parallel) {
260 L.inc();
261 getDefaultExecutor()->add(F: std::move(F), L);
262 return;
263 }
264#endif
265 F();
266}
267
268void llvm::parallelFor(size_t Begin, size_t End,
269 function_ref<void(size_t)> Fn) {
270#if LLVM_ENABLE_THREADS
271 if (strategy.ThreadsRequested != 1) {
272 size_t NumItems = End - Begin;
273 if (NumItems == 0)
274 return;
275 // Distribute work via an atomic counter shared by NumWorkers threads,
276 // keeping the task count (and thus Linux futex calls) at O(ThreadCount)
277 // For lld, per-file work is somewhat uneven, so a multipler > 1 is safer.
278 // While 2 vs 4 vs 8 makes no measurable difference, 4 is used as a
279 // reasonable default.
280 size_t NumWorkers = std::min<size_t>(a: NumItems, b: getThreadCount());
281 size_t ChunkSize = std::max(a: size_t(1), b: NumItems / (NumWorkers * 4));
282 std::atomic<size_t> Idx{Begin};
283 auto Worker = [&] {
284 while (true) {
285 size_t I = Idx.fetch_add(i: ChunkSize, m: std::memory_order_relaxed);
286 if (I >= End)
287 break;
288 size_t IEnd = std::min(a: I + ChunkSize, b: End);
289 for (; I < IEnd; ++I)
290 Fn(I);
291 }
292 };
293
294 TaskGroup TG;
295 for (size_t I = 0; I != NumWorkers; ++I)
296 TG.spawn(F: Worker);
297 return;
298 }
299#endif
300
301 for (; Begin != End; ++Begin)
302 Fn(Begin);
303}
304