GCC Code Coverage Report


Directory: src/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 0.0% 0 / 0 / 540
Functions: 0.0% 0 / 0 / 55
Branches: 0.0% 0 / 0 / 442

ps/rdma/rc_transport.cc
Line Branch Exec Source
1 #include "ps/rdma/rc_transport.h"
2
3 #include <atomic>
4 #include <chrono>
5 #include <cstring>
6 #include <iostream>
7 #include <stdexcept>
8 #include <string>
9
10 #include <folly/portability/GFlags.h>
11
12 #include "ps/rdma/control_plane.h"
13 #include "ps/rdma/rdma_common.h"
14 #include "ps/rdma/rc_options.h"
15
16 DECLARE_int32(global_id);
17 DECLARE_int32(num_server_processes);
18 DECLARE_int32(num_client_processes);
19
20 namespace petps {
21 namespace {
22
23 using petps::Exchange;
24 using petps::NowNs;
25
/// Tags the kind of signaled RDMA write that a tracked work-request id refers
/// to. MakeTrackedWrId() places this value in the upper 32 bits of the id.
enum class TrackedWrKind : std::uint64_t {
  // Client-side write of a request's commit word.
  kSubmitCommit = 1,
  // Server-side write of a response's status word.
  kResponseStatus = 2,
};
30
/// Process-wide profiling counters for the RC transport. All fields are
/// atomics that start at zero; writers in this file update them with relaxed
/// ordering and the periodic reporter reads them through Exchange().
struct RcTransportProfileCounters {
  // --- Client submit path ---
  std::atomic<std::uint64_t> submit_request_count{0};
  std::atomic<std::uint64_t> submit_descriptor_write_count{0};
  std::atomic<std::uint64_t> submit_commit_write_count{0};
  std::atomic<std::uint64_t> submit_request_ns{0};
  std::atomic<std::uint64_t> drain_pending_submit_count{0};
  std::atomic<std::uint64_t> drain_pending_submit_ns{0};

  // --- Server response path ---
  std::atomic<std::uint64_t> complete_response_count{0};
  std::atomic<std::uint64_t> response_payload_write_count{0};
  std::atomic<std::uint64_t> response_payload_sg_write_count{0};
  std::atomic<std::uint64_t> response_payload_sg_wr_count{0};
  std::atomic<std::uint64_t> response_status_write_count{0};
  std::atomic<std::uint64_t> response_payload_bytes{0};
  std::atomic<std::uint64_t> complete_response_ns{0};
  std::atomic<std::uint64_t> drain_pending_response_count{0};
  std::atomic<std::uint64_t> drain_pending_response_ns{0};

  // Timestamp (NowNs() units) of the next report; 0 means "not armed yet".
  std::atomic<std::uint64_t> next_report_ns{0};
};
50
51 RcTransportProfileCounters& TransportProfile() {
52 static RcTransportProfileCounters counters;
53 return counters;
54 }
55
56 void MaybeReportTransportProfile(const RcTransportConfig& config,
57 const char* role) {
58 if (FLAGS_rdma_rc_profile_interval_ms <= 0) {
59 return;
60 }
61 auto& counters = TransportProfile();
62 const std::uint64_t now = NowNs();
63 const std::uint64_t interval =
64 static_cast<std::uint64_t>(FLAGS_rdma_rc_profile_interval_ms) * 1000000;
65 std::uint64_t expected =
66 counters.next_report_ns.load(std::memory_order_relaxed);
67 if (expected == 0) {
68 counters.next_report_ns.compare_exchange_strong(
69 expected, now + interval, std::memory_order_relaxed);
70 return;
71 }
72 if (now < expected ||
73 !counters.next_report_ns.compare_exchange_strong(
74 expected, now + interval, std::memory_order_relaxed)) {
75 return;
76 }
77
78 const std::uint64_t submit_count = Exchange(&counters.submit_request_count);
79 const std::uint64_t submit_ns = Exchange(&counters.submit_request_ns);
80 const std::uint64_t complete_count =
81 Exchange(&counters.complete_response_count);
82 const std::uint64_t complete_ns = Exchange(&counters.complete_response_ns);
83 const std::uint64_t drain_submit_count =
84 Exchange(&counters.drain_pending_submit_count);
85 const std::uint64_t drain_submit_ns =
86 Exchange(&counters.drain_pending_submit_ns);
87 const std::uint64_t drain_response_count =
88 Exchange(&counters.drain_pending_response_count);
89 const std::uint64_t drain_response_ns =
90 Exchange(&counters.drain_pending_response_ns);
91 std::cout
92 << "component=rdma_rc_transport_profile role=" << role
93 << " shard=" << config.shard_id << " client_id=" << config.client_id
94 << " submit_count=" << submit_count << " submit_descriptor_writes="
95 << Exchange(&counters.submit_descriptor_write_count)
96 << " submit_commit_writes="
97 << Exchange(&counters.submit_commit_write_count)
98 << " submit_avg_ns=" << (submit_count == 0 ? 0 : submit_ns / submit_count)
99 << " drain_submit_count=" << drain_submit_count << " drain_submit_avg_ns="
100 << (drain_submit_count == 0 ? 0 : drain_submit_ns / drain_submit_count)
101 << " complete_count=" << complete_count << " response_payload_writes="
102 << Exchange(&counters.response_payload_write_count)
103 << " response_payload_sg_writes="
104 << Exchange(&counters.response_payload_sg_write_count)
105 << " response_payload_sg_wrs="
106 << Exchange(&counters.response_payload_sg_wr_count)
107 << " response_status_writes="
108 << Exchange(&counters.response_status_write_count)
109 << " response_payload_bytes="
110 << Exchange(&counters.response_payload_bytes) << " complete_avg_ns="
111 << (complete_count == 0 ? 0 : complete_ns / complete_count)
112 << " drain_response_count=" << drain_response_count
113 << " drain_response_avg_ns="
114 << (drain_response_count == 0 ? 0
115 : drain_response_ns / drain_response_count)
116 << std::endl;
117 }
118
119 std::size_t TotalClientSlotsPerShard(const RcTransportConfig& config) {
120 return static_cast<std::size_t>(config.qps_per_client_per_shard) *
121 static_cast<std::size_t>(config.slots_per_qp);
122 }
123
124 std::size_t ClientSlotBytes(const RcTransportConfig& config) {
125 return config.response_slot_bytes + config.request_slot_bytes;
126 }
127
128 std::size_t ClientLaneBytes(const RcTransportConfig& config) {
129 return static_cast<std::size_t>(config.slots_per_qp) *
130 ClientSlotBytes(config);
131 }
132
133 int LogicalClientsPerProcess(const RcTransportConfig& config) {
134 if (FLAGS_num_client_processes <= 0) {
135 throw std::runtime_error("num_client_processes must be positive");
136 }
137 if (config.num_clients < FLAGS_num_client_processes) {
138 throw std::runtime_error(
139 "logical client count smaller than OS client process count");
140 }
141 if (config.num_clients % FLAGS_num_client_processes != 0) {
142 throw std::runtime_error(
143 "logical client count must be divisible by OS client process count");
144 }
145 return config.num_clients / FLAGS_num_client_processes;
146 }
147
148 int OsClientIndexForLogicalClient(const RcTransportConfig& config,
149 int client_id) {
150 return client_id / LogicalClientsPerProcess(config);
151 }
152
153 int LocalLogicalClientIndex(const RcTransportConfig& config, int client_id) {
154 return client_id % LogicalClientsPerProcess(config);
155 }
156
157 int RawLaneForLogicalClient(
158 const RcTransportConfig& config, int client_id, int qp_index) {
159 return LocalLogicalClientIndex(config, client_id) *
160 config.qps_per_client_per_shard +
161 qp_index;
162 }
163
164 int RawLanesPerOsClient(const RcTransportConfig& config) {
165 return LogicalClientsPerProcess(config) * config.qps_per_client_per_shard;
166 }
167
168 std::size_t ServerLaneBytes(const RcTransportConfig& config) {
169 return static_cast<std::size_t>(config.num_clients) *
170 static_cast<std::size_t>(config.slots_per_qp) *
171 config.request_slot_bytes;
172 }
173
174 std::size_t
175 ClientShardLaneOffset(const RcTransportConfig& config, int raw_lane) {
176 return (static_cast<std::size_t>(config.shard_id) *
177 static_cast<std::size_t>(RawLanesPerOsClient(config)) +
178 static_cast<std::size_t>(raw_lane)) *
179 ClientLaneBytes(config);
180 }
181
182 std::size_t ClientSlotOffset(const RcTransportConfig& config, int slot_in_qp) {
183 return static_cast<std::size_t>(slot_in_qp) * ClientSlotBytes(config);
184 }
185
186 std::size_t ServerRequestOffset(
187 const RcTransportConfig& config, int client_id, int slot_in_qp) {
188 return (static_cast<std::size_t>(client_id) *
189 static_cast<std::size_t>(config.slots_per_qp) +
190 static_cast<std::size_t>(slot_in_qp)) *
191 config.request_slot_bytes;
192 }
193
194 std::size_t ClientResponseOffsetForRawLane(
195 const RcTransportConfig& config, int raw_lane, int slot_in_qp) {
196 return ClientShardLaneOffset(config, raw_lane) +
197 ClientSlotOffset(config, slot_in_qp);
198 }
199
200 std::size_t
201 ClientResponseOffset(const RcTransportConfig& config, int slot_in_qp) {
202 return ClientResponseOffsetForRawLane(
203 config, RawLaneForLogicalClient(config, config.client_id, 0), slot_in_qp);
204 }
205
206 std::size_t
207 ClientRequestStagingOffset(const RcTransportConfig& config, int slot_in_qp) {
208 return ClientResponseOffset(config, slot_in_qp) + config.response_slot_bytes;
209 }
210
211 std::uint64_t RequestCommitOffset(const RcTransportConfig& config) {
212 return config.request_slot_bytes - Align64(sizeof(CommitWord));
213 }
214
215 std::uint64_t ResponseStatusOffset(const RcTransportConfig& config) {
216 return config.response_slot_bytes - Align64(sizeof(StatusWord));
217 }
218
219 int GlobalSlotIndex(const RcTransportConfig& config,
220 int client_id,
221 int qp_index,
222 int slot_in_qp) {
223 return static_cast<int>(
224 (static_cast<std::size_t>(client_id) * TotalClientSlotsPerShard(config)) +
225 (static_cast<std::size_t>(qp_index) *
226 static_cast<std::size_t>(config.slots_per_qp)) +
227 static_cast<std::size_t>(slot_in_qp));
228 }
229
230 void DecodeGlobalSlotIndex(
231 const RcTransportConfig& config,
232 int slot_index,
233 int* client_id,
234 int* qp_index,
235 int* slot_in_qp) {
236 if (slot_index < 0) {
237 throw std::runtime_error("slot_index out of range");
238 }
239 const std::size_t slots_per_client = TotalClientSlotsPerShard(config);
240 if (slots_per_client == 0) {
241 throw std::runtime_error("slots_per_client is zero");
242 }
243 const std::size_t slot = static_cast<std::size_t>(slot_index);
244 if (client_id != nullptr) {
245 *client_id = static_cast<int>(slot / slots_per_client);
246 }
247 const std::size_t slot_in_client = slot % slots_per_client;
248 if (qp_index != nullptr) {
249 *qp_index = static_cast<int>(
250 slot_in_client / static_cast<std::size_t>(config.slots_per_qp));
251 }
252 if (slot_in_qp != nullptr) {
253 *slot_in_qp = static_cast<int>(
254 slot_in_client % static_cast<std::size_t>(config.slots_per_qp));
255 }
256 }
257
258 int ResponseSlotOrdinal(
259 const RcTransportConfig& config, int client_id, int slot_in_qp) {
260 return client_id * config.slots_per_qp + slot_in_qp;
261 }
262
263 std::uint64_t MakeTrackedWrId(TrackedWrKind kind, int slot_ordinal) {
264 return (static_cast<std::uint64_t>(kind) << 32) |
265 static_cast<std::uint32_t>(slot_ordinal);
266 }
267
268 int DecodeTrackedWrId(
269 TrackedWrKind kind, std::uint64_t wr_id, std::size_t expected_slots) {
270 const std::uint64_t kind_bits = wr_id >> 32;
271 if (kind_bits != static_cast<std::uint64_t>(kind)) {
272 throw std::runtime_error("unexpected tracked RC WR kind");
273 }
274 const std::uint32_t slot_ordinal = static_cast<std::uint32_t>(wr_id);
275 if (slot_ordinal >= expected_slots) {
276 throw std::runtime_error("tracked RC WR slot ordinal out of range");
277 }
278 return static_cast<int>(slot_ordinal);
279 }
280
281 void WaitForTrackedCompletion(
282 RawVerbsTransport* verbs,
283 std::vector<std::uint8_t>* ready,
284 TrackedWrKind kind,
285 int slot_ordinal,
286 const std::string& context) {
287 if (verbs == nullptr || ready == nullptr) {
288 throw std::runtime_error("tracked completion state is null");
289 }
290 auto& ready_flags = *ready;
291 if (slot_ordinal < 0 ||
292 static_cast<std::size_t>(slot_ordinal) >= ready_flags.size()) {
293 throw std::runtime_error("tracked completion slot ordinal out of range");
294 }
295 if (ready_flags[static_cast<std::size_t>(slot_ordinal)] != 0) {
296 ready_flags[static_cast<std::size_t>(slot_ordinal)] = 0;
297 return;
298 }
299
300 while (true) {
301 RawVerbsCompletion completion;
302 if (!verbs->Poll(&completion, FLAGS_rdma_wait_timeout_ms)) {
303 throw std::runtime_error(
304 "RC verbs write completion timeout " + context + " expected_wr_id=" +
305 std::to_string(MakeTrackedWrId(kind, slot_ordinal)));
306 }
307 const int completed_slot =
308 DecodeTrackedWrId(kind, completion.wr_id, ready_flags.size());
309 if (completed_slot == slot_ordinal) {
310 return;
311 }
312 ready_flags[static_cast<std::size_t>(completed_slot)] = 1;
313 }
314 }
315
316 void DrainTrackedPendingWrite(
317 RawVerbsTransport* verbs,
318 std::vector<std::uint8_t>* pending,
319 std::vector<std::uint8_t>* ready,
320 TrackedWrKind kind,
321 int slot_ordinal,
322 const std::string& context) {
323 if (pending == nullptr || ready == nullptr) {
324 return;
325 }
326 auto& pending_flags = *pending;
327 if (slot_ordinal < 0 ||
328 static_cast<std::size_t>(slot_ordinal) >= pending_flags.size()) {
329 throw std::runtime_error("tracked pending slot ordinal out of range");
330 }
331 if (pending_flags[static_cast<std::size_t>(slot_ordinal)] == 0) {
332 return;
333 }
334 WaitForTrackedCompletion(verbs, ready, kind, slot_ordinal, context);
335 pending_flags[static_cast<std::size_t>(slot_ordinal)] = 0;
336 }
337
338 void DrainTrackedPendingWrite(
339 RawVerbsTransport* verbs,
340 std::vector<std::uint8_t>* pending,
341 std::vector<std::uint8_t>* ready,
342 TrackedWrKind kind,
343 int slot_ordinal,
344 const std::string& context,
345 bool profile_enabled,
346 std::atomic<std::uint64_t>* drain_count,
347 std::atomic<std::uint64_t>* drain_ns) {
348 if (pending == nullptr || ready == nullptr || drain_count == nullptr ||
349 drain_ns == nullptr) {
350 return;
351 }
352 if (slot_ordinal < 0 ||
353 static_cast<std::size_t>(slot_ordinal) >= pending->size() ||
354 pending->at(static_cast<std::size_t>(slot_ordinal)) == 0) {
355 return;
356 }
357 if (!profile_enabled) {
358 DrainTrackedPendingWrite(
359 verbs, pending, ready, kind, slot_ordinal, context);
360 return;
361 }
362
363 const std::uint64_t drain_start_ns = NowNs();
364 DrainTrackedPendingWrite(verbs, pending, ready, kind, slot_ordinal, context);
365 drain_count->fetch_add(1, std::memory_order_relaxed);
366 drain_ns->fetch_add(NowNs() - drain_start_ns, std::memory_order_relaxed);
367 }
368
369 std::string WriteContext(
370 const RcTransportConfig& config,
371 int client_id,
372 int qp_index,
373 int slot_in_qp,
374 std::uint64_t seq,
375 std::uint64_t remote_offset,
376 int remote_node,
377 const char* phase) {
378 return "phase=" + std::string(phase) +
379 " shard=" + std::to_string(config.shard_id) + " client_id=" +
380 std::to_string(client_id) + " qp=" + std::to_string(qp_index) +
381 " slot_in_qp=" + std::to_string(slot_in_qp) + " seq=" +
382 std::to_string(seq) + " remote_node=" + std::to_string(remote_node) +
383 " remote_offset=" + std::to_string(remote_offset);
384 }
385
386 void ValidateClientId(const RcTransportConfig& config, int client_id) {
387 if (client_id < 0 || client_id >= config.num_clients) {
388 throw std::runtime_error("client_id out of range");
389 }
390 }
391
392 void ValidateSlotInQp(const RcTransportConfig& config, int slot_in_qp) {
393 if (slot_in_qp < 0 || slot_in_qp >= config.slots_per_qp) {
394 throw std::runtime_error("slot_in_qp out of range");
395 }
396 }
397
398 RawVerbsConfig MakeRawConfig(
399 const RcTransportConfig& config,
400 int local_lane,
401 std::size_t local_region_bytes,
402 bool is_client,
403 int only_node_id) {
404 RawVerbsConfig raw;
405 raw.global_id = FLAGS_global_id;
406 raw.local_lane = local_lane;
407 raw.remote_lane = local_lane;
408 raw.only_node_id = only_node_id;
409 raw.num_servers = FLAGS_num_server_processes;
410 raw.num_clients = FLAGS_num_client_processes;
411 raw.numa_id =
412 is_client ? FLAGS_rdma_rc_client_numa_id : FLAGS_rdma_rc_server_numa_id;
413 raw.max_inline_data =
414 static_cast<std::uint32_t>(std::max(0, FLAGS_rdma_rc_inline_bytes));
415 raw.connect_to_servers = is_client;
416 raw.connect_to_clients = !is_client;
417 raw.local_region_bytes = local_region_bytes;
418 raw.control_plane_host = config.control_plane_host;
419 raw.control_plane_port = config.control_plane_port;
420 raw.control_plane_timeout_ms = config.control_plane_timeout_ms;
421 return raw;
422 }
423
424 } // namespace
425
426 RcShardClientTransport::RcShardClientTransport(const RcTransportConfig& config)
427 : config_(config), server_node_id_(config.shard_id) {
428 ValidateClientId(config_, config_.client_id);
429 if (config_.slots_per_qp <= 0) {
430 throw std::runtime_error("slots_per_qp must be positive");
431 }
432 if (server_node_id_ < 0 || server_node_id_ >= FLAGS_num_server_processes) {
433 throw std::runtime_error("server shard id out of global node range");
434 }
435 lanes_.reserve(static_cast<std::size_t>(config_.qps_per_client_per_shard));
436 for (int qp = 0; qp < config_.qps_per_client_per_shard; ++qp) {
437 Lane lane;
438 const int raw_lane =
439 RawLaneForLogicalClient(config_, config_.client_id, qp);
440 const std::size_t local_bytes =
441 static_cast<std::size_t>(FLAGS_num_server_processes) *
442 static_cast<std::size_t>(RawLanesPerOsClient(config_)) *
443 ClientLaneBytes(config_);
444 RawVerbsConfig raw =
445 MakeRawConfig(config_, raw_lane, local_bytes, true, server_node_id_);
446 raw.reserved_region_offset = ClientShardLaneOffset(config_, raw_lane);
447 raw.reserved_region_bytes = ClientLaneBytes(config_);
448 lane.verbs = std::make_unique<RawVerbsTransport>(raw);
449 lane.lane_base = lane.verbs->LocalPointer(GlobalAddress{
450 static_cast<std::uint16_t>(FLAGS_global_id),
451 static_cast<std::uint64_t>(ClientShardLaneOffset(config_, raw_lane)),
452 });
453 std::memset(lane.lane_base, 0, ClientLaneBytes(config_));
454 lane.submit_completion_pending.assign(
455 static_cast<std::size_t>(config_.slots_per_qp), 0);
456 lane.submit_completion_ready.assign(
457 static_cast<std::size_t>(config_.slots_per_qp), 0);
458 lane.verbs->PublishAndConnect();
459 lanes_.push_back(std::move(lane));
460 }
461 }
462
463 RcShardClientTransport::~RcShardClientTransport() {
464 try {
465 for (std::size_t qp = 0; qp < lanes_.size(); ++qp) {
466 Lane& lane = lanes_[qp];
467 if (!lane.verbs) {
468 continue;
469 }
470 for (int slot_in_qp = 0; slot_in_qp < config_.slots_per_qp;
471 ++slot_in_qp) {
472 DrainTrackedPendingWrite(
473 lane.verbs.get(),
474 &lane.submit_completion_pending,
475 &lane.submit_completion_ready,
476 TrackedWrKind::kSubmitCommit,
477 slot_in_qp,
478 WriteContext(
479 config_,
480 config_.client_id,
481 static_cast<int>(qp),
482 slot_in_qp,
483 0,
484 ServerRequestOffset(config_, config_.client_id, slot_in_qp) +
485 RequestCommitOffset(config_),
486 server_node_id_,
487 "shutdown_submit_commit"));
488 }
489 }
490 } catch (...) {
491 }
492 }
493
494 RcShardClientTransport::Lane& RcShardClientTransport::LaneAt(int qp_index) {
495 if (qp_index < 0 || qp_index >= config_.qps_per_client_per_shard) {
496 throw std::runtime_error("qp_index out of range");
497 }
498 return lanes_.at(static_cast<std::size_t>(qp_index));
499 }
500
501 const RcShardClientTransport::Lane&
502 RcShardClientTransport::LaneAt(int qp_index) const {
503 if (qp_index < 0 || qp_index >= config_.qps_per_client_per_shard) {
504 throw std::runtime_error("qp_index out of range");
505 }
506 return lanes_.at(static_cast<std::size_t>(qp_index));
507 }
508
509 RcClientQpView RcShardClientTransport::OpenQp(int qp_index) {
510 return OpenSlot(qp_index, 0);
511 }
512
513 RcClientQpView RcShardClientTransport::OpenSlot(int qp_index, int slot_in_qp) {
514 ValidateSlotInQp(config_, slot_in_qp);
515 const Lane& lane = LaneAt(qp_index);
516 auto* slot_base = static_cast<char*>(lane.lane_base) +
517 ClientSlotOffset(config_, slot_in_qp);
518 auto* response_slot = static_cast<void*>(slot_base);
519 auto* response_payload = slot_base;
520 auto* status = reinterpret_cast<StatusWord*>(
521 response_payload + config_.response_slot_bytes -
522 Align64(sizeof(StatusWord)));
523
524 auto* request_slot = slot_base + config_.response_slot_bytes;
525 auto* descriptor = reinterpret_cast<RequestDescriptor*>(request_slot);
526 auto* payload = request_slot + Align64(sizeof(RequestDescriptor));
527 auto* commit = reinterpret_cast<CommitWord*>(
528 request_slot + config_.request_slot_bytes - Align64(sizeof(CommitWord)));
529 const int slot_index =
530 GlobalSlotIndex(config_, config_.client_id, qp_index, slot_in_qp);
531
532 return RcClientQpView{
533 qp_index,
534 slot_in_qp,
535 slot_index,
536 request_slot,
537 descriptor,
538 payload,
539 commit,
540 response_slot,
541 response_payload,
542 status,
543 };
544 }
545
546 void RcShardClientTransport::SubmitRequest(
547 const RcClientQpView& view,
548 const RequestDescriptor& descriptor,
549 const void* payload,
550 std::size_t payload_bytes) {
551 const bool profile_enabled = FLAGS_rdma_rc_profile_interval_ms > 0;
552 const std::uint64_t start_ns = profile_enabled ? NowNs() : 0;
553 ValidateSlotInQp(config_, view.slot_in_qp);
554 Lane& lane = LaneAt(view.qp_index);
555 const std::uint64_t remote_request_offset =
556 ServerRequestOffset(config_, config_.client_id, view.slot_in_qp);
557 auto& counters = TransportProfile();
558 DrainTrackedPendingWrite(
559 lane.verbs.get(),
560 &lane.submit_completion_pending,
561 &lane.submit_completion_ready,
562 TrackedWrKind::kSubmitCommit,
563 view.slot_in_qp,
564 WriteContext(
565 config_,
566 config_.client_id,
567 view.qp_index,
568 view.slot_in_qp,
569 descriptor.seq - 1,
570 remote_request_offset + RequestCommitOffset(config_),
571 server_node_id_,
572 "previous_submit_commit"),
573 profile_enabled,
574 &counters.drain_pending_submit_count,
575 &counters.drain_pending_submit_ns);
576
577 auto* request_slot = static_cast<char*>(view.request_slot);
578 auto* local_descriptor = reinterpret_cast<RequestDescriptor*>(request_slot);
579 auto* local_payload = request_slot + Align64(sizeof(RequestDescriptor));
580 auto* local_commit = reinterpret_cast<CommitWord*>(
581 request_slot + RequestCommitOffset(config_));
582 *local_descriptor = descriptor;
583 if (payload_bytes > 0) {
584 std::memcpy(local_payload, payload, payload_bytes);
585 }
586 local_commit->seq.store(descriptor.seq, std::memory_order_release);
587 local_commit->state.store(kRcSlotReady, std::memory_order_release);
588
589 lane.verbs->Write(
590 request_slot,
591 GlobalAddress{
592 static_cast<std::uint16_t>(server_node_id_),
593 remote_request_offset,
594 },
595 Align64(sizeof(RequestDescriptor)) + payload_bytes,
596 /*wr_id=*/0,
597 false);
598 if (profile_enabled) {
599 TransportProfile().submit_descriptor_write_count.fetch_add(
600 1, std::memory_order_relaxed);
601 }
602
603 lane.verbs->Write(
604 local_commit,
605 GlobalAddress{
606 static_cast<std::uint16_t>(server_node_id_),
607 remote_request_offset + RequestCommitOffset(config_),
608 },
609 sizeof(CommitWord),
610 MakeTrackedWrId(TrackedWrKind::kSubmitCommit, view.slot_in_qp),
611 true);
612 lane.submit_completion_pending[static_cast<std::size_t>(view.slot_in_qp)] = 1;
613 if (profile_enabled) {
614 auto& profile = TransportProfile();
615 profile.submit_request_count.fetch_add(1, std::memory_order_relaxed);
616 profile.submit_commit_write_count.fetch_add(1, std::memory_order_relaxed);
617 profile.submit_request_ns.fetch_add(
618 NowNs() - start_ns, std::memory_order_relaxed);
619 MaybeReportTransportProfile(config_, "client");
620 }
621 }
622
623 void RcShardClientTransport::ClearRequestSlot(const RcClientQpView& view) {
624 auto* commit = view.commit;
625 commit->state.store(0, std::memory_order_release);
626 }
627
628 RcShardServerTransport::RcShardServerTransport(const RcTransportConfig& config)
629 : config_(config) {
630 if (FLAGS_global_id < 0 || FLAGS_global_id >= FLAGS_num_server_processes) {
631 throw std::runtime_error("server global_id out of range");
632 }
633 if (config_.slots_per_qp <= 0) {
634 throw std::runtime_error("slots_per_qp must be positive");
635 }
636 lanes_.reserve(static_cast<std::size_t>(config_.qps_per_client_per_shard));
637 const int raw_lane_count = RawLanesPerOsClient(config_);
638 for (int raw_lane = 0; raw_lane < raw_lane_count; ++raw_lane) {
639 Lane lane;
640 const int response_slots = config_.num_clients * config_.slots_per_qp;
641 const std::size_t local_bytes =
642 ServerLaneBytes(config_) +
643 static_cast<std::size_t>(response_slots) * config_.response_slot_bytes;
644 lane.verbs = std::make_unique<RawVerbsTransport>(
645 MakeRawConfig(config_, raw_lane, local_bytes, false, -1));
646 lane.request_slots =
647 lane.verbs->AllocateRegistered(ServerLaneBytes(config_));
648 std::memset(lane.request_slots, 0, ServerLaneBytes(config_));
649 lane.response_staging.reserve(static_cast<std::size_t>(response_slots));
650 lane.response_completion_pending.assign(
651 static_cast<std::size_t>(response_slots), 0);
652 lane.response_completion_ready.assign(
653 static_cast<std::size_t>(response_slots), 0);
654 for (int slot = 0; slot < response_slots; ++slot) {
655 void* response_slot =
656 lane.verbs->AllocateRegistered(config_.response_slot_bytes);
657 std::memset(response_slot, 0, config_.response_slot_bytes);
658 lane.response_staging.push_back(response_slot);
659 }
660 lane.verbs->PublishAndConnect();
661 lanes_.push_back(std::move(lane));
662 }
663 }
664
665 RcShardServerTransport::~RcShardServerTransport() {
666 try {
667 const int logical_clients_per_process = LogicalClientsPerProcess(config_);
668 for (std::size_t raw_lane_index = 0; raw_lane_index < lanes_.size();
669 ++raw_lane_index) {
670 Lane& lane = lanes_[raw_lane_index];
671 if (!lane.verbs) {
672 continue;
673 }
674 const int local_logical_client = static_cast<int>(
675 raw_lane_index /
676 static_cast<std::size_t>(config_.qps_per_client_per_shard));
677 const int qp_index = static_cast<int>(
678 raw_lane_index %
679 static_cast<std::size_t>(config_.qps_per_client_per_shard));
680 for (int os_client = 0; os_client < FLAGS_num_client_processes;
681 ++os_client) {
682 const int client =
683 os_client * logical_clients_per_process + local_logical_client;
684 for (int slot_in_qp = 0; slot_in_qp < config_.slots_per_qp;
685 ++slot_in_qp) {
686 const int raw_lane =
687 RawLaneForLogicalClient(config_, client, qp_index);
688 const int client_node_id =
689 FLAGS_num_server_processes +
690 OsClientIndexForLogicalClient(config_, client);
691 const int response_slot =
692 ResponseSlotOrdinal(config_, client, slot_in_qp);
693 DrainTrackedPendingWrite(
694 lane.verbs.get(),
695 &lane.response_completion_pending,
696 &lane.response_completion_ready,
697 TrackedWrKind::kResponseStatus,
698 response_slot,
699 WriteContext(
700 config_,
701 client,
702 qp_index,
703 slot_in_qp,
704 0,
705 ClientResponseOffsetForRawLane(
706 config_, raw_lane, slot_in_qp) +
707 ResponseStatusOffset(config_),
708 client_node_id,
709 "shutdown_response_status"));
710 }
711 }
712 }
713 } catch (...) {
714 }
715 }
716
717 RcShardServerTransport::Lane&
718 RcShardServerTransport::LaneAt(int client_id, int qp_index) {
719 ValidateClientId(config_, client_id);
720 if (qp_index < 0 || qp_index >= config_.qps_per_client_per_shard) {
721 throw std::runtime_error("qp_index out of range");
722 }
723 return lanes_.at(static_cast<std::size_t>(
724 RawLaneForLogicalClient(config_, client_id, qp_index)));
725 }
726
727 const RcShardServerTransport::Lane&
728 RcShardServerTransport::LaneAt(int client_id, int qp_index) const {
729 ValidateClientId(config_, client_id);
730 if (qp_index < 0 || qp_index >= config_.qps_per_client_per_shard) {
731 throw std::runtime_error("qp_index out of range");
732 }
733 return lanes_.at(static_cast<std::size_t>(
734 RawLaneForLogicalClient(config_, client_id, qp_index)));
735 }
736
737 int RcShardServerTransport::TotalSlots() const {
738 return static_cast<int>(static_cast<std::size_t>(config_.num_clients) *
739 TotalClientSlotsPerShard(config_));
740 }
741
742 void RcShardServerTransport::RegisterLocalMemoryRegion(
743 void* base, std::size_t bytes) {
744 for (auto& lane : lanes_) {
745 if (lane.verbs) {
746 lane.verbs->RegisterMemoryRegion(base, bytes);
747 }
748 }
749 }
750
751 int RcShardServerTransport::SlotIndex(
752 int client_id, int qp_index, int slot_in_qp) const {
753 ValidateClientId(config_, client_id);
754 ValidateSlotInQp(config_, slot_in_qp);
755 if (qp_index < 0 || qp_index >= config_.qps_per_client_per_shard) {
756 throw std::runtime_error("qp_index out of range");
757 }
758 return GlobalSlotIndex(config_, client_id, qp_index, slot_in_qp);
759 }
760
761 void RcShardServerTransport::DecodeSlotIndex(
762 int slot_index, int* client_id, int* qp_index, int* slot_in_qp) const {
763 if (slot_index < 0 || slot_index >= TotalSlots()) {
764 throw std::runtime_error("slot_index out of range");
765 }
766 DecodeGlobalSlotIndex(config_, slot_index, client_id, qp_index, slot_in_qp);
767 }
768
769 void* RcShardServerTransport::RequestSlot(int slot_index) const {
770 int client_id = -1;
771 int qp_index = -1;
772 int slot_in_qp = -1;
773 DecodeSlotIndex(slot_index, &client_id, &qp_index, &slot_in_qp);
774 const Lane& lane = LaneAt(client_id, qp_index);
775 return static_cast<char*>(lane.request_slots) +
776 ServerRequestOffset(config_, client_id, slot_in_qp);
777 }
778
779 RequestDescriptor*
780 RcShardServerTransport::RequestDescriptorAt(int slot_index) const {
781 return reinterpret_cast<RequestDescriptor*>(RequestSlot(slot_index));
782 }
783
784 char* RcShardServerTransport::RequestPayloadAt(int slot_index) const {
785 return static_cast<char*>(RequestSlot(slot_index)) +
786 Align64(sizeof(RequestDescriptor));
787 }
788
789 CommitWord* RcShardServerTransport::RequestCommitAt(int slot_index) const {
790 return reinterpret_cast<CommitWord*>(
791 static_cast<char*>(RequestSlot(slot_index)) +
792 RequestCommitOffset(config_));
793 }
794
795 RcShardServerTransport::ResponseView RcShardServerTransport::OpenClientResponse(
796 int client_id, int qp_index, int slot_in_qp) {
797 ValidateClientId(config_, client_id);
798 ValidateSlotInQp(config_, slot_in_qp);
799 Lane& lane = LaneAt(client_id, qp_index);
800 auto* slot =
801 static_cast<char*>(lane.response_staging.at(static_cast<std::size_t>(
802 ResponseSlotOrdinal(config_, client_id, slot_in_qp))));
803 auto* payload = static_cast<char*>(slot);
804 auto* status =
805 reinterpret_cast<StatusWord*>(payload + ResponseStatusOffset(config_));
806 return ResponseView{slot, payload, status};
807 }
808
809 void RcShardServerTransport::CompleteResponse(
810 int client_id,
811 int qp_index,
812 int slot_in_qp,
813 const ResponseView& response,
814 std::uint64_t seq) {
815 const bool profile_enabled = FLAGS_rdma_rc_profile_interval_ms > 0;
816 const std::uint64_t start_ns = profile_enabled ? NowNs() : 0;
817 ValidateClientId(config_, client_id);
818 ValidateSlotInQp(config_, slot_in_qp);
819 Lane& lane = LaneAt(client_id, qp_index);
820 const int response_slot = ResponseSlotOrdinal(config_, client_id, slot_in_qp);
821 const int raw_lane = RawLaneForLogicalClient(config_, client_id, qp_index);
822 const int client_node_id =
823 FLAGS_num_server_processes +
824 OsClientIndexForLogicalClient(config_, client_id);
825 auto& counters = TransportProfile();
826 DrainTrackedPendingWrite(
827 lane.verbs.get(),
828 &lane.response_completion_pending,
829 &lane.response_completion_ready,
830 TrackedWrKind::kResponseStatus,
831 response_slot,
832 WriteContext(
833 config_,
834 client_id,
835 qp_index,
836 slot_in_qp,
837 seq - 1,
838 ClientResponseOffsetForRawLane(config_, raw_lane, slot_in_qp) +
839 ResponseStatusOffset(config_),
840 client_node_id,
841 "previous_response_status"),
842 profile_enabled,
843 &counters.drain_pending_response_count,
844 &counters.drain_pending_response_ns);
845 response.status->seq.store(seq, std::memory_order_release);
846 response.status->state.store(kRcSlotDone, std::memory_order_release);
847
848 if (response.status->response_bytes > 0) {
849 const std::uint64_t response_payload_offset =
850 ClientResponseOffsetForRawLane(config_, raw_lane, slot_in_qp);
851 lane.verbs->Write(
852 response.payload,
853 GlobalAddress{
854 static_cast<std::uint16_t>(client_node_id),
855 response_payload_offset,
856 },
857 response.status->response_bytes,
858 /*wr_id=*/0,
859 false);
860 if (profile_enabled) {
861 auto& profile = TransportProfile();
862 profile.response_payload_write_count.fetch_add(
863 1, std::memory_order_relaxed);
864 profile.response_payload_bytes.fetch_add(
865 response.status->response_bytes, std::memory_order_relaxed);
866 }
867 }
868
869 const std::uint64_t response_status_offset =
870 ClientResponseOffsetForRawLane(config_, raw_lane, slot_in_qp) +
871 ResponseStatusOffset(config_);
872 lane.verbs->Write(
873 response.status,
874 GlobalAddress{
875 static_cast<std::uint16_t>(client_node_id),
876 response_status_offset,
877 },
878 sizeof(StatusWord),
879 MakeTrackedWrId(TrackedWrKind::kResponseStatus, response_slot),
880 true);
881 lane.response_completion_pending[static_cast<std::size_t>(response_slot)] = 1;
882 if (profile_enabled) {
883 auto& profile = TransportProfile();
884 profile.complete_response_count.fetch_add(1, std::memory_order_relaxed);
885 profile.response_status_write_count.fetch_add(1, std::memory_order_relaxed);
886 profile.complete_response_ns.fetch_add(
887 NowNs() - start_ns, std::memory_order_relaxed);
888 MaybeReportTransportProfile(config_, "server");
889 }
890 }
891
892 void RcShardServerTransport::WriteResponsePayloadSg(
893 int client_id,
894 int qp_index,
895 int slot_in_qp,
896 base::ConstArray<RawVerbsSge> sges,
897 std::uint64_t response_offset,
898 std::uint64_t bytes) {
899 ValidateClientId(config_, client_id);
900 ValidateSlotInQp(config_, slot_in_qp);
901 if (sges.Size() == 0 || bytes == 0) {
902 return;
903 }
904 Lane& lane = LaneAt(client_id, qp_index);
905 const int raw_lane = RawLaneForLogicalClient(config_, client_id, qp_index);
906 const int client_node_id =
907 FLAGS_num_server_processes +
908 OsClientIndexForLogicalClient(config_, client_id);
909 lane.verbs->WriteSg(
910 sges,
911 GlobalAddress{
912 static_cast<std::uint16_t>(client_node_id),
913 ClientResponseOffsetForRawLane(config_, raw_lane, slot_in_qp) +
914 response_offset,
915 },
916 /*wr_id=*/0,
917 false);
918 if (FLAGS_rdma_rc_profile_interval_ms > 0) {
919 auto& profile = TransportProfile();
920 profile.response_payload_sg_write_count.fetch_add(
921 1, std::memory_order_relaxed);
922 profile.response_payload_sg_wr_count.fetch_add(
923 1, std::memory_order_relaxed);
924 profile.response_payload_bytes.fetch_add(bytes, std::memory_order_relaxed);
925 }
926 }
927
928 void RcShardServerTransport::CompleteResponseStatusOnly(
929 int client_id,
930 int qp_index,
931 int slot_in_qp,
932 const ResponseView& response,
933 std::uint64_t seq) {
934 const bool profile_enabled = FLAGS_rdma_rc_profile_interval_ms > 0;
935 const std::uint64_t start_ns = profile_enabled ? NowNs() : 0;
936 ValidateClientId(config_, client_id);
937 ValidateSlotInQp(config_, slot_in_qp);
938 Lane& lane = LaneAt(client_id, qp_index);
939 const int response_slot = ResponseSlotOrdinal(config_, client_id, slot_in_qp);
940 const int raw_lane = RawLaneForLogicalClient(config_, client_id, qp_index);
941 const int client_node_id =
942 FLAGS_num_server_processes +
943 OsClientIndexForLogicalClient(config_, client_id);
944 auto& counters = TransportProfile();
945 DrainTrackedPendingWrite(
946 lane.verbs.get(),
947 &lane.response_completion_pending,
948 &lane.response_completion_ready,
949 TrackedWrKind::kResponseStatus,
950 response_slot,
951 WriteContext(
952 config_,
953 client_id,
954 qp_index,
955 slot_in_qp,
956 seq - 1,
957 ClientResponseOffsetForRawLane(config_, raw_lane, slot_in_qp) +
958 ResponseStatusOffset(config_),
959 client_node_id,
960 "previous_response_status"),
961 profile_enabled,
962 &counters.drain_pending_response_count,
963 &counters.drain_pending_response_ns);
964 response.status->seq.store(seq, std::memory_order_release);
965 response.status->state.store(kRcSlotDone, std::memory_order_release);
966
967 const std::uint64_t response_status_offset =
968 ClientResponseOffsetForRawLane(config_, raw_lane, slot_in_qp) +
969 ResponseStatusOffset(config_);
970 lane.verbs->Write(
971 response.status,
972 GlobalAddress{
973 static_cast<std::uint16_t>(client_node_id),
974 response_status_offset,
975 },
976 sizeof(StatusWord),
977 MakeTrackedWrId(TrackedWrKind::kResponseStatus, response_slot),
978 true);
979 lane.response_completion_pending[static_cast<std::size_t>(response_slot)] = 1;
980 if (profile_enabled) {
981 auto& profile = TransportProfile();
982 profile.complete_response_count.fetch_add(1, std::memory_order_relaxed);
983 profile.response_status_write_count.fetch_add(1, std::memory_order_relaxed);
984 profile.complete_response_ns.fetch_add(
985 NowNs() - start_ns, std::memory_order_relaxed);
986 MaybeReportTransportProfile(config_, "server");
987 }
988 }
989
990 } // namespace petps
991