1//===------- SimpleEPCServer.cpp - EPC over simple abstract channel -------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
9#include "llvm/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.h"
10
11#include "llvm/ExecutionEngine/Orc/Shared/OrcRTBridge.h"
12#include "llvm/ExecutionEngine/Orc/TargetProcess/RegisterEHFrames.h"
13#include "llvm/Support/FormatVariadic.h"
14#include "llvm/Support/Process.h"
15#include "llvm/TargetParser/Host.h"
16
17#include "OrcRTBootstrap.h"
18
19#define DEBUG_TYPE "orc"
20
21using namespace llvm::orc::shared;
22
23namespace llvm {
24namespace orc {
25
26ExecutorBootstrapService::~ExecutorBootstrapService() = default;
27
28SimpleRemoteEPCServer::Dispatcher::~Dispatcher() = default;
29
30#if LLVM_ENABLE_THREADS
31void SimpleRemoteEPCServer::ThreadDispatcher::dispatch(
32 unique_function<void()> Work) {
33 {
34 std::lock_guard<std::mutex> Lock(DispatchMutex);
35 if (!Running)
36 return;
37 ++Outstanding;
38 }
39
40 std::thread([this, Work = std::move(Work)]() mutable {
41 Work();
42 std::lock_guard<std::mutex> Lock(DispatchMutex);
43 --Outstanding;
44 OutstandingCV.notify_all();
45 }).detach();
46}
47
48void SimpleRemoteEPCServer::ThreadDispatcher::shutdown() {
49 std::unique_lock<std::mutex> Lock(DispatchMutex);
50 Running = false;
51 OutstandingCV.wait(lock&: Lock, p: [this]() { return Outstanding == 0; });
52}
53#endif
54
55StringMap<ExecutorAddr> SimpleRemoteEPCServer::defaultBootstrapSymbols() {
56 StringMap<ExecutorAddr> DBS;
57 rt_bootstrap::addTo(M&: DBS);
58 return DBS;
59}
60
61Expected<SimpleRemoteEPCTransportClient::HandleMessageAction>
62SimpleRemoteEPCServer::handleMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo,
63 ExecutorAddr TagAddr,
64 SimpleRemoteEPCArgBytesVector ArgBytes) {
65
66 LLVM_DEBUG({
67 dbgs() << "SimpleRemoteEPCServer::handleMessage: opc = ";
68 switch (OpC) {
69 case SimpleRemoteEPCOpcode::Setup:
70 dbgs() << "Setup";
71 assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
72 assert(!TagAddr && "Non-zero TagAddr for Setup?");
73 break;
74 case SimpleRemoteEPCOpcode::Hangup:
75 dbgs() << "Hangup";
76 assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
77 assert(!TagAddr && "Non-zero TagAddr for Hangup?");
78 break;
79 case SimpleRemoteEPCOpcode::Result:
80 dbgs() << "Result";
81 assert(!TagAddr && "Non-zero TagAddr for Result?");
82 break;
83 case SimpleRemoteEPCOpcode::CallWrapper:
84 dbgs() << "CallWrapper";
85 break;
86 }
87 dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
88 << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
89 << " bytes\n";
90 });
91
92 using UT = std::underlying_type_t<SimpleRemoteEPCOpcode>;
93 if (static_cast<UT>(OpC) > static_cast<UT>(SimpleRemoteEPCOpcode::LastOpC))
94 return make_error<StringError>(Args: "Unexpected opcode",
95 Args: inconvertibleErrorCode());
96
97 // TODO: Clean detach message?
98 switch (OpC) {
99 case SimpleRemoteEPCOpcode::Setup:
100 return make_error<StringError>(Args: "Unexpected Setup opcode",
101 Args: inconvertibleErrorCode());
102 case SimpleRemoteEPCOpcode::Hangup:
103 return SimpleRemoteEPCTransportClient::EndSession;
104 case SimpleRemoteEPCOpcode::Result:
105 if (auto Err = handleResult(SeqNo, TagAddr, ArgBytes: std::move(ArgBytes)))
106 return std::move(Err);
107 break;
108 case SimpleRemoteEPCOpcode::CallWrapper:
109 handleCallWrapper(RemoteSeqNo: SeqNo, TagAddr, ArgBytes: std::move(ArgBytes));
110 break;
111 }
112 return ContinueSession;
113}
114
115Error SimpleRemoteEPCServer::waitForDisconnect() {
116 std::unique_lock<std::mutex> Lock(ServerStateMutex);
117 ShutdownCV.wait(lock&: Lock, p: [this]() { return RunState == ServerShutDown; });
118 return std::move(ShutdownErr);
119}
120
121void SimpleRemoteEPCServer::handleDisconnect(Error Err) {
122 PendingJITDispatchResultsMap TmpPending;
123
124 {
125 std::lock_guard<std::mutex> Lock(ServerStateMutex);
126 std::swap(a&: TmpPending, b&: PendingJITDispatchResults);
127 RunState = ServerShuttingDown;
128 }
129
130 // Send out-of-band errors to any waiting threads.
131 for (auto &KV : TmpPending)
132 KV.second->set_value(
133 shared::WrapperFunctionResult::createOutOfBandError(Msg: "disconnecting"));
134
135 // Wait for dispatcher to clear.
136 D->shutdown();
137
138 // Shut down services.
139 while (!Services.empty()) {
140 ShutdownErr =
141 joinErrors(E1: std::move(ShutdownErr), E2: Services.back()->shutdown());
142 Services.pop_back();
143 }
144
145 std::lock_guard<std::mutex> Lock(ServerStateMutex);
146 ShutdownErr = joinErrors(E1: std::move(ShutdownErr), E2: std::move(Err));
147 RunState = ServerShutDown;
148 ShutdownCV.notify_all();
149}
150
151Error SimpleRemoteEPCServer::sendMessage(SimpleRemoteEPCOpcode OpC,
152 uint64_t SeqNo, ExecutorAddr TagAddr,
153 ArrayRef<char> ArgBytes) {
154
155 LLVM_DEBUG({
156 dbgs() << "SimpleRemoteEPCServer::sendMessage: opc = ";
157 switch (OpC) {
158 case SimpleRemoteEPCOpcode::Setup:
159 dbgs() << "Setup";
160 assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
161 assert(!TagAddr && "Non-zero TagAddr for Setup?");
162 break;
163 case SimpleRemoteEPCOpcode::Hangup:
164 dbgs() << "Hangup";
165 assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
166 assert(!TagAddr && "Non-zero TagAddr for Hangup?");
167 break;
168 case SimpleRemoteEPCOpcode::Result:
169 dbgs() << "Result";
170 assert(!TagAddr && "Non-zero TagAddr for Result?");
171 break;
172 case SimpleRemoteEPCOpcode::CallWrapper:
173 dbgs() << "CallWrapper";
174 break;
175 }
176 dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
177 << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
178 << " bytes\n";
179 });
180 auto Err = T->sendMessage(OpC, SeqNo, TagAddr, ArgBytes);
181 LLVM_DEBUG({
182 if (Err)
183 dbgs() << " \\--> SimpleRemoteEPC::sendMessage failed\n";
184 });
185 return Err;
186}
187
188Error SimpleRemoteEPCServer::sendSetupMessage(
189 StringMap<std::vector<char>> BootstrapMap,
190 StringMap<ExecutorAddr> BootstrapSymbols) {
191
192 using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames;
193
194 SimpleRemoteEPCExecutorInfo EI;
195 EI.TargetTriple = sys::getProcessTriple();
196 if (auto PageSize = sys::Process::getPageSize())
197 EI.PageSize = *PageSize;
198 else
199 return PageSize.takeError();
200 EI.BootstrapMap = std::move(BootstrapMap);
201 EI.BootstrapSymbols = std::move(BootstrapSymbols);
202
203 assert(!EI.BootstrapSymbols.count(ExecutorSessionObjectName) &&
204 "Dispatch context name should not be set");
205 assert(!EI.BootstrapSymbols.count(DispatchFnName) &&
206 "Dispatch function name should not be set");
207 EI.BootstrapSymbols[ExecutorSessionObjectName] = ExecutorAddr::fromPtr(Ptr: this);
208 EI.BootstrapSymbols[DispatchFnName] = ExecutorAddr::fromPtr(Ptr: jitDispatchEntry);
209 EI.BootstrapSymbols[rt::RegisterEHFrameSectionAllocActionName] =
210 ExecutorAddr::fromPtr(Ptr: &llvm_orc_registerEHFrameSectionAllocAction);
211 EI.BootstrapSymbols[rt::DeregisterEHFrameSectionAllocActionName] =
212 ExecutorAddr::fromPtr(Ptr: &llvm_orc_deregisterEHFrameSectionAllocAction);
213
214 using SPSSerialize =
215 shared::SPSArgList<shared::SPSSimpleRemoteEPCExecutorInfo>;
216 auto SetupPacketBytes =
217 shared::WrapperFunctionResult::allocate(Size: SPSSerialize::size(Arg: EI));
218 shared::SPSOutputBuffer OB(SetupPacketBytes.data(), SetupPacketBytes.size());
219 if (!SPSSerialize::serialize(OB, Arg: EI))
220 return make_error<StringError>(Args: "Could not send setup packet",
221 Args: inconvertibleErrorCode());
222
223 return sendMessage(OpC: SimpleRemoteEPCOpcode::Setup, SeqNo: 0, TagAddr: ExecutorAddr(),
224 ArgBytes: {SetupPacketBytes.data(), SetupPacketBytes.size()});
225}
226
227Error SimpleRemoteEPCServer::handleResult(
228 uint64_t SeqNo, ExecutorAddr TagAddr,
229 SimpleRemoteEPCArgBytesVector ArgBytes) {
230 std::promise<shared::WrapperFunctionResult> *P = nullptr;
231 {
232 std::lock_guard<std::mutex> Lock(ServerStateMutex);
233 auto I = PendingJITDispatchResults.find(Val: SeqNo);
234 if (I == PendingJITDispatchResults.end())
235 return make_error<StringError>(Args: "No call for sequence number " +
236 Twine(SeqNo),
237 Args: inconvertibleErrorCode());
238 P = I->second;
239 PendingJITDispatchResults.erase(I);
240 releaseSeqNo(SeqNo);
241 }
242 auto R = shared::WrapperFunctionResult::allocate(Size: ArgBytes.size());
243 memcpy(dest: R.data(), src: ArgBytes.data(), n: ArgBytes.size());
244 P->set_value(std::move(R));
245 return Error::success();
246}
247
248void SimpleRemoteEPCServer::handleCallWrapper(
249 uint64_t RemoteSeqNo, ExecutorAddr TagAddr,
250 SimpleRemoteEPCArgBytesVector ArgBytes) {
251 D->dispatch(Work: [this, RemoteSeqNo, TagAddr, ArgBytes = std::move(ArgBytes)]() {
252 using WrapperFnTy =
253 shared::CWrapperFunctionResult (*)(const char *, size_t);
254 auto *Fn = TagAddr.toPtr<WrapperFnTy>();
255 shared::WrapperFunctionResult ResultBytes(
256 Fn(ArgBytes.data(), ArgBytes.size()));
257 if (auto Err = sendMessage(OpC: SimpleRemoteEPCOpcode::Result, SeqNo: RemoteSeqNo,
258 TagAddr: ExecutorAddr(),
259 ArgBytes: {ResultBytes.data(), ResultBytes.size()}))
260 ReportError(std::move(Err));
261 });
262}
263
264shared::WrapperFunctionResult
265SimpleRemoteEPCServer::doJITDispatch(const void *FnTag, const char *ArgData,
266 size_t ArgSize) {
267 uint64_t SeqNo;
268 std::promise<shared::WrapperFunctionResult> ResultP;
269 auto ResultF = ResultP.get_future();
270 {
271 std::lock_guard<std::mutex> Lock(ServerStateMutex);
272 if (RunState != ServerRunning)
273 return shared::WrapperFunctionResult::createOutOfBandError(
274 Msg: "jit_dispatch not available (EPC server shut down)");
275
276 SeqNo = getNextSeqNo();
277 assert(!PendingJITDispatchResults.count(SeqNo) && "SeqNo already in use");
278 PendingJITDispatchResults[SeqNo] = &ResultP;
279 }
280
281 if (auto Err = sendMessage(OpC: SimpleRemoteEPCOpcode::CallWrapper, SeqNo,
282 TagAddr: ExecutorAddr::fromPtr(Ptr: FnTag), ArgBytes: {ArgData, ArgSize}))
283 ReportError(std::move(Err));
284
285 return ResultF.get();
286}
287
288shared::CWrapperFunctionResult
289SimpleRemoteEPCServer::jitDispatchEntry(void *DispatchCtx, const void *FnTag,
290 const char *ArgData, size_t ArgSize) {
291 return reinterpret_cast<SimpleRemoteEPCServer *>(DispatchCtx)
292 ->doJITDispatch(FnTag, ArgData, ArgSize)
293 .release();
294}
295
296} // end namespace orc
297} // end namespace llvm
298