1 | //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==// |
2 | // |
3 | // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
4 | // See https://llvm.org/LICENSE.txt for license information. |
5 | // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | // |
9 | // This file implements a crude C++11 based thread pool. |
10 | // |
11 | //===----------------------------------------------------------------------===// |
12 | |
13 | #include "llvm/Support/ThreadPool.h" |
14 | |
15 | #include "llvm/Config/llvm-config.h" |
16 | |
17 | #include "llvm/Support/FormatVariadic.h" |
18 | #include "llvm/Support/Threading.h" |
19 | #include "llvm/Support/raw_ostream.h" |
20 | |
21 | using namespace llvm; |
22 | |
23 | ThreadPoolInterface::~ThreadPoolInterface() = default; |
24 | |
// A note on task groups: by default a task belongs to no group (represented
// by a nullptr ThreadPoolTaskGroup pointer in the Tasks queue), and the
// functionality here normally works on all tasks regardless of their group
// (in that case functions receive a nullptr ThreadPoolTaskGroup pointer as
// their argument). A task that belongs to a group carries a pointer to that
// ThreadPoolTaskGroup in the Tasks queue, and functions that should work only
// on the tasks of one group take that pointer.
32 | |
33 | #if LLVM_ENABLE_THREADS |
34 | |
35 | StdThreadPool::StdThreadPool(ThreadPoolStrategy S) |
36 | : Strategy(S), MaxThreadCount(S.compute_thread_count()) {} |
37 | |
38 | void StdThreadPool::grow(int requested) { |
39 | llvm::sys::ScopedWriter LockGuard(ThreadsLock); |
40 | if (Threads.size() >= MaxThreadCount) |
41 | return; // Already hit the max thread pool size. |
42 | int newThreadCount = std::min<int>(a: requested, b: MaxThreadCount); |
43 | while (static_cast<int>(Threads.size()) < newThreadCount) { |
44 | int ThreadID = Threads.size(); |
45 | Threads.emplace_back(args: [this, ThreadID] { |
46 | set_thread_name(formatv(Fmt: "llvm-worker-{0}" , Vals: ThreadID)); |
47 | Strategy.apply_thread_strategy(ThreadPoolNum: ThreadID); |
48 | processTasks(WaitingForGroup: nullptr); |
49 | }); |
50 | } |
51 | } |
52 | |
53 | #ifndef NDEBUG |
54 | // The group of the tasks run by the current thread. |
55 | static LLVM_THREAD_LOCAL std::vector<ThreadPoolTaskGroup *> |
56 | *CurrentThreadTaskGroups = nullptr; |
57 | #endif |
58 | |
59 | // WaitingForGroup == nullptr means all tasks regardless of their group. |
60 | void StdThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) { |
61 | while (true) { |
62 | std::function<void()> Task; |
63 | ThreadPoolTaskGroup *GroupOfTask; |
64 | { |
65 | std::unique_lock<std::mutex> LockGuard(QueueLock); |
66 | bool workCompletedForGroup = false; // Result of workCompletedUnlocked() |
67 | // Wait for tasks to be pushed in the queue |
68 | QueueCondition.wait(lock&: LockGuard, p: [&] { |
69 | return !EnableFlag || !Tasks.empty() || |
70 | (WaitingForGroup != nullptr && |
71 | (workCompletedForGroup = |
72 | workCompletedUnlocked(Group: WaitingForGroup))); |
73 | }); |
74 | // Exit condition |
75 | if (!EnableFlag && Tasks.empty()) |
76 | return; |
77 | if (WaitingForGroup != nullptr && workCompletedForGroup) |
78 | return; |
79 | // Yeah, we have a task, grab it and release the lock on the queue |
80 | |
81 | // We first need to signal that we are active before popping the queue |
82 | // in order for wait() to properly detect that even if the queue is |
83 | // empty, there is still a task in flight. |
84 | ++ActiveThreads; |
85 | Task = std::move(Tasks.front().first); |
86 | GroupOfTask = Tasks.front().second; |
87 | // Need to count active threads in each group separately, ActiveThreads |
88 | // would never be 0 if waiting for another group inside a wait. |
89 | if (GroupOfTask != nullptr) |
90 | ++ActiveGroups[GroupOfTask]; // Increment or set to 1 if new item |
91 | Tasks.pop_front(); |
92 | } |
93 | #ifndef NDEBUG |
94 | if (CurrentThreadTaskGroups == nullptr) |
95 | CurrentThreadTaskGroups = new std::vector<ThreadPoolTaskGroup *>; |
96 | CurrentThreadTaskGroups->push_back(GroupOfTask); |
97 | #endif |
98 | |
99 | // Run the task we just grabbed |
100 | Task(); |
101 | |
102 | #ifndef NDEBUG |
103 | CurrentThreadTaskGroups->pop_back(); |
104 | if (CurrentThreadTaskGroups->empty()) { |
105 | delete CurrentThreadTaskGroups; |
106 | CurrentThreadTaskGroups = nullptr; |
107 | } |
108 | #endif |
109 | |
110 | bool Notify; |
111 | bool NotifyGroup; |
112 | { |
113 | // Adjust `ActiveThreads`, in case someone waits on StdThreadPool::wait() |
114 | std::lock_guard<std::mutex> LockGuard(QueueLock); |
115 | --ActiveThreads; |
116 | if (GroupOfTask != nullptr) { |
117 | auto A = ActiveGroups.find(Val: GroupOfTask); |
118 | if (--(A->second) == 0) |
119 | ActiveGroups.erase(I: A); |
120 | } |
121 | Notify = workCompletedUnlocked(Group: GroupOfTask); |
122 | NotifyGroup = GroupOfTask != nullptr && Notify; |
123 | } |
124 | // Notify task completion if this is the last active thread, in case |
125 | // someone waits on StdThreadPool::wait(). |
126 | if (Notify) |
127 | CompletionCondition.notify_all(); |
128 | // If this was a task in a group, notify also threads waiting for tasks |
129 | // in this function on QueueCondition, to make a recursive wait() return |
130 | // after the group it's been waiting for has finished. |
131 | if (NotifyGroup) |
132 | QueueCondition.notify_all(); |
133 | } |
134 | } |
135 | |
136 | bool StdThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const { |
137 | if (Group == nullptr) |
138 | return !ActiveThreads && Tasks.empty(); |
139 | return ActiveGroups.count(Val: Group) == 0 && |
140 | !llvm::any_of(Range: Tasks, |
141 | P: [Group](const auto &T) { return T.second == Group; }); |
142 | } |
143 | |
144 | void StdThreadPool::wait() { |
145 | assert(!isWorkerThread()); // Would deadlock waiting for itself. |
146 | // Wait for all threads to complete and the queue to be empty |
147 | std::unique_lock<std::mutex> LockGuard(QueueLock); |
148 | CompletionCondition.wait(lock&: LockGuard, |
149 | p: [&] { return workCompletedUnlocked(Group: nullptr); }); |
150 | } |
151 | |
152 | void StdThreadPool::wait(ThreadPoolTaskGroup &Group) { |
153 | // Wait for all threads in the group to complete. |
154 | if (!isWorkerThread()) { |
155 | std::unique_lock<std::mutex> LockGuard(QueueLock); |
156 | CompletionCondition.wait(lock&: LockGuard, |
157 | p: [&] { return workCompletedUnlocked(Group: &Group); }); |
158 | return; |
159 | } |
160 | // Make sure to not deadlock waiting for oneself. |
161 | assert(CurrentThreadTaskGroups == nullptr || |
162 | !llvm::is_contained(*CurrentThreadTaskGroups, &Group)); |
163 | // Handle the case of recursive call from another task in a different group, |
164 | // in which case process tasks while waiting to keep the thread busy and avoid |
165 | // possible deadlock. |
166 | processTasks(WaitingForGroup: &Group); |
167 | } |
168 | |
169 | bool StdThreadPool::isWorkerThread() const { |
170 | llvm::sys::ScopedReader LockGuard(ThreadsLock); |
171 | llvm::thread::id CurrentThreadId = llvm::this_thread::get_id(); |
172 | for (const llvm::thread &Thread : Threads) |
173 | if (CurrentThreadId == Thread.get_id()) |
174 | return true; |
175 | return false; |
176 | } |
177 | |
178 | // The destructor joins all threads, waiting for completion. |
179 | StdThreadPool::~StdThreadPool() { |
180 | { |
181 | std::unique_lock<std::mutex> LockGuard(QueueLock); |
182 | EnableFlag = false; |
183 | } |
184 | QueueCondition.notify_all(); |
185 | llvm::sys::ScopedReader LockGuard(ThreadsLock); |
186 | for (auto &Worker : Threads) |
187 | Worker.join(); |
188 | } |
189 | |
190 | #endif // LLVM_ENABLE_THREADS Disabled |
191 | |
192 | // No threads are launched, issue a warning if ThreadCount is not 0 |
193 | SingleThreadExecutor::SingleThreadExecutor(ThreadPoolStrategy S) { |
194 | int ThreadCount = S.compute_thread_count(); |
195 | if (ThreadCount != 1) { |
196 | errs() << "Warning: request a ThreadPool with " << ThreadCount |
197 | << " threads, but LLVM_ENABLE_THREADS has been turned off\n" ; |
198 | } |
199 | } |
200 | |
201 | void SingleThreadExecutor::wait() { |
202 | // Sequential implementation running the tasks |
203 | while (!Tasks.empty()) { |
204 | auto Task = std::move(Tasks.front().first); |
205 | Tasks.pop_front(); |
206 | Task(); |
207 | } |
208 | } |
209 | |
210 | void SingleThreadExecutor::wait(ThreadPoolTaskGroup &) { |
211 | // Simply wait for all, this works even if recursive (the running task |
212 | // is already removed from the queue). |
213 | wait(); |
214 | } |
215 | |
216 | bool SingleThreadExecutor::isWorkerThread() const { |
217 | report_fatal_error(reason: "LLVM compiled without multithreading" ); |
218 | } |
219 | |
220 | SingleThreadExecutor::~SingleThreadExecutor() { wait(); } |
221 | |