| 1 | //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// |
| 2 | // |
| 3 | // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
| 4 | // See https://llvm.org/LICENSE.txt for license information. |
| 5 | // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
| 6 | // |
| 7 | //===----------------------------------------------------------------------===// |
| 8 | |
| 9 | #include "llvm/Support/Parallel.h" |
| 10 | #include "llvm/Config/llvm-config.h" |
| 11 | #include "llvm/Support/ManagedStatic.h" |
| 12 | #include "llvm/Support/Threading.h" |
| 13 | |
| 14 | #include <atomic> |
| 15 | #include <future> |
| 16 | #include <thread> |
| 17 | #include <vector> |
| 18 | |
// Process-wide strategy for the default parallel executor. Read when the
// default ThreadPoolExecutor is created and by TaskGroup's constructor
// (ThreadsRequested == 1 disables parallel execution).
llvm::ThreadPoolStrategy llvm::parallel::strategy;
| 20 | |
| 21 | namespace llvm { |
| 22 | namespace parallel { |
| 23 | #if LLVM_ENABLE_THREADS |
| 24 | |
#ifdef _WIN32
// On Windows the TLS variable is kept private to this TU and exposed through
// getThreadIndex() below; presumably this avoids exporting a thread_local
// symbol across DLL boundaries (GET_THREAD_INDEX_IMPL appears to come from
// Threading.h — TODO confirm).
static thread_local unsigned threadIndex = UINT_MAX;

// Index of the calling thread within the default executor's pool, or
// UINT_MAX if the caller is not a pool worker.
unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// Index of the calling thread within the default executor's pool. UINT_MAX
// marks threads that are not pool workers; TaskGroup uses this to detect
// nested parallelism.
thread_local unsigned threadIndex = UINT_MAX;
#endif
| 32 | |
| 33 | namespace detail { |
| 34 | |
| 35 | namespace { |
| 36 | |
/// Abstract interface for an object that accepts closures and runs them
/// asynchronously.
class Executor {
public:
  virtual ~Executor() = default;

  /// Queue \p F for asynchronous execution.
  virtual void add(std::function<void()> F) = 0;

  /// Number of threads available to service queued work.
  virtual size_t getThreadCount() const = 0;

  /// Returns the process-wide default executor.
  static Executor *getDefaultExecutor();
};
| 46 | |
| 47 | /// An implementation of an Executor that runs closures on a thread pool |
| 48 | /// in filo order. |
| 49 | class ThreadPoolExecutor : public Executor { |
| 50 | public: |
| 51 | explicit ThreadPoolExecutor(ThreadPoolStrategy S) { |
| 52 | ThreadCount = S.compute_thread_count(); |
| 53 | // Spawn all but one of the threads in another thread as spawning threads |
| 54 | // can take a while. |
| 55 | Threads.reserve(n: ThreadCount); |
| 56 | Threads.resize(new_size: 1); |
| 57 | std::lock_guard<std::mutex> Lock(Mutex); |
| 58 | // Use operator[] before creating the thread to avoid data race in .size() |
| 59 | // in 'safe libc++' mode. |
| 60 | auto &Thread0 = Threads[0]; |
| 61 | Thread0 = std::thread([this, S] { |
| 62 | for (unsigned I = 1; I < ThreadCount; ++I) { |
| 63 | Threads.emplace_back(args: [=] { work(S, ThreadID: I); }); |
| 64 | if (Stop) |
| 65 | break; |
| 66 | } |
| 67 | ThreadsCreated.set_value(); |
| 68 | work(S, ThreadID: 0); |
| 69 | }); |
| 70 | } |
| 71 | |
| 72 | void stop() { |
| 73 | { |
| 74 | std::lock_guard<std::mutex> Lock(Mutex); |
| 75 | if (Stop) |
| 76 | return; |
| 77 | Stop = true; |
| 78 | } |
| 79 | Cond.notify_all(); |
| 80 | ThreadsCreated.get_future().wait(); |
| 81 | } |
| 82 | |
| 83 | ~ThreadPoolExecutor() override { |
| 84 | stop(); |
| 85 | std::thread::id CurrentThreadId = std::this_thread::get_id(); |
| 86 | for (std::thread &T : Threads) |
| 87 | if (T.get_id() == CurrentThreadId) |
| 88 | T.detach(); |
| 89 | else |
| 90 | T.join(); |
| 91 | } |
| 92 | |
| 93 | struct Creator { |
| 94 | static void *call() { return new ThreadPoolExecutor(strategy); } |
| 95 | }; |
| 96 | struct Deleter { |
| 97 | static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } |
| 98 | }; |
| 99 | |
| 100 | void add(std::function<void()> F) override { |
| 101 | { |
| 102 | std::lock_guard<std::mutex> Lock(Mutex); |
| 103 | WorkStack.push_back(x: std::move(F)); |
| 104 | } |
| 105 | Cond.notify_one(); |
| 106 | } |
| 107 | |
| 108 | size_t getThreadCount() const override { return ThreadCount; } |
| 109 | |
| 110 | private: |
| 111 | void work(ThreadPoolStrategy S, unsigned ThreadID) { |
| 112 | threadIndex = ThreadID; |
| 113 | S.apply_thread_strategy(ThreadPoolNum: ThreadID); |
| 114 | while (true) { |
| 115 | std::unique_lock<std::mutex> Lock(Mutex); |
| 116 | Cond.wait(lock&: Lock, p: [&] { return Stop || !WorkStack.empty(); }); |
| 117 | if (Stop) |
| 118 | break; |
| 119 | auto Task = std::move(WorkStack.back()); |
| 120 | WorkStack.pop_back(); |
| 121 | Lock.unlock(); |
| 122 | Task(); |
| 123 | } |
| 124 | } |
| 125 | |
| 126 | std::atomic<bool> Stop{false}; |
| 127 | std::vector<std::function<void()>> WorkStack; |
| 128 | std::mutex Mutex; |
| 129 | std::condition_variable Cond; |
| 130 | std::promise<void> ThreadsCreated; |
| 131 | std::vector<std::thread> Threads; |
| 132 | unsigned ThreadCount; |
| 133 | }; |
| 134 | |
// Returns the lazily-constructed process-wide executor. Thread-safe via
// C++11 magic statics.
Executor *Executor::getDefaultExecutor() {
#ifdef _WIN32
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
  // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  // Ownership split: ManagedStatic's Deleter only stop()s the pool during
  // llvm_shutdown(); this static unique_ptr runs the real destructor (which
  // joins the workers) when static destructors run on a normal full exit.
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
#else
  // ManagedStatic is not desired on other platforms. When `Exec` is destroyed
  // by llvm_shutdown(), worker threads will clean up and invoke TLS
  // destructors. This can lead to race conditions if other threads attempt to
  // access TLS objects that have already been destroyed.
  static ThreadPoolExecutor Exec(strategy);
  return &Exec;
#endif
}
| 169 | } // namespace |
| 170 | } // namespace detail |
| 171 | |
| 172 | size_t getThreadCount() { |
| 173 | return detail::Executor::getDefaultExecutor()->getThreadCount(); |
| 174 | } |
| 175 | #endif |
| 176 | |
// Latch::sync() called by the destructor may cause one thread to block. This
// deadlocks if all threads in the default executor are blocked. To prevent the
// deadlock, only allow the root TaskGroup to run tasks in parallel: in the
// scenario of nested parallel_for_each(), only the outermost one runs in
// parallel.
| 181 | TaskGroup::TaskGroup() |
| 182 | #if LLVM_ENABLE_THREADS |
| 183 | : Parallel((parallel::strategy.ThreadsRequested != 1) && |
| 184 | (threadIndex == UINT_MAX)) {} |
| 185 | #else |
| 186 | : Parallel(false) {} |
| 187 | #endif |
TaskGroup::~TaskGroup() {
  // Block until every spawned workload has finished: each spawn() increments
  // the latch and each completed task decrements it (see spawn()).
  L.sync();
}
| 193 | |
| 194 | void TaskGroup::spawn(std::function<void()> F) { |
| 195 | #if LLVM_ENABLE_THREADS |
| 196 | if (Parallel) { |
| 197 | L.inc(); |
| 198 | detail::Executor::getDefaultExecutor()->add(func: [&, F = std::move(F)] { |
| 199 | F(); |
| 200 | L.dec(); |
| 201 | }); |
| 202 | return; |
| 203 | } |
| 204 | #endif |
| 205 | F(); |
| 206 | } |
| 207 | |
| 208 | } // namespace parallel |
| 209 | } // namespace llvm |
| 210 | |
| 211 | void llvm::parallelFor(size_t Begin, size_t End, |
| 212 | llvm::function_ref<void(size_t)> Fn) { |
| 213 | #if LLVM_ENABLE_THREADS |
| 214 | if (parallel::strategy.ThreadsRequested != 1) { |
| 215 | auto NumItems = End - Begin; |
| 216 | // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling |
| 217 | // overhead on large inputs. |
| 218 | auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; |
| 219 | if (TaskSize == 0) |
| 220 | TaskSize = 1; |
| 221 | |
| 222 | parallel::TaskGroup TG; |
| 223 | for (; Begin + TaskSize < End; Begin += TaskSize) { |
| 224 | TG.spawn(F: [=, &Fn] { |
| 225 | for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I) |
| 226 | Fn(I); |
| 227 | }); |
| 228 | } |
| 229 | if (Begin != End) { |
| 230 | TG.spawn(F: [=, &Fn] { |
| 231 | for (size_t I = Begin; I != End; ++I) |
| 232 | Fn(I); |
| 233 | }); |
| 234 | } |
| 235 | return; |
| 236 | } |
| 237 | #endif |
| 238 | |
| 239 | for (; Begin != End; ++Begin) |
| 240 | Fn(Begin); |
| 241 | } |
| 242 | |