| 1 | //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// |
| 2 | // |
| 3 | // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
| 4 | // See https://llvm.org/LICENSE.txt for license information. |
| 5 | // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
| 6 | // |
| 7 | //===----------------------------------------------------------------------===// |
| 8 | |
#include "llvm/Support/Parallel.h"
#include "llvm/ADT/ScopeExit.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ExponentialBackoff.h"
#include "llvm/Support/Jobserver.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
| 23 | |
| 24 | llvm::ThreadPoolStrategy llvm::parallel::strategy; |
| 25 | |
| 26 | namespace llvm { |
| 27 | namespace parallel { |
| 28 | #if LLVM_ENABLE_THREADS |
| 29 | |
| 30 | #ifdef _WIN32 |
| 31 | static thread_local unsigned threadIndex = UINT_MAX; |
| 32 | |
| 33 | unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; } |
| 34 | #else |
| 35 | thread_local unsigned threadIndex = UINT_MAX; |
| 36 | #endif |
| 37 | |
| 38 | namespace detail { |
| 39 | |
| 40 | namespace { |
| 41 | |
| 42 | /// An abstract class that takes closures and runs them asynchronously. |
| 43 | class Executor { |
| 44 | public: |
| 45 | virtual ~Executor() = default; |
| 46 | virtual void add(std::function<void()> func) = 0; |
| 47 | virtual size_t getThreadCount() const = 0; |
| 48 | |
| 49 | static Executor *getDefaultExecutor(); |
| 50 | }; |
| 51 | |
| 52 | /// An implementation of an Executor that runs closures on a thread pool |
| 53 | /// in filo order. |
| 54 | class ThreadPoolExecutor : public Executor { |
| 55 | public: |
| 56 | explicit ThreadPoolExecutor(ThreadPoolStrategy S) { |
| 57 | if (S.UseJobserver) |
| 58 | TheJobserver = JobserverClient::getInstance(); |
| 59 | |
| 60 | ThreadCount = S.compute_thread_count(); |
| 61 | // Spawn all but one of the threads in another thread as spawning threads |
| 62 | // can take a while. |
| 63 | Threads.reserve(n: ThreadCount); |
| 64 | Threads.resize(new_size: 1); |
| 65 | std::lock_guard<std::mutex> Lock(Mutex); |
| 66 | // Use operator[] before creating the thread to avoid data race in .size() |
| 67 | // in 'safe libc++' mode. |
| 68 | auto &Thread0 = Threads[0]; |
| 69 | Thread0 = std::thread([this, S] { |
| 70 | for (unsigned I = 1; I < ThreadCount; ++I) { |
| 71 | Threads.emplace_back(args: [this, S, I] { work(S, ThreadID: I); }); |
| 72 | if (Stop) |
| 73 | break; |
| 74 | } |
| 75 | ThreadsCreated.set_value(); |
| 76 | work(S, ThreadID: 0); |
| 77 | }); |
| 78 | } |
| 79 | |
| 80 | // To make sure the thread pool executor can only be created with a parallel |
| 81 | // strategy. |
| 82 | ThreadPoolExecutor() = delete; |
| 83 | |
| 84 | void stop() { |
| 85 | { |
| 86 | std::lock_guard<std::mutex> Lock(Mutex); |
| 87 | if (Stop) |
| 88 | return; |
| 89 | Stop = true; |
| 90 | } |
| 91 | Cond.notify_all(); |
| 92 | ThreadsCreated.get_future().wait(); |
| 93 | |
| 94 | std::thread::id CurrentThreadId = std::this_thread::get_id(); |
| 95 | for (std::thread &T : Threads) |
| 96 | if (T.get_id() == CurrentThreadId) |
| 97 | T.detach(); |
| 98 | else |
| 99 | T.join(); |
| 100 | } |
| 101 | |
| 102 | ~ThreadPoolExecutor() override { stop(); } |
| 103 | |
| 104 | struct Creator { |
| 105 | static void *call() { return new ThreadPoolExecutor(strategy); } |
| 106 | }; |
| 107 | struct Deleter { |
| 108 | static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } |
| 109 | }; |
| 110 | |
| 111 | void add(std::function<void()> F) override { |
| 112 | { |
| 113 | std::lock_guard<std::mutex> Lock(Mutex); |
| 114 | WorkStack.push_back(x: std::move(F)); |
| 115 | } |
| 116 | Cond.notify_one(); |
| 117 | } |
| 118 | |
| 119 | size_t getThreadCount() const override { return ThreadCount; } |
| 120 | |
| 121 | private: |
| 122 | void work(ThreadPoolStrategy S, unsigned ThreadID) { |
| 123 | threadIndex = ThreadID; |
| 124 | S.apply_thread_strategy(ThreadPoolNum: ThreadID); |
| 125 | // Note on jobserver deadlock avoidance: |
| 126 | // GNU Make grants each invoked process one implicit job slot. Our |
| 127 | // JobserverClient models this by returning an implicit JobSlot on the |
| 128 | // first successful tryAcquire() in a process. This guarantees forward |
| 129 | // progress without requiring a dedicated "always-on" thread here. |
| 130 | |
| 131 | while (true) { |
| 132 | if (TheJobserver) { |
| 133 | // Jobserver-mode scheduling: |
| 134 | // - Acquire one job slot (with exponential backoff to avoid busy-wait). |
| 135 | // - While holding the slot, drain and run tasks from the local queue. |
| 136 | // - Release the slot when the queue is empty or when shutting down. |
| 137 | // Rationale: Holding a slot amortizes acquire/release overhead over |
| 138 | // multiple tasks and avoids requeue/yield churn, while still enforcing |
| 139 | // the jobserver’s global concurrency limit. With K available slots, |
| 140 | // up to K workers run tasks in parallel; within each worker tasks run |
| 141 | // sequentially until the local queue is empty. |
| 142 | ExponentialBackoff Backoff(std::chrono::hours(24)); |
| 143 | JobSlot Slot; |
| 144 | do { |
| 145 | if (Stop) |
| 146 | return; |
| 147 | Slot = TheJobserver->tryAcquire(); |
| 148 | if (Slot.isValid()) |
| 149 | break; |
| 150 | } while (Backoff.waitForNextAttempt()); |
| 151 | |
| 152 | llvm::scope_exit SlotReleaser( |
| 153 | [&] { TheJobserver->release(Slot: std::move(Slot)); }); |
| 154 | |
| 155 | while (true) { |
| 156 | std::function<void()> Task; |
| 157 | { |
| 158 | std::unique_lock<std::mutex> Lock(Mutex); |
| 159 | Cond.wait(lock&: Lock, p: [&] { return Stop || !WorkStack.empty(); }); |
| 160 | if (Stop && WorkStack.empty()) |
| 161 | return; |
| 162 | if (WorkStack.empty()) |
| 163 | break; |
| 164 | Task = std::move(WorkStack.back()); |
| 165 | WorkStack.pop_back(); |
| 166 | } |
| 167 | Task(); |
| 168 | } |
| 169 | } else { |
| 170 | std::unique_lock<std::mutex> Lock(Mutex); |
| 171 | Cond.wait(lock&: Lock, p: [&] { return Stop || !WorkStack.empty(); }); |
| 172 | if (Stop) |
| 173 | break; |
| 174 | auto Task = std::move(WorkStack.back()); |
| 175 | WorkStack.pop_back(); |
| 176 | Lock.unlock(); |
| 177 | Task(); |
| 178 | } |
| 179 | } |
| 180 | } |
| 181 | |
| 182 | std::atomic<bool> Stop{false}; |
| 183 | std::vector<std::function<void()>> WorkStack; |
| 184 | std::mutex Mutex; |
| 185 | std::condition_variable Cond; |
| 186 | std::promise<void> ThreadsCreated; |
| 187 | std::vector<std::thread> Threads; |
| 188 | unsigned ThreadCount; |
| 189 | |
| 190 | JobserverClient *TheJobserver = nullptr; |
| 191 | }; |
| 192 | |
| 193 | Executor *Executor::getDefaultExecutor() { |
| 194 | #ifdef _WIN32 |
| 195 | // The ManagedStatic enables the ThreadPoolExecutor to be stopped via |
| 196 | // llvm_shutdown() on Windows. This is important to avoid various race |
| 197 | // conditions at process exit that can cause crashes or deadlocks. |
| 198 | |
| 199 | static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator, |
| 200 | ThreadPoolExecutor::Deleter> |
| 201 | ManagedExec; |
| 202 | static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec)); |
| 203 | return Exec.get(); |
| 204 | #else |
| 205 | // ManagedStatic is not desired on other platforms. When `Exec` is destroyed |
| 206 | // by llvm_shutdown(), worker threads will clean up and invoke TLS |
| 207 | // destructors. This can lead to race conditions if other threads attempt to |
| 208 | // access TLS objects that have already been destroyed. |
| 209 | static ThreadPoolExecutor Exec(strategy); |
| 210 | return &Exec; |
| 211 | #endif |
| 212 | } |
| 213 | } // namespace |
| 214 | } // namespace detail |
| 215 | |
| 216 | size_t getThreadCount() { |
| 217 | return detail::Executor::getDefaultExecutor()->getThreadCount(); |
| 218 | } |
| 219 | #endif |
| 220 | |
| 221 | // Latch::sync() called by the dtor may cause one thread to block. If is a dead |
| 222 | // lock if all threads in the default executor are blocked. To prevent the dead |
| 223 | // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario |
| 224 | // of nested parallel_for_each(), only the outermost one runs parallelly. |
| 225 | TaskGroup::TaskGroup() |
| 226 | #if LLVM_ENABLE_THREADS |
| 227 | : Parallel((parallel::strategy.ThreadsRequested != 1) && |
| 228 | (threadIndex == UINT_MAX)) {} |
| 229 | #else |
| 230 | : Parallel(false) {} |
| 231 | #endif |
| 232 | TaskGroup::~TaskGroup() { |
| 233 | // We must ensure that all the workloads have finished before decrementing the |
| 234 | // instances count. |
| 235 | L.sync(); |
| 236 | } |
| 237 | |
| 238 | void TaskGroup::spawn(std::function<void()> F) { |
| 239 | #if LLVM_ENABLE_THREADS |
| 240 | if (Parallel) { |
| 241 | L.inc(); |
| 242 | detail::Executor::getDefaultExecutor()->add(func: [&, F = std::move(F)] { |
| 243 | F(); |
| 244 | L.dec(); |
| 245 | }); |
| 246 | return; |
| 247 | } |
| 248 | #endif |
| 249 | F(); |
| 250 | } |
| 251 | |
| 252 | } // namespace parallel |
| 253 | } // namespace llvm |
| 254 | |
| 255 | void llvm::parallelFor(size_t Begin, size_t End, |
| 256 | llvm::function_ref<void(size_t)> Fn) { |
| 257 | #if LLVM_ENABLE_THREADS |
| 258 | if (parallel::strategy.ThreadsRequested != 1) { |
| 259 | auto NumItems = End - Begin; |
| 260 | // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling |
| 261 | // overhead on large inputs. |
| 262 | auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; |
| 263 | if (TaskSize == 0) |
| 264 | TaskSize = 1; |
| 265 | |
| 266 | parallel::TaskGroup TG; |
| 267 | for (; Begin + TaskSize < End; Begin += TaskSize) { |
| 268 | TG.spawn(F: [=, &Fn] { |
| 269 | for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I) |
| 270 | Fn(I); |
| 271 | }); |
| 272 | } |
| 273 | if (Begin != End) { |
| 274 | TG.spawn(F: [=, &Fn] { |
| 275 | for (size_t I = Begin; I != End; ++I) |
| 276 | Fn(I); |
| 277 | }); |
| 278 | } |
| 279 | return; |
| 280 | } |
| 281 | #endif |
| 282 | |
| 283 | for (; Begin != End; ++Begin) |
| 284 | Fn(Begin); |
| 285 | } |
| 286 | |