//===------- SimpleRemoteEPC.cpp -- Simple remote executor control --------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
8
9#include "llvm/ExecutionEngine/Orc/SimpleRemoteEPC.h"
10#include "llvm/ExecutionEngine/Orc/EPCGenericDylibManager.h"
11#include "llvm/ExecutionEngine/Orc/EPCGenericJITLinkMemoryManager.h"
12#include "llvm/ExecutionEngine/Orc/EPCGenericMemoryAccess.h"
13#include "llvm/ExecutionEngine/Orc/Shared/OrcRTBridge.h"
14#include "llvm/Support/FormatVariadic.h"
15
16#define DEBUG_TYPE "orc"
17
18namespace llvm {
19namespace orc {
20
21SimpleRemoteEPC::~SimpleRemoteEPC() {
22#ifndef NDEBUG
23 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
24 assert(Disconnected && "Destroyed without disconnection");
25#endif // NDEBUG
26}
27
28Expected<int32_t> SimpleRemoteEPC::runAsMain(ExecutorAddr MainFnAddr,
29 ArrayRef<std::string> Args) {
30 int64_t Result = 0;
31 if (auto Err = callSPSWrapper<rt::SPSRunAsMainSignature>(
32 WrapperFnAddr: RunAsMainAddr, WrapperCallArgs&: Result, WrapperCallArgs&: MainFnAddr, WrapperCallArgs&: Args))
33 return std::move(Err);
34 return Result;
35}
36
37Expected<int32_t> SimpleRemoteEPC::runAsVoidFunction(ExecutorAddr VoidFnAddr) {
38 int32_t Result = 0;
39 if (auto Err = callSPSWrapper<rt::SPSRunAsVoidFunctionSignature>(
40 WrapperFnAddr: RunAsVoidFunctionAddr, WrapperCallArgs&: Result, WrapperCallArgs&: VoidFnAddr))
41 return std::move(Err);
42 return Result;
43}
44
45Expected<int32_t> SimpleRemoteEPC::runAsIntFunction(ExecutorAddr IntFnAddr,
46 int Arg) {
47 int32_t Result = 0;
48 if (auto Err = callSPSWrapper<rt::SPSRunAsIntFunctionSignature>(
49 WrapperFnAddr: RunAsIntFunctionAddr, WrapperCallArgs&: Result, WrapperCallArgs&: IntFnAddr, WrapperCallArgs&: Arg))
50 return std::move(Err);
51 return Result;
52}
53
54void SimpleRemoteEPC::callWrapperAsync(ExecutorAddr WrapperFnAddr,
55 IncomingWFRHandler OnComplete,
56 ArrayRef<char> ArgBuffer) {
57 uint64_t SeqNo;
58 {
59 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
60 SeqNo = getNextSeqNo();
61 assert(!PendingCallWrapperResults.count(SeqNo) && "SeqNo already in use");
62 PendingCallWrapperResults[SeqNo] = std::move(OnComplete);
63 }
64
65 if (auto Err = sendMessage(OpC: SimpleRemoteEPCOpcode::CallWrapper, SeqNo,
66 TagAddr: WrapperFnAddr, ArgBytes: ArgBuffer)) {
67 IncomingWFRHandler H;
68
69 // We just registered OnComplete, but there may be a race between this
70 // thread returning from sendMessage and handleDisconnect being called from
71 // the transport's listener thread. If handleDisconnect gets there first
72 // then it will have failed 'H' for us. If we get there first (or if
73 // handleDisconnect already ran) then we need to take care of it.
74 {
75 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
76 auto I = PendingCallWrapperResults.find(Val: SeqNo);
77 if (I != PendingCallWrapperResults.end()) {
78 H = std::move(I->second);
79 PendingCallWrapperResults.erase(I);
80 }
81 }
82
83 if (H)
84 H(shared::WrapperFunctionBuffer::createOutOfBandError(Msg: "disconnecting"));
85
86 getExecutionSession().reportError(Err: std::move(Err));
87 }
88}
89
90Expected<std::unique_ptr<jitlink::JITLinkMemoryManager>>
91SimpleRemoteEPC::createDefaultMemoryManager() {
92 return EPCGenericJITLinkMemoryManager::Create(ES&: getExecutionSession());
93}
94
95Expected<std::unique_ptr<DylibManager>>
96SimpleRemoteEPC::createDefaultDylibMgr() {
97 auto DM = EPCGenericDylibManager::CreateWithDefaultBootstrapSymbols(EPC&: *this);
98 if (!DM)
99 return DM.takeError();
100 return std::make_unique<EPCGenericDylibManager>(args: std::move(*DM));
101}
102
103Expected<std::unique_ptr<MemoryAccess>>
104SimpleRemoteEPC::createDefaultMemoryAccess() {
105 EPCGenericMemoryAccess::FuncAddrs FAs;
106 if (auto Err = getBootstrapSymbols(
107 Pairs: {{FAs.WriteUInt8s, rt::MemoryWriteUInt8sWrapperName},
108 {FAs.WriteUInt16s, rt::MemoryWriteUInt16sWrapperName},
109 {FAs.WriteUInt32s, rt::MemoryWriteUInt32sWrapperName},
110 {FAs.WriteUInt64s, rt::MemoryWriteUInt64sWrapperName},
111 {FAs.WriteBuffers, rt::MemoryWriteBuffersWrapperName},
112 {FAs.WritePointers, rt::MemoryWritePointersWrapperName},
113 {FAs.ReadUInt8s, rt::MemoryReadUInt8sWrapperName},
114 {FAs.ReadUInt16s, rt::MemoryReadUInt16sWrapperName},
115 {FAs.ReadUInt32s, rt::MemoryReadUInt32sWrapperName},
116 {FAs.ReadUInt64s, rt::MemoryReadUInt64sWrapperName},
117 {FAs.ReadBuffers, rt::MemoryReadBuffersWrapperName},
118 {FAs.ReadStrings, rt::MemoryReadStringsWrapperName}}))
119 return std::move(Err);
120
121 return std::make_unique<EPCGenericMemoryAccess>(args&: *this, args&: FAs);
122}
123
124Error SimpleRemoteEPC::disconnect() {
125 T->disconnect();
126 D->shutdown();
127 std::unique_lock<std::mutex> Lock(SimpleRemoteEPCMutex);
128 DisconnectCV.wait(lock&: Lock, p: [this] { return Disconnected; });
129 return std::move(DisconnectErr);
130}
131
132Expected<SimpleRemoteEPCTransportClient::HandleMessageAction>
133SimpleRemoteEPC::handleMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo,
134 ExecutorAddr TagAddr,
135 shared::WrapperFunctionBuffer ArgBytes) {
136
137 LLVM_DEBUG({
138 dbgs() << "SimpleRemoteEPC::handleMessage: opc = ";
139 switch (OpC) {
140 case SimpleRemoteEPCOpcode::Setup:
141 dbgs() << "Setup";
142 assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
143 assert(!TagAddr && "Non-zero TagAddr for Setup?");
144 break;
145 case SimpleRemoteEPCOpcode::Hangup:
146 dbgs() << "Hangup";
147 assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
148 assert(!TagAddr && "Non-zero TagAddr for Hangup?");
149 break;
150 case SimpleRemoteEPCOpcode::Result:
151 dbgs() << "Result";
152 assert(!TagAddr && "Non-zero TagAddr for Result?");
153 break;
154 case SimpleRemoteEPCOpcode::CallWrapper:
155 dbgs() << "CallWrapper";
156 break;
157 }
158 dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
159 << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
160 << " bytes\n";
161 });
162
163 using UT = std::underlying_type_t<SimpleRemoteEPCOpcode>;
164 if (static_cast<UT>(OpC) > static_cast<UT>(SimpleRemoteEPCOpcode::LastOpC))
165 return make_error<StringError>(Args: "Unexpected opcode",
166 Args: inconvertibleErrorCode());
167
168 switch (OpC) {
169 case SimpleRemoteEPCOpcode::Setup:
170 if (auto Err = handleSetup(SeqNo, TagAddr, ArgBytes: std::move(ArgBytes)))
171 return std::move(Err);
172 break;
173 case SimpleRemoteEPCOpcode::Hangup:
174 T->disconnect();
175 if (auto Err = handleHangup(ArgBytes: std::move(ArgBytes)))
176 return std::move(Err);
177 return EndSession;
178 case SimpleRemoteEPCOpcode::Result:
179 if (auto Err = handleResult(SeqNo, TagAddr, ArgBytes: std::move(ArgBytes)))
180 return std::move(Err);
181 break;
182 case SimpleRemoteEPCOpcode::CallWrapper:
183 handleCallWrapper(RemoteSeqNo: SeqNo, TagAddr, ArgBytes: std::move(ArgBytes));
184 break;
185 }
186 return ContinueSession;
187}
188
189void SimpleRemoteEPC::handleDisconnect(Error Err) {
190 LLVM_DEBUG({
191 dbgs() << "SimpleRemoteEPC::handleDisconnect: "
192 << (Err ? "failure" : "success") << "\n";
193 });
194
195 PendingCallWrapperResultsMap TmpPending;
196
197 {
198 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
199 std::swap(a&: TmpPending, b&: PendingCallWrapperResults);
200 }
201
202 for (auto &KV : TmpPending)
203 KV.second(
204 shared::WrapperFunctionBuffer::createOutOfBandError(Msg: "disconnecting"));
205
206 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
207 DisconnectErr = joinErrors(E1: std::move(DisconnectErr), E2: std::move(Err));
208 Disconnected = true;
209 DisconnectCV.notify_all();
210}
211
212Expected<std::unique_ptr<jitlink::JITLinkMemoryManager>>
213SimpleRemoteEPC::createDefaultMemoryManager(SimpleRemoteEPC &SREPC) {
214 EPCGenericJITLinkMemoryManager::SymbolAddrs SAs;
215 if (auto Err = SREPC.getBootstrapSymbols(
216 Pairs: {{SAs.Allocator, rt::SimpleExecutorMemoryManagerInstanceName},
217 {SAs.Reserve, rt::SimpleExecutorMemoryManagerReserveWrapperName},
218 {SAs.Initialize,
219 rt::SimpleExecutorMemoryManagerInitializeWrapperName},
220 {SAs.Release, rt::SimpleExecutorMemoryManagerReleaseWrapperName}}))
221 return std::move(Err);
222
223 return std::make_unique<EPCGenericJITLinkMemoryManager>(args&: SREPC, args&: SAs);
224}
225
226Error SimpleRemoteEPC::sendMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo,
227 ExecutorAddr TagAddr,
228 ArrayRef<char> ArgBytes) {
229 assert(OpC != SimpleRemoteEPCOpcode::Setup &&
230 "SimpleRemoteEPC sending Setup message? That's the wrong direction.");
231
232 LLVM_DEBUG({
233 dbgs() << "SimpleRemoteEPC::sendMessage: opc = ";
234 switch (OpC) {
235 case SimpleRemoteEPCOpcode::Hangup:
236 dbgs() << "Hangup";
237 assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
238 assert(!TagAddr && "Non-zero TagAddr for Hangup?");
239 break;
240 case SimpleRemoteEPCOpcode::Result:
241 dbgs() << "Result";
242 assert(!TagAddr && "Non-zero TagAddr for Result?");
243 break;
244 case SimpleRemoteEPCOpcode::CallWrapper:
245 dbgs() << "CallWrapper";
246 break;
247 default:
248 llvm_unreachable("Invalid opcode");
249 }
250 dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
251 << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
252 << " bytes\n";
253 });
254 auto Err = T->sendMessage(OpC, SeqNo, TagAddr, ArgBytes);
255 LLVM_DEBUG({
256 if (Err)
257 dbgs() << " \\--> SimpleRemoteEPC::sendMessage failed\n";
258 });
259 return Err;
260}
261
262Error SimpleRemoteEPC::handleSetup(uint64_t SeqNo, ExecutorAddr TagAddr,
263 shared::WrapperFunctionBuffer ArgBytes) {
264 if (SeqNo != 0)
265 return make_error<StringError>(Args: "Setup packet SeqNo not zero",
266 Args: inconvertibleErrorCode());
267
268 if (TagAddr)
269 return make_error<StringError>(Args: "Setup packet TagAddr not zero",
270 Args: inconvertibleErrorCode());
271
272 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
273 auto I = PendingCallWrapperResults.find(Val: 0);
274 assert(PendingCallWrapperResults.size() == 1 &&
275 I != PendingCallWrapperResults.end() &&
276 "Setup message handler not connectly set up");
277 auto SetupMsgHandler = std::move(I->second);
278 PendingCallWrapperResults.erase(I);
279
280 auto WFR =
281 shared::WrapperFunctionBuffer::copyFrom(Source: ArgBytes.data(), Size: ArgBytes.size());
282 SetupMsgHandler(std::move(WFR));
283 return Error::success();
284}
285
286Error SimpleRemoteEPC::setup() {
287 using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames;
288
289 std::promise<MSVCPExpected<SimpleRemoteEPCExecutorInfo>> EIP;
290 auto EIF = EIP.get_future();
291
292 // Prepare a handler for the setup packet.
293 PendingCallWrapperResults[0] =
294 RunInPlace()(
295 [&](shared::WrapperFunctionBuffer SetupMsgBytes) {
296 if (const char *ErrMsg = SetupMsgBytes.getOutOfBandError()) {
297 EIP.set_value(
298 make_error<StringError>(Args&: ErrMsg, Args: inconvertibleErrorCode()));
299 return;
300 }
301 using SPSSerialize =
302 shared::SPSArgList<shared::SPSSimpleRemoteEPCExecutorInfo>;
303 shared::SPSInputBuffer IB(SetupMsgBytes.data(), SetupMsgBytes.size());
304 SimpleRemoteEPCExecutorInfo EI;
305 if (SPSSerialize::deserialize(IB, Arg&: EI))
306 EIP.set_value(EI);
307 else
308 EIP.set_value(make_error<StringError>(
309 Args: "Could not deserialize setup message", Args: inconvertibleErrorCode()));
310 });
311
312 // Start the transport.
313 if (auto Err = T->start())
314 return Err;
315
316 // Wait for setup packet to arrive.
317 auto EI = EIF.get();
318 if (!EI) {
319 T->disconnect();
320 return EI.takeError();
321 }
322
323 LLVM_DEBUG({
324 dbgs() << "SimpleRemoteEPC received setup message:\n"
325 << " Triple: " << EI->TargetTriple << "\n"
326 << " Page size: " << EI->PageSize << "\n"
327 << " Bootstrap map" << (EI->BootstrapMap.empty() ? " empty" : ":")
328 << "\n";
329 for (const auto &KV : EI->BootstrapMap)
330 dbgs() << " " << KV.first() << ": " << KV.second.size()
331 << "-byte SPS encoded buffer\n";
332 dbgs() << " Bootstrap symbols"
333 << (EI->BootstrapSymbols.empty() ? " empty" : ":") << "\n";
334 for (const auto &KV : EI->BootstrapSymbols)
335 dbgs() << " " << KV.first() << ": " << KV.second << "\n";
336 });
337 TargetTriple = Triple(EI->TargetTriple);
338 PageSize = EI->PageSize;
339 BootstrapMap = std::move(EI->BootstrapMap);
340 BootstrapSymbols = std::move(EI->BootstrapSymbols);
341
342 if (auto Err = getBootstrapSymbols(
343 Pairs: {{JDI.JITDispatchContext, ExecutorSessionObjectName},
344 {JDI.JITDispatchFunction, DispatchFnName},
345 {RunAsMainAddr, rt::RunAsMainWrapperName},
346 {RunAsVoidFunctionAddr, rt::RunAsVoidFunctionWrapperName},
347 {RunAsIntFunctionAddr, rt::RunAsIntFunctionWrapperName}}))
348 return Err;
349
350 return Error::success();
351}
352
353Error SimpleRemoteEPC::handleResult(uint64_t SeqNo, ExecutorAddr TagAddr,
354 shared::WrapperFunctionBuffer ArgBytes) {
355 IncomingWFRHandler SendResult;
356
357 if (TagAddr)
358 return make_error<StringError>(Args: "Unexpected TagAddr in result message",
359 Args: inconvertibleErrorCode());
360
361 {
362 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
363 auto I = PendingCallWrapperResults.find(Val: SeqNo);
364 if (I == PendingCallWrapperResults.end())
365 return make_error<StringError>(Args: "No call for sequence number " +
366 Twine(SeqNo),
367 Args: inconvertibleErrorCode());
368 SendResult = std::move(I->second);
369 PendingCallWrapperResults.erase(I);
370 releaseSeqNo(SeqNo);
371 }
372
373 auto WFR =
374 shared::WrapperFunctionBuffer::copyFrom(Source: ArgBytes.data(), Size: ArgBytes.size());
375 SendResult(std::move(WFR));
376 return Error::success();
377}
378
379void SimpleRemoteEPC::handleCallWrapper(
380 uint64_t RemoteSeqNo, ExecutorAddr TagAddr,
381 shared::WrapperFunctionBuffer ArgBytes) {
382 assert(ES && "No ExecutionSession attached");
383 D->dispatch(T: makeGenericNamedTask(
384 Fn: [this, RemoteSeqNo, TagAddr, ArgBytes = std::move(ArgBytes)]() mutable {
385 ES->runJITDispatchHandler(
386 SendResult: [this, RemoteSeqNo](shared::WrapperFunctionBuffer WFR) {
387 if (auto Err =
388 sendMessage(OpC: SimpleRemoteEPCOpcode::Result, SeqNo: RemoteSeqNo,
389 TagAddr: ExecutorAddr(), ArgBytes: {WFR.data(), WFR.size()}))
390 getExecutionSession().reportError(Err: std::move(Err));
391 },
392 HandlerFnTagAddr: TagAddr, ArgBytes: std::move(ArgBytes));
393 },
394 Desc: "callWrapper task"));
395}
396
397Error SimpleRemoteEPC::handleHangup(shared::WrapperFunctionBuffer ArgBytes) {
398 using namespace llvm::orc::shared;
399 auto WFR = WrapperFunctionBuffer::copyFrom(Source: ArgBytes.data(), Size: ArgBytes.size());
400 if (const char *ErrMsg = WFR.getOutOfBandError())
401 return make_error<StringError>(Args&: ErrMsg, Args: inconvertibleErrorCode());
402
403 orc::shared::detail::SPSSerializableError Info;
404 SPSInputBuffer IB(WFR.data(), WFR.size());
405 if (!SPSArgList<SPSError>::deserialize(IB, Arg&: Info))
406 return make_error<StringError>(Args: "Could not deserialize hangup info",
407 Args: inconvertibleErrorCode());
408 return fromSPSSerializable(BSE: std::move(Info));
409}
410
411} // end namespace orc
412} // end namespace llvm
413