ps/rdma/rc_transport.h
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | #pragma once | ||
| 2 | |||
| 3 | #include <cstddef> | ||
| 4 | #include <cstdint> | ||
| 5 | #include <memory> | ||
| 6 | #include <mutex> | ||
| 7 | #include <string> | ||
| 8 | #include <unordered_map> | ||
| 9 | #include <utility> | ||
| 10 | #include <vector> | ||
| 11 | |||
| 12 | #include "base/array.h" | ||
| 13 | #include "ps/rdma/raw_verbs_transport.h" | ||
| 14 | #include "ps/rdma/rdma_protocol.h" | ||
| 15 | |||
| 16 | namespace petps { | ||
| 17 | |||
| 18 | struct RcTransportConfig { /* Sizing, identity and control-plane settings; both RcShardClientTransport and RcShardServerTransport are constructed from one of these. */ | ||
| 19 | int shard_id = 0; // Logical shard served by this transport. | ||
| 20 | int client_id = -1; // Logical client id for response slot selection. NOTE(review): the -1 default presumably means "not assigned" - confirm. | ||
| 21 | int num_clients = 1; // Total client count expected by the server. | ||
| 22 | int qps_per_client_per_shard = 32; // Number of lanes per client per shard. | ||
| 23 | int slots_per_qp = 1; // Logical slots multiplexed on each lane. | ||
| 24 | std::size_t request_slot_bytes = 1 << 20; // Bytes per server request slot (default 1 MiB). | ||
| 25 | std::size_t response_slot_bytes = 1 << 20; // Bytes per client response slot (default 1 MiB). | ||
| 26 | std::string control_plane_host = "127.0.0.1"; /* Control-plane host; defaults to loopback. NOTE(review): presumably the endpoint used for connection setup - confirm in the definition file. */ | ||
| 27 | int control_plane_port = 25100; /* Control-plane port. */ | ||
| 28 | int control_plane_timeout_ms = 30000; /* Control-plane timeout in milliseconds (default 30 s). */ | ||
| 29 | std::string namespace_token = | ||
| 30 | "default"; // Shared-memory namespace for this run. | ||
| 31 | }; | ||
| 32 | |||
| 33 | struct RcClientQpView { /* Pointer bundle describing one client lane/slot pair; returned by RcShardClientTransport::OpenQp and OpenSlot and passed to SubmitRequest and ClearRequestSlot. */ | ||
| 34 | int qp_index = 0; // Lane index local to the client. | ||
| 35 | int slot_in_qp = 0; // Logical slot index within the lane. | ||
| 36 | int slot_index = 0; // Global request slot index on the server. | ||
| 37 | void* request_slot = nullptr; // Base address of the request slot. | ||
| 38 | RequestDescriptor* descriptor = nullptr; /* Request descriptor; the payload region below is documented as following it. */ | ||
| 39 | char* payload = nullptr; // Request payload region after descriptor. | ||
| 40 | CommitWord* commit = nullptr; // Commit word at end of request slot. | ||
| 41 | void* response_slot = nullptr; // Base address of the client response slot. | ||
| 42 | char* response_payload = nullptr; // Response payload region before status. | ||
| 43 | StatusWord* status = nullptr; // Status word at end of response slot. | ||
| 44 | }; | ||
| 45 | |||
| 46 | class RcShardClientTransport { /* Client-side transport towards one shard: hands out RcClientQpView handles per lane/slot and submits requests through them. All non-trivial members are defined out of line. */ | ||
| 47 | public: | ||
| 48 | explicit RcShardClientTransport(const RcTransportConfig& config); /* Builds the transport from config; defined out of line. */ | ||
| 49 | ~RcShardClientTransport(); | ||
| 50 | |||
| 51 | RcShardClientTransport(const RcShardClientTransport&) = delete; /* Not copyable. */ | ||
| 52 | RcShardClientTransport& operator=(const RcShardClientTransport&) = delete; | ||
| 53 | |||
| 54 | RcClientQpView OpenQp(int qp_index); /* Returns a view for lane qp_index. NOTE(review): which slot it selects is not visible here - presumably slot 0, confirm. */ | ||
| 55 | RcClientQpView OpenSlot(int qp_index, int slot_in_qp); /* Returns a view for logical slot slot_in_qp on lane qp_index. */ | ||
| 56 | void SubmitRequest(const RcClientQpView& view, /* Submits one request through view: the given descriptor plus payload_bytes bytes read from payload. Transfer mechanics live in the out-of-line definition. */ | ||
| 57 | const RequestDescriptor& descriptor, | ||
| 58 | const void* payload, | ||
| 59 | std::size_t payload_bytes); | ||
| 60 | void ClearRequestSlot(const RcClientQpView& view); /* Clears the request slot referenced by view; exactly what is reset is defined out of line. */ | ||
| 61 | std::size_t request_slot_bytes() const { return config_.request_slot_bytes; } /* Configured bytes per request slot. */ | ||
| 62 | std::size_t response_slot_bytes() const { /* Configured bytes per response slot. */ | ||
| 63 | return config_.response_slot_bytes; | ||
| 64 | } | ||
| 65 | const RcTransportConfig& config() const { return config_; } /* Read-only access to the stored config. */ | ||
| 66 | |||
| 67 | private: | ||
| 68 | struct Lane { /* Per-lane state. */ | ||
| 69 | std::unique_ptr<RawVerbsTransport> | ||
| 70 | verbs; // RC QP and registered memory for this lane. | ||
| 71 | void* lane_base = | ||
| 72 | nullptr; // Local registered region for all slots in this lane. | ||
| 73 | std::vector<std::uint8_t> submit_completion_pending; /* NOTE(review): byte-per-entry flags, presumably indexed by slot_in_qp - confirm. */ | ||
| 74 | std::vector<std::uint8_t> submit_completion_ready; /* NOTE(review): presumably parallel to submit_completion_pending - confirm. */ | ||
| 75 | }; | ||
| 76 | |||
| 77 | Lane& LaneAt(int qp_index); /* Lane lookup by qp_index (mutable overload). */ | ||
| 78 | const Lane& LaneAt(int qp_index) const; /* Lane lookup by qp_index (const overload). */ | ||
| 79 | |||
| 80 | RcTransportConfig config_; /* Config exposed through config(). */ | ||
| 81 | int server_node_id_ = 0; // Global node id of the target shard server. | ||
| 82 | std::vector<Lane> lanes_; /* Lane storage; presumably indexed by qp_index (see LaneAt) - confirm. */ | ||
| 83 | }; | ||
| 84 | |||
| 85 | class RcShardServerTransport { /* Server-side transport for one shard: exposes request slots by global slot index and writes responses back to client response slots. All non-trivial members are defined out of line. */ | ||
| 86 | public: | ||
| 87 | explicit RcShardServerTransport(const RcTransportConfig& config); /* Builds the transport from config; defined out of line. */ | ||
| 88 | ~RcShardServerTransport(); | ||
| 89 | |||
| 90 | RcShardServerTransport(const RcShardServerTransport&) = delete; /* Not copyable. */ | ||
| 91 | RcShardServerTransport& operator=(const RcShardServerTransport&) = delete; | ||
| 92 | |||
| 93 | int TotalSlots() const; /* Total number of request slots. NOTE(review): presumably num_clients * qps_per_client_per_shard * slots_per_qp - confirm. */ | ||
| 94 | void RegisterLocalMemoryRegion(void* base, std::size_t bytes); /* Registers a caller-provided local region of the given size. NOTE(review): presumably so it can be used as a scatter/gather source - confirm. */ | ||
| 95 | int SlotIndex(int client_id, int qp_index, int slot_in_qp) const; /* Maps (client_id, qp_index, slot_in_qp) to a global slot index. */ | ||
| 96 | void DecodeSlotIndex( /* Splits slot_index into its components, written through the three out-pointers; presumably the inverse of SlotIndex - confirm. */ | ||
| 97 | int slot_index, int* client_id, int* qp_index, int* slot_in_qp) const; | ||
| 98 | void* RequestSlot(int slot_index) const; /* Request slot for the given global slot index. */ | ||
| 99 | RequestDescriptor* RequestDescriptorAt(int slot_index) const; /* Descriptor of the request slot at slot_index. */ | ||
| 100 | char* RequestPayloadAt(int slot_index) const; /* Payload region of the request slot at slot_index. */ | ||
| 101 | CommitWord* RequestCommitAt(int slot_index) const; /* Commit word of the request slot at slot_index. */ | ||
| 102 | |||
| 103 | struct ResponseView { /* Pointer bundle for one response; returned by OpenClientResponse and passed to the CompleteResponse calls. */ | ||
| 104 | void* slot = nullptr; // Base address of the client response slot. | ||
| 105 | char* payload = nullptr; // Response payload region. | ||
| 106 | StatusWord* status = nullptr; // Final completion word for this response. | ||
| 107 | }; | ||
| 108 | |||
| 109 | ResponseView OpenClientResponse(int client_id, int qp_index, int slot_in_qp); /* Returns the response view for the given client, lane and slot. */ | ||
| 110 | void CompleteResponse(int client_id, /* Completes the response for the given client, lane and slot. NOTE(review): seq is presumably the sequence value published in the status word - confirm. */ | ||
| 111 | int qp_index, | ||
| 112 | int slot_in_qp, | ||
| 113 | const ResponseView& response, | ||
| 114 | std::uint64_t seq); | ||
| 115 | void WriteResponsePayloadSg( /* Writes `bytes` bytes described by the scatter/gather list `sges` at response_offset within the response. NOTE(review): presumably issued as an RDMA write to the client slot - confirm. */ | ||
| 116 | int client_id, | ||
| 117 | int qp_index, | ||
| 118 | int slot_in_qp, | ||
| 119 | base::ConstArray<RawVerbsSge> sges, | ||
| 120 | std::uint64_t response_offset, | ||
| 121 | std::uint64_t bytes); | ||
| 122 | void CompleteResponseStatusOnly( /* Same parameters as CompleteResponse. NOTE(review): by its name it publishes only the status word, presumably after WriteResponsePayloadSg delivered the payload - confirm. */ | ||
| 123 | int client_id, | ||
| 124 | int qp_index, | ||
| 125 | int slot_in_qp, | ||
| 126 | const ResponseView& response, | ||
| 127 | std::uint64_t seq); | ||
| 128 | ✗ | const RcTransportConfig& config() const { return config_; } /* Read-only access to the stored config. */ | |
| 129 | |||
| 130 | private: | ||
| 131 | struct Lane { /* Per-lane state. */ | ||
| 132 | std::unique_ptr<RawVerbsTransport> | ||
| 133 | verbs; // RC QP and registered memory for this lane. | ||
| 134 | void* request_slots = nullptr; // Local registered slots for all clients. | ||
| 135 | std::vector<void*> response_staging; // Per-client-per-slot registered | ||
| 136 | // response staging slots. | ||
| 137 | std::vector<std::uint8_t> response_completion_pending; /* NOTE(review): byte-per-entry flags, presumably one per response staging slot - confirm. */ | ||
| 138 | std::vector<std::uint8_t> response_completion_ready; /* NOTE(review): presumably parallel to response_completion_pending - confirm. */ | ||
| 139 | }; | ||
| 140 | |||
| 141 | Lane& LaneAt(int client_id, int qp_index); /* Lane lookup by client and lane index (mutable overload). */ | ||
| 142 | const Lane& LaneAt(int client_id, int qp_index) const; /* Lane lookup by client and lane index (const overload). */ | ||
| 143 | |||
| 144 | RcTransportConfig config_; // Transport sizing and namespace config. | ||
| 145 | std::vector<Lane> lanes_; // One verbs RC lane per qp_index. NOTE(review): LaneAt is keyed by (client_id, qp_index); confirm whether lanes are per qp_index or per client/qp pair. | ||
| 146 | }; | ||
| 147 | |||
| 148 | } // namespace petps | ||
| 149 |