GCC Code Coverage Report


Directory: src/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 0.0% 0 / 0 / 1
Functions: 0.0% 0 / 0 / 1
Branches: -% 0 / 0 / 0

ps/rdma/rc_transport.h
Line Branch Exec Source
1 #pragma once
2
3 #include <cstddef>
4 #include <cstdint>
5 #include <memory>
6 #include <mutex>
7 #include <string>
8 #include <unordered_map>
9 #include <utility>
10 #include <vector>
11
12 #include "base/array.h"
13 #include "ps/rdma/raw_verbs_transport.h"
14 #include "ps/rdma/rdma_protocol.h"
15
16 namespace petps {
17
18 struct RcTransportConfig {
19 int shard_id = 0; // Logical shard served by this transport.
20 int client_id = -1; // Logical client id for response slot selection.
21 int num_clients = 1; // Total client count expected by the server.
22 int qps_per_client_per_shard = 32; // Number of lanes per client per shard.
23 int slots_per_qp = 1; // Logical slots multiplexed on each lane.
24 std::size_t request_slot_bytes = 1 << 20; // Bytes per server request slot.
25 std::size_t response_slot_bytes = 1 << 20; // Bytes per client response slot.
26 std::string control_plane_host = "127.0.0.1";
27 int control_plane_port = 25100;
28 int control_plane_timeout_ms = 30000;
29 std::string namespace_token =
30 "default"; // Shared-memory namespace for this run.
31 };
32
33 struct RcClientQpView {
34 int qp_index = 0; // Lane index local to the client.
35 int slot_in_qp = 0; // Logical slot index within the lane.
36 int slot_index = 0; // Global request slot index on the server.
37 void* request_slot = nullptr; // Base address of the request slot.
38 RequestDescriptor* descriptor = nullptr;
39 char* payload = nullptr; // Request payload region after descriptor.
40 CommitWord* commit = nullptr; // Commit word at end of request slot.
41 void* response_slot = nullptr; // Base address of the client response slot.
42 char* response_payload = nullptr; // Response payload region before status.
43 StatusWord* status = nullptr; // Status word at end of response slot.
44 };
45
46 class RcShardClientTransport {
47 public:
48 explicit RcShardClientTransport(const RcTransportConfig& config);
49 ~RcShardClientTransport();
50
51 RcShardClientTransport(const RcShardClientTransport&) = delete;
52 RcShardClientTransport& operator=(const RcShardClientTransport&) = delete;
53
54 RcClientQpView OpenQp(int qp_index);
55 RcClientQpView OpenSlot(int qp_index, int slot_in_qp);
56 void SubmitRequest(const RcClientQpView& view,
57 const RequestDescriptor& descriptor,
58 const void* payload,
59 std::size_t payload_bytes);
60 void ClearRequestSlot(const RcClientQpView& view);
61 std::size_t request_slot_bytes() const { return config_.request_slot_bytes; }
62 std::size_t response_slot_bytes() const {
63 return config_.response_slot_bytes;
64 }
65 const RcTransportConfig& config() const { return config_; }
66
67 private:
68 struct Lane {
69 std::unique_ptr<RawVerbsTransport>
70 verbs; // RC QP and registered memory for this lane.
71 void* lane_base =
72 nullptr; // Local registered region for all slots in this lane.
73 std::vector<std::uint8_t> submit_completion_pending;
74 std::vector<std::uint8_t> submit_completion_ready;
75 };
76
77 Lane& LaneAt(int qp_index);
78 const Lane& LaneAt(int qp_index) const;
79
80 RcTransportConfig config_;
81 int server_node_id_ = 0; // Global node id of the target shard server.
82 std::vector<Lane> lanes_;
83 };
84
85 class RcShardServerTransport {
86 public:
87 explicit RcShardServerTransport(const RcTransportConfig& config);
88 ~RcShardServerTransport();
89
90 RcShardServerTransport(const RcShardServerTransport&) = delete;
91 RcShardServerTransport& operator=(const RcShardServerTransport&) = delete;
92
93 int TotalSlots() const;
94 void RegisterLocalMemoryRegion(void* base, std::size_t bytes);
95 int SlotIndex(int client_id, int qp_index, int slot_in_qp) const;
96 void DecodeSlotIndex(
97 int slot_index, int* client_id, int* qp_index, int* slot_in_qp) const;
98 void* RequestSlot(int slot_index) const;
99 RequestDescriptor* RequestDescriptorAt(int slot_index) const;
100 char* RequestPayloadAt(int slot_index) const;
101 CommitWord* RequestCommitAt(int slot_index) const;
102
103 struct ResponseView {
104 void* slot = nullptr; // Base address of the client response slot.
105 char* payload = nullptr; // Response payload region.
106 StatusWord* status = nullptr; // Final completion word for this response.
107 };
108
109 ResponseView OpenClientResponse(int client_id, int qp_index, int slot_in_qp);
110 void CompleteResponse(int client_id,
111 int qp_index,
112 int slot_in_qp,
113 const ResponseView& response,
114 std::uint64_t seq);
115 void WriteResponsePayloadSg(
116 int client_id,
117 int qp_index,
118 int slot_in_qp,
119 base::ConstArray<RawVerbsSge> sges,
120 std::uint64_t response_offset,
121 std::uint64_t bytes);
122 void CompleteResponseStatusOnly(
123 int client_id,
124 int qp_index,
125 int slot_in_qp,
126 const ResponseView& response,
127 std::uint64_t seq);
128 const RcTransportConfig& config() const { return config_; }
129
130 private:
131 struct Lane {
132 std::unique_ptr<RawVerbsTransport>
133 verbs; // RC QP and registered memory for this lane.
134 void* request_slots = nullptr; // Local registered slots for all clients.
135 std::vector<void*> response_staging; // Per-client-per-slot registered
136 // response staging slots.
137 std::vector<std::uint8_t> response_completion_pending;
138 std::vector<std::uint8_t> response_completion_ready;
139 };
140
141 Lane& LaneAt(int client_id, int qp_index);
142 const Lane& LaneAt(int client_id, int qp_index) const;
143
144 RcTransportConfig config_; // Transport sizing and namespace config.
145 std::vector<Lane> lanes_; // One verbs RC lane per qp_index.
146 };
147
148 } // namespace petps
149