1//===-- xray_buffer_queue.cpp ----------------------------------*- C++ -*-===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// This file is a part of XRay, a dynamic runtime instrumentation system.
10//
// Implements the buffer queue used to hand out and reclaim in-memory buffers.
12//
13//===----------------------------------------------------------------------===//
14#include "xray_buffer_queue.h"
15#include "sanitizer_common/sanitizer_atomic.h"
16#include "sanitizer_common/sanitizer_common.h"
17#include "sanitizer_common/sanitizer_libc.h"
18#if !SANITIZER_FUCHSIA
19#include "sanitizer_common/sanitizer_posix.h"
20#endif
21#include "xray_allocator.h"
22#include "xray_defs.h"
23#include <memory>
24#include <sys/mman.h>
25
26using namespace __xray;
27
28namespace {
29
30BufferQueue::ControlBlock *allocControlBlock(size_t Size, size_t Count) {
31 auto B =
32 allocateBuffer(S: (sizeof(BufferQueue::ControlBlock) - 1) + (Size * Count));
33 return B == nullptr ? nullptr
34 : reinterpret_cast<BufferQueue::ControlBlock *>(B);
35}
36
37void deallocControlBlock(BufferQueue::ControlBlock *C, size_t Size,
38 size_t Count) {
39 deallocateBuffer(B: reinterpret_cast<unsigned char *>(C),
40 S: (sizeof(BufferQueue::ControlBlock) - 1) + (Size * Count));
41}
42
43void decRefCount(BufferQueue::ControlBlock *C, size_t Size, size_t Count) {
44 if (C == nullptr)
45 return;
46 if (atomic_fetch_sub(a: &C->RefCount, v: 1, mo: memory_order_acq_rel) == 1)
47 deallocControlBlock(C, Size, Count);
48}
49
50void incRefCount(BufferQueue::ControlBlock *C) {
51 if (C == nullptr)
52 return;
53 atomic_fetch_add(a: &C->RefCount, v: 1, mo: memory_order_acq_rel);
54}
55
// We use a struct to ensure that we are allocating one atomic_uint64_t per
// cache line. This allows us to not worry about false-sharing among atomic
// objects being updated (constantly) by different threads.
struct ExtentsPadded {
  union {
    // The live extents counter for one buffer.
    atomic_uint64_t Extents;
    // Pads the record out to a full cache line.
    unsigned char Storage[kCacheLineSize];
  };
};

// Per-buffer stride within the extents backing store (one cache line each).
constexpr size_t kExtentsSize = sizeof(ExtentsPadded);
67
68} // namespace
69
70BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
71 SpinMutexLock Guard(&Mutex);
72
73 if (!finalizing())
74 return BufferQueue::ErrorCode::AlreadyInitialized;
75
76 cleanupBuffers();
77
78 bool Success = false;
79 BufferSize = BS;
80 BufferCount = BC;
81
82 BackingStore = allocControlBlock(Size: BufferSize, Count: BufferCount);
83 if (BackingStore == nullptr)
84 return BufferQueue::ErrorCode::NotEnoughMemory;
85
86 auto CleanupBackingStore = at_scope_exit(fn: [&, this] {
87 if (Success)
88 return;
89 deallocControlBlock(C: BackingStore, Size: BufferSize, Count: BufferCount);
90 BackingStore = nullptr;
91 });
92
93 // Initialize enough atomic_uint64_t instances, each
94 ExtentsBackingStore = allocControlBlock(Size: kExtentsSize, Count: BufferCount);
95 if (ExtentsBackingStore == nullptr)
96 return BufferQueue::ErrorCode::NotEnoughMemory;
97
98 auto CleanupExtentsBackingStore = at_scope_exit(fn: [&, this] {
99 if (Success)
100 return;
101 deallocControlBlock(C: ExtentsBackingStore, Size: kExtentsSize, Count: BufferCount);
102 ExtentsBackingStore = nullptr;
103 });
104
105 Buffers = initArray<BufferRep>(N: BufferCount);
106 if (Buffers == nullptr)
107 return BufferQueue::ErrorCode::NotEnoughMemory;
108
109 // At this point we increment the generation number to associate the buffers
110 // to the new generation.
111 atomic_fetch_add(a: &Generation, v: 1, mo: memory_order_acq_rel);
112
113 // First, we initialize the refcount in the ControlBlock, which we treat as
114 // being at the start of the BackingStore pointer.
115 atomic_store(a: &BackingStore->RefCount, v: 1, mo: memory_order_release);
116 atomic_store(a: &ExtentsBackingStore->RefCount, v: 1, mo: memory_order_release);
117
118 // Then we initialise the individual buffers that sub-divide the whole backing
119 // store. Each buffer will start at the `Data` member of the ControlBlock, and
120 // will be offsets from these locations.
121 for (size_t i = 0; i < BufferCount; ++i) {
122 auto &T = Buffers[i];
123 auto &Buf = T.Buff;
124 auto *E = reinterpret_cast<ExtentsPadded *>(&ExtentsBackingStore->Data +
125 (kExtentsSize * i));
126 Buf.Extents = &E->Extents;
127 atomic_store(a: Buf.Extents, v: 0, mo: memory_order_release);
128 Buf.Generation = generation();
129 Buf.Data = &BackingStore->Data + (BufferSize * i);
130 Buf.Size = BufferSize;
131 Buf.BackingStore = BackingStore;
132 Buf.ExtentsBackingStore = ExtentsBackingStore;
133 Buf.Count = BufferCount;
134 T.Used = false;
135 }
136
137 Next = Buffers;
138 First = Buffers;
139 LiveBuffers = 0;
140 atomic_store(a: &Finalizing, v: 0, mo: memory_order_release);
141 Success = true;
142 return BufferQueue::ErrorCode::Ok;
143}
144
145BufferQueue::BufferQueue(size_t B, size_t N,
146 bool &Success) XRAY_NEVER_INSTRUMENT
147 : BufferSize(B),
148 BufferCount(N),
149 Mutex(),
150 Finalizing{.val_dont_use: 1},
151 BackingStore(nullptr),
152 ExtentsBackingStore(nullptr),
153 Buffers(nullptr),
154 Next(Buffers),
155 First(Buffers),
156 LiveBuffers(0),
157 Generation{.val_dont_use: 0} {
158 Success = init(BS: B, BC: N) == BufferQueue::ErrorCode::Ok;
159}
160
161BufferQueue::ErrorCode BufferQueue::getBuffer(Buffer &Buf) {
162 if (atomic_load(a: &Finalizing, mo: memory_order_acquire))
163 return ErrorCode::QueueFinalizing;
164
165 BufferRep *B = nullptr;
166 {
167 SpinMutexLock Guard(&Mutex);
168 if (LiveBuffers == BufferCount)
169 return ErrorCode::NotEnoughMemory;
170 B = Next++;
171 if (Next == (Buffers + BufferCount))
172 Next = Buffers;
173 ++LiveBuffers;
174 }
175
176 incRefCount(C: BackingStore);
177 incRefCount(C: ExtentsBackingStore);
178 Buf = B->Buff;
179 Buf.Generation = generation();
180 B->Used = true;
181 return ErrorCode::Ok;
182}
183
184BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) {
185 // Check whether the buffer being referred to is within the bounds of the
186 // backing store's range.
187 BufferRep *B = nullptr;
188 {
189 SpinMutexLock Guard(&Mutex);
190 if (Buf.Generation != generation() || LiveBuffers == 0) {
191 Buf = {};
192 decRefCount(C: Buf.BackingStore, Size: Buf.Size, Count: Buf.Count);
193 decRefCount(C: Buf.ExtentsBackingStore, Size: kExtentsSize, Count: Buf.Count);
194 return BufferQueue::ErrorCode::Ok;
195 }
196
197 if (Buf.Data < &BackingStore->Data ||
198 Buf.Data > &BackingStore->Data + (BufferCount * BufferSize))
199 return BufferQueue::ErrorCode::UnrecognizedBuffer;
200
201 --LiveBuffers;
202 B = First++;
203 if (First == (Buffers + BufferCount))
204 First = Buffers;
205 }
206
207 // Now that the buffer has been released, we mark it as "used".
208 B->Buff = Buf;
209 B->Used = true;
210 decRefCount(C: Buf.BackingStore, Size: Buf.Size, Count: Buf.Count);
211 decRefCount(C: Buf.ExtentsBackingStore, Size: kExtentsSize, Count: Buf.Count);
212 atomic_store(a: B->Buff.Extents, v: atomic_load(a: Buf.Extents, mo: memory_order_acquire),
213 mo: memory_order_release);
214 Buf = {};
215 return ErrorCode::Ok;
216}
217
218BufferQueue::ErrorCode BufferQueue::finalize() {
219 if (atomic_exchange(a: &Finalizing, v: 1, mo: memory_order_acq_rel))
220 return ErrorCode::QueueFinalizing;
221 return ErrorCode::Ok;
222}
223
224void BufferQueue::cleanupBuffers() {
225 for (auto B = Buffers, E = Buffers + BufferCount; B != E; ++B)
226 B->~BufferRep();
227 deallocateBuffer(B: Buffers, S: BufferCount);
228 decRefCount(C: BackingStore, Size: BufferSize, Count: BufferCount);
229 decRefCount(C: ExtentsBackingStore, Size: kExtentsSize, Count: BufferCount);
230 BackingStore = nullptr;
231 ExtentsBackingStore = nullptr;
232 Buffers = nullptr;
233 BufferCount = 0;
234 BufferSize = 0;
235}
236
237BufferQueue::~BufferQueue() { cleanupBuffers(); }
238