1//===------- SimpleRemoteEPC.cpp -- Simple remote executor control --------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
9#include "llvm/ExecutionEngine/Orc/SimpleRemoteEPC.h"
10#include "llvm/ExecutionEngine/Orc/EPCGenericDylibManager.h"
11#include "llvm/ExecutionEngine/Orc/EPCGenericJITLinkMemoryManager.h"
12#include "llvm/ExecutionEngine/Orc/Shared/OrcRTBridge.h"
13#include "llvm/Support/FormatVariadic.h"
14
15#define DEBUG_TYPE "orc"
16
17namespace llvm {
18namespace orc {
19
20SimpleRemoteEPC::~SimpleRemoteEPC() {
21#ifndef NDEBUG
22 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
23 assert(Disconnected && "Destroyed without disconnection");
24#endif // NDEBUG
25}
26
27Expected<int32_t> SimpleRemoteEPC::runAsMain(ExecutorAddr MainFnAddr,
28 ArrayRef<std::string> Args) {
29 int64_t Result = 0;
30 if (auto Err = callSPSWrapper<rt::SPSRunAsMainSignature>(
31 WrapperFnAddr: RunAsMainAddr, WrapperCallArgs&: Result, WrapperCallArgs&: MainFnAddr, WrapperCallArgs&: Args))
32 return std::move(Err);
33 return Result;
34}
35
36Expected<int32_t> SimpleRemoteEPC::runAsVoidFunction(ExecutorAddr VoidFnAddr) {
37 int32_t Result = 0;
38 if (auto Err = callSPSWrapper<rt::SPSRunAsVoidFunctionSignature>(
39 WrapperFnAddr: RunAsVoidFunctionAddr, WrapperCallArgs&: Result, WrapperCallArgs&: VoidFnAddr))
40 return std::move(Err);
41 return Result;
42}
43
44Expected<int32_t> SimpleRemoteEPC::runAsIntFunction(ExecutorAddr IntFnAddr,
45 int Arg) {
46 int32_t Result = 0;
47 if (auto Err = callSPSWrapper<rt::SPSRunAsIntFunctionSignature>(
48 WrapperFnAddr: RunAsIntFunctionAddr, WrapperCallArgs&: Result, WrapperCallArgs&: IntFnAddr, WrapperCallArgs&: Arg))
49 return std::move(Err);
50 return Result;
51}
52
53void SimpleRemoteEPC::callWrapperAsync(ExecutorAddr WrapperFnAddr,
54 IncomingWFRHandler OnComplete,
55 ArrayRef<char> ArgBuffer) {
56 uint64_t SeqNo;
57 {
58 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
59 SeqNo = getNextSeqNo();
60 assert(!PendingCallWrapperResults.count(SeqNo) && "SeqNo already in use");
61 PendingCallWrapperResults[SeqNo] = std::move(OnComplete);
62 }
63
64 if (auto Err = sendMessage(OpC: SimpleRemoteEPCOpcode::CallWrapper, SeqNo,
65 TagAddr: WrapperFnAddr, ArgBytes: ArgBuffer)) {
66 IncomingWFRHandler H;
67
68 // We just registered OnComplete, but there may be a race between this
69 // thread returning from sendMessage and handleDisconnect being called from
70 // the transport's listener thread. If handleDisconnect gets there first
71 // then it will have failed 'H' for us. If we get there first (or if
72 // handleDisconnect already ran) then we need to take care of it.
73 {
74 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
75 auto I = PendingCallWrapperResults.find(Val: SeqNo);
76 if (I != PendingCallWrapperResults.end()) {
77 H = std::move(I->second);
78 PendingCallWrapperResults.erase(I);
79 }
80 }
81
82 if (H)
83 H(shared::WrapperFunctionBuffer::createOutOfBandError(Msg: "disconnecting"));
84
85 getExecutionSession().reportError(Err: std::move(Err));
86 }
87}
88
89Expected<std::unique_ptr<DylibManager>>
90SimpleRemoteEPC::createDefaultDylibMgr() {
91 auto DM = EPCGenericDylibManager::CreateWithDefaultBootstrapSymbols(EPC&: *this);
92 if (!DM)
93 return DM.takeError();
94 return std::make_unique<EPCGenericDylibManager>(args: std::move(*DM));
95}
96
97Error SimpleRemoteEPC::disconnect() {
98 T->disconnect();
99 D->shutdown();
100 std::unique_lock<std::mutex> Lock(SimpleRemoteEPCMutex);
101 DisconnectCV.wait(lock&: Lock, p: [this] { return Disconnected; });
102 return std::move(DisconnectErr);
103}
104
105Expected<SimpleRemoteEPCTransportClient::HandleMessageAction>
106SimpleRemoteEPC::handleMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo,
107 ExecutorAddr TagAddr,
108 shared::WrapperFunctionBuffer ArgBytes) {
109
110 LLVM_DEBUG({
111 dbgs() << "SimpleRemoteEPC::handleMessage: opc = ";
112 switch (OpC) {
113 case SimpleRemoteEPCOpcode::Setup:
114 dbgs() << "Setup";
115 assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
116 assert(!TagAddr && "Non-zero TagAddr for Setup?");
117 break;
118 case SimpleRemoteEPCOpcode::Hangup:
119 dbgs() << "Hangup";
120 assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
121 assert(!TagAddr && "Non-zero TagAddr for Hangup?");
122 break;
123 case SimpleRemoteEPCOpcode::Result:
124 dbgs() << "Result";
125 assert(!TagAddr && "Non-zero TagAddr for Result?");
126 break;
127 case SimpleRemoteEPCOpcode::CallWrapper:
128 dbgs() << "CallWrapper";
129 break;
130 }
131 dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
132 << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
133 << " bytes\n";
134 });
135
136 using UT = std::underlying_type_t<SimpleRemoteEPCOpcode>;
137 if (static_cast<UT>(OpC) > static_cast<UT>(SimpleRemoteEPCOpcode::LastOpC))
138 return make_error<StringError>(Args: "Unexpected opcode",
139 Args: inconvertibleErrorCode());
140
141 switch (OpC) {
142 case SimpleRemoteEPCOpcode::Setup:
143 if (auto Err = handleSetup(SeqNo, TagAddr, ArgBytes: std::move(ArgBytes)))
144 return std::move(Err);
145 break;
146 case SimpleRemoteEPCOpcode::Hangup:
147 T->disconnect();
148 if (auto Err = handleHangup(ArgBytes: std::move(ArgBytes)))
149 return std::move(Err);
150 return EndSession;
151 case SimpleRemoteEPCOpcode::Result:
152 if (auto Err = handleResult(SeqNo, TagAddr, ArgBytes: std::move(ArgBytes)))
153 return std::move(Err);
154 break;
155 case SimpleRemoteEPCOpcode::CallWrapper:
156 handleCallWrapper(RemoteSeqNo: SeqNo, TagAddr, ArgBytes: std::move(ArgBytes));
157 break;
158 }
159 return ContinueSession;
160}
161
162void SimpleRemoteEPC::handleDisconnect(Error Err) {
163 LLVM_DEBUG({
164 dbgs() << "SimpleRemoteEPC::handleDisconnect: "
165 << (Err ? "failure" : "success") << "\n";
166 });
167
168 PendingCallWrapperResultsMap TmpPending;
169
170 {
171 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
172 std::swap(a&: TmpPending, b&: PendingCallWrapperResults);
173 }
174
175 for (auto &KV : TmpPending)
176 KV.second(
177 shared::WrapperFunctionBuffer::createOutOfBandError(Msg: "disconnecting"));
178
179 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
180 DisconnectErr = joinErrors(E1: std::move(DisconnectErr), E2: std::move(Err));
181 Disconnected = true;
182 DisconnectCV.notify_all();
183}
184
185Expected<std::unique_ptr<jitlink::JITLinkMemoryManager>>
186SimpleRemoteEPC::createDefaultMemoryManager(SimpleRemoteEPC &SREPC) {
187 EPCGenericJITLinkMemoryManager::SymbolAddrs SAs;
188 if (auto Err = SREPC.getBootstrapSymbols(
189 Pairs: {{SAs.Allocator, rt::SimpleExecutorMemoryManagerInstanceName},
190 {SAs.Reserve, rt::SimpleExecutorMemoryManagerReserveWrapperName},
191 {SAs.Initialize,
192 rt::SimpleExecutorMemoryManagerInitializeWrapperName},
193 {SAs.Release, rt::SimpleExecutorMemoryManagerReleaseWrapperName}}))
194 return std::move(Err);
195
196 return std::make_unique<EPCGenericJITLinkMemoryManager>(args&: SREPC, args&: SAs);
197}
198
199Expected<std::unique_ptr<MemoryAccess>>
200SimpleRemoteEPC::createDefaultMemoryAccess(SimpleRemoteEPC &SREPC) {
201 EPCGenericMemoryAccess::FuncAddrs FAs;
202 if (auto Err = SREPC.getBootstrapSymbols(
203 Pairs: {{FAs.WriteUInt8s, rt::MemoryWriteUInt8sWrapperName},
204 {FAs.WriteUInt16s, rt::MemoryWriteUInt16sWrapperName},
205 {FAs.WriteUInt32s, rt::MemoryWriteUInt32sWrapperName},
206 {FAs.WriteUInt64s, rt::MemoryWriteUInt64sWrapperName},
207 {FAs.WriteBuffers, rt::MemoryWriteBuffersWrapperName},
208 {FAs.WritePointers, rt::MemoryWritePointersWrapperName},
209 {FAs.ReadUInt8s, rt::MemoryReadUInt8sWrapperName},
210 {FAs.ReadUInt16s, rt::MemoryReadUInt16sWrapperName},
211 {FAs.ReadUInt32s, rt::MemoryReadUInt32sWrapperName},
212 {FAs.ReadUInt64s, rt::MemoryReadUInt64sWrapperName},
213 {FAs.ReadBuffers, rt::MemoryReadBuffersWrapperName},
214 {FAs.ReadStrings, rt::MemoryReadStringsWrapperName}}))
215 return std::move(Err);
216
217 return std::make_unique<EPCGenericMemoryAccess>(args&: SREPC, args&: FAs);
218}
219
220Error SimpleRemoteEPC::sendMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo,
221 ExecutorAddr TagAddr,
222 ArrayRef<char> ArgBytes) {
223 assert(OpC != SimpleRemoteEPCOpcode::Setup &&
224 "SimpleRemoteEPC sending Setup message? That's the wrong direction.");
225
226 LLVM_DEBUG({
227 dbgs() << "SimpleRemoteEPC::sendMessage: opc = ";
228 switch (OpC) {
229 case SimpleRemoteEPCOpcode::Hangup:
230 dbgs() << "Hangup";
231 assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
232 assert(!TagAddr && "Non-zero TagAddr for Hangup?");
233 break;
234 case SimpleRemoteEPCOpcode::Result:
235 dbgs() << "Result";
236 assert(!TagAddr && "Non-zero TagAddr for Result?");
237 break;
238 case SimpleRemoteEPCOpcode::CallWrapper:
239 dbgs() << "CallWrapper";
240 break;
241 default:
242 llvm_unreachable("Invalid opcode");
243 }
244 dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
245 << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
246 << " bytes\n";
247 });
248 auto Err = T->sendMessage(OpC, SeqNo, TagAddr, ArgBytes);
249 LLVM_DEBUG({
250 if (Err)
251 dbgs() << " \\--> SimpleRemoteEPC::sendMessage failed\n";
252 });
253 return Err;
254}
255
256Error SimpleRemoteEPC::handleSetup(uint64_t SeqNo, ExecutorAddr TagAddr,
257 shared::WrapperFunctionBuffer ArgBytes) {
258 if (SeqNo != 0)
259 return make_error<StringError>(Args: "Setup packet SeqNo not zero",
260 Args: inconvertibleErrorCode());
261
262 if (TagAddr)
263 return make_error<StringError>(Args: "Setup packet TagAddr not zero",
264 Args: inconvertibleErrorCode());
265
266 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
267 auto I = PendingCallWrapperResults.find(Val: 0);
268 assert(PendingCallWrapperResults.size() == 1 &&
269 I != PendingCallWrapperResults.end() &&
270 "Setup message handler not connectly set up");
271 auto SetupMsgHandler = std::move(I->second);
272 PendingCallWrapperResults.erase(I);
273
274 auto WFR =
275 shared::WrapperFunctionBuffer::copyFrom(Source: ArgBytes.data(), Size: ArgBytes.size());
276 SetupMsgHandler(std::move(WFR));
277 return Error::success();
278}
279
280Error SimpleRemoteEPC::setup(Setup S) {
281 using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames;
282
283 std::promise<MSVCPExpected<SimpleRemoteEPCExecutorInfo>> EIP;
284 auto EIF = EIP.get_future();
285
286 // Prepare a handler for the setup packet.
287 PendingCallWrapperResults[0] =
288 RunInPlace()(
289 [&](shared::WrapperFunctionBuffer SetupMsgBytes) {
290 if (const char *ErrMsg = SetupMsgBytes.getOutOfBandError()) {
291 EIP.set_value(
292 make_error<StringError>(Args&: ErrMsg, Args: inconvertibleErrorCode()));
293 return;
294 }
295 using SPSSerialize =
296 shared::SPSArgList<shared::SPSSimpleRemoteEPCExecutorInfo>;
297 shared::SPSInputBuffer IB(SetupMsgBytes.data(), SetupMsgBytes.size());
298 SimpleRemoteEPCExecutorInfo EI;
299 if (SPSSerialize::deserialize(IB, Arg&: EI))
300 EIP.set_value(EI);
301 else
302 EIP.set_value(make_error<StringError>(
303 Args: "Could not deserialize setup message", Args: inconvertibleErrorCode()));
304 });
305
306 // Start the transport.
307 if (auto Err = T->start())
308 return Err;
309
310 // Wait for setup packet to arrive.
311 auto EI = EIF.get();
312 if (!EI) {
313 T->disconnect();
314 return EI.takeError();
315 }
316
317 LLVM_DEBUG({
318 dbgs() << "SimpleRemoteEPC received setup message:\n"
319 << " Triple: " << EI->TargetTriple << "\n"
320 << " Page size: " << EI->PageSize << "\n"
321 << " Bootstrap map" << (EI->BootstrapMap.empty() ? " empty" : ":")
322 << "\n";
323 for (const auto &KV : EI->BootstrapMap)
324 dbgs() << " " << KV.first() << ": " << KV.second.size()
325 << "-byte SPS encoded buffer\n";
326 dbgs() << " Bootstrap symbols"
327 << (EI->BootstrapSymbols.empty() ? " empty" : ":") << "\n";
328 for (const auto &KV : EI->BootstrapSymbols)
329 dbgs() << " " << KV.first() << ": " << KV.second << "\n";
330 });
331 TargetTriple = Triple(EI->TargetTriple);
332 PageSize = EI->PageSize;
333 BootstrapMap = std::move(EI->BootstrapMap);
334 BootstrapSymbols = std::move(EI->BootstrapSymbols);
335
336 if (auto Err = getBootstrapSymbols(
337 Pairs: {{JDI.JITDispatchContext, ExecutorSessionObjectName},
338 {JDI.JITDispatchFunction, DispatchFnName},
339 {RunAsMainAddr, rt::RunAsMainWrapperName},
340 {RunAsVoidFunctionAddr, rt::RunAsVoidFunctionWrapperName},
341 {RunAsIntFunctionAddr, rt::RunAsIntFunctionWrapperName}}))
342 return Err;
343
344 // Set a default CreateMemoryManager if none is specified.
345 if (!S.CreateMemoryManager)
346 S.CreateMemoryManager = createDefaultMemoryManager;
347
348 if (auto MemMgr = S.CreateMemoryManager(*this)) {
349 OwnedMemMgr = std::move(*MemMgr);
350 this->MemMgr = OwnedMemMgr.get();
351 } else
352 return MemMgr.takeError();
353
354 // Set a default CreateMemoryAccess if none is specified.
355 if (!S.CreateMemoryAccess)
356 S.CreateMemoryAccess = createDefaultMemoryAccess;
357
358 if (auto MemAccess = S.CreateMemoryAccess(*this)) {
359 OwnedMemAccess = std::move(*MemAccess);
360 this->MemAccess = OwnedMemAccess.get();
361 } else
362 return MemAccess.takeError();
363
364 return Error::success();
365}
366
367Error SimpleRemoteEPC::handleResult(uint64_t SeqNo, ExecutorAddr TagAddr,
368 shared::WrapperFunctionBuffer ArgBytes) {
369 IncomingWFRHandler SendResult;
370
371 if (TagAddr)
372 return make_error<StringError>(Args: "Unexpected TagAddr in result message",
373 Args: inconvertibleErrorCode());
374
375 {
376 std::lock_guard<std::mutex> Lock(SimpleRemoteEPCMutex);
377 auto I = PendingCallWrapperResults.find(Val: SeqNo);
378 if (I == PendingCallWrapperResults.end())
379 return make_error<StringError>(Args: "No call for sequence number " +
380 Twine(SeqNo),
381 Args: inconvertibleErrorCode());
382 SendResult = std::move(I->second);
383 PendingCallWrapperResults.erase(I);
384 releaseSeqNo(SeqNo);
385 }
386
387 auto WFR =
388 shared::WrapperFunctionBuffer::copyFrom(Source: ArgBytes.data(), Size: ArgBytes.size());
389 SendResult(std::move(WFR));
390 return Error::success();
391}
392
393void SimpleRemoteEPC::handleCallWrapper(
394 uint64_t RemoteSeqNo, ExecutorAddr TagAddr,
395 shared::WrapperFunctionBuffer ArgBytes) {
396 assert(ES && "No ExecutionSession attached");
397 D->dispatch(T: makeGenericNamedTask(
398 Fn: [this, RemoteSeqNo, TagAddr, ArgBytes = std::move(ArgBytes)]() mutable {
399 ES->runJITDispatchHandler(
400 SendResult: [this, RemoteSeqNo](shared::WrapperFunctionBuffer WFR) {
401 if (auto Err =
402 sendMessage(OpC: SimpleRemoteEPCOpcode::Result, SeqNo: RemoteSeqNo,
403 TagAddr: ExecutorAddr(), ArgBytes: {WFR.data(), WFR.size()}))
404 getExecutionSession().reportError(Err: std::move(Err));
405 },
406 HandlerFnTagAddr: TagAddr, ArgBytes: std::move(ArgBytes));
407 },
408 Desc: "callWrapper task"));
409}
410
411Error SimpleRemoteEPC::handleHangup(shared::WrapperFunctionBuffer ArgBytes) {
412 using namespace llvm::orc::shared;
413 auto WFR = WrapperFunctionBuffer::copyFrom(Source: ArgBytes.data(), Size: ArgBytes.size());
414 if (const char *ErrMsg = WFR.getOutOfBandError())
415 return make_error<StringError>(Args&: ErrMsg, Args: inconvertibleErrorCode());
416
417 orc::shared::detail::SPSSerializableError Info;
418 SPSInputBuffer IB(WFR.data(), WFR.size());
419 if (!SPSArgList<SPSError>::deserialize(IB, Arg&: Info))
420 return make_error<StringError>(Args: "Could not deserialize hangup info",
421 Args: inconvertibleErrorCode());
422 return fromSPSSerializable(BSE: std::move(Info));
423}
424
425} // end namespace orc
426} // end namespace llvm
427