GCC Code Coverage Report


Directory: src/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 88.4% 190 / 0 / 215
Functions: 93.1% 27 / 0 / 29
Branches: 43.3% 176 / 0 / 406

ps/rdma/control_plane.cc
Line Branch Exec Source
1 #include "ps/rdma/control_plane.h"
2
3 #include <grpcpp/grpcpp.h>
4
5 #include <chrono>
6 #include <cstring>
7 #include <memory>
8 #include <stdexcept>
9 #include <string>
10 #include <utility>
11
12 #include "rdma_control_plane.grpc.pb.h"
13
14 namespace petps {
15 namespace {
16
17 using recstoreps::rdma::GetMetaRequest;
18 using recstoreps::rdma::GetMetaResponse;
19 using recstoreps::rdma::ProbeRequest;
20 using recstoreps::rdma::ProbeResponse;
21 using recstoreps::rdma::PublishMetaRequest;
22 using recstoreps::rdma::PublishMetaResponse;
23 using recstoreps::rdma::PublishServerReadyRequest;
24 using recstoreps::rdma::PublishServerReadyResponse;
25 using recstoreps::rdma::RdmaControlPlane;
26 using recstoreps::rdma::WaitServerReadyRequest;
27 using recstoreps::rdma::WaitServerReadyResponse;
28 using recstoreps::rdma::WaitServerRequest;
29 using recstoreps::rdma::WaitServerResponse;
30
31 22 std::string EndpointString(const RdmaControlPlaneEndpoint& endpoint) {
32
2/4
✓ Branch 2 taken 22 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 22 times.
✗ Branch 6 not taken.
44 return endpoint.host + ":" + std::to_string(endpoint.port);
33 }
34
35 16 std::chrono::system_clock::time_point DeadlineFromNow(int timeout_ms) {
36 16 return std::chrono::system_clock::now() +
37
1/2
✓ Branch 2 taken 16 times.
✗ Branch 3 not taken.
48 std::chrono::milliseconds(timeout_ms);
38 }
39
40 4 std::string GrpcStatusText(const grpc::Status& status) {
41
1/2
✗ Branch 3 not taken.
✓ Branch 4 taken 4 times.
4 if (status.error_message().empty()) {
42 return status.error_code() == grpc::StatusCode::OK
43 ? std::string("OK")
44 : std::to_string(status.error_code());
45 }
46 4 return status.error_message();
47 }
48
49 16 void ThrowIfNotOk(const grpc::Status& status, const std::string& operation) {
50
2/2
✓ Branch 1 taken 12 times.
✓ Branch 2 taken 4 times.
16 if (status.ok()) {
51 12 return;
52 }
53
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 throw std::runtime_error(
54
4/8
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 4 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 4 times.
✗ Branch 12 not taken.
8 "control-plane " + operation + " failed: " + GrpcStatusText(status));
55 }
56
57 4 std::string EncodeMetaBytes(const RawVerbsNodeMeta& meta) {
58
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 return std::string(reinterpret_cast<const char*>(&meta), sizeof(meta));
59 }
60
61 4 RawVerbsNodeMeta DecodeMetaBytes(const std::string& payload) {
62
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (payload.size() != sizeof(RawVerbsNodeMeta)) {
63 throw std::runtime_error("invalid RawVerbsNodeMeta payload size: " +
64 std::to_string(payload.size()));
65 }
66 4 RawVerbsNodeMeta meta{};
67 4 std::memcpy(&meta, payload.data(), sizeof(meta));
68 4 return meta;
69 }
70
71 4 grpc::Status MakeDeadlineExceeded(const std::string& message) {
72 4 return grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, message);
73 }
74
75 grpc::Status MakeUnavailable(const std::string& message) {
76 return grpc::Status(grpc::StatusCode::UNAVAILABLE, message);
77 }
78
79 } // namespace
80
81 class RdmaControlPlaneService final : public RdmaControlPlane::Service {
82 public:
83 6 explicit RdmaControlPlaneService(RdmaControlPlaneServer* owner)
84 6 : owner_(owner) {}
85
86 2 grpc::Status PublishMeta(grpc::ServerContext*,
87 const PublishMetaRequest* request,
88 PublishMetaResponse*) override {
89
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
2 if (request->meta().size() != sizeof(RawVerbsNodeMeta)) {
90 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
91 "invalid RawVerbsNodeMeta payload size: " +
92 std::to_string(request->meta().size()));
93 }
94 const RdmaControlPlaneServer::MetaKey key{
95
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request->publisher_node_id(),
96 4 request->publisher_lane(),
97 4 request->receiver_node_id(),
98 4 request->receiver_lane(),
99
3/6
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
2 };
100
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
2 const RawVerbsNodeMeta meta = DecodeMetaBytes(request->meta());
101 {
102
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 std::lock_guard<std::mutex> guard(owner_->mu_);
103
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 owner_->metadata_[key] = meta;
104 2 }
105 2 owner_->cv_.notify_all();
106
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 return grpc::Status::OK;
107 }
108
109 2 grpc::Status GetMeta(grpc::ServerContext*,
110 const GetMetaRequest* request,
111 GetMetaResponse* response) override {
112 const RdmaControlPlaneServer::MetaKey key{
113
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request->publisher_node_id(),
114 4 request->publisher_lane(),
115 4 request->receiver_node_id(),
116 4 request->receiver_lane(),
117
3/6
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
2 };
118 const int timeout_ms =
119
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request->timeout_ms() > 0
120
2/4
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
2 ? request->timeout_ms()
121 : owner_->endpoint_.timeout_ms;
122
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 std::unique_lock<std::mutex> lock(owner_->mu_);
123 const bool ready =
124
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 owner_->cv_.wait_for(lock, std::chrono::milliseconds(timeout_ms), [&] {
125
3/4
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 2 times.
8 return owner_->stop_requested_.load(std::memory_order_relaxed) ||
126
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
8 owner_->metadata_.find(key) != owner_->metadata_.end();
127 });
128
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (!ready) {
129 return MakeDeadlineExceeded(
130 "get_meta timeout key=" + std::to_string(key.publisher_node_id) +
131 ":" + std::to_string(key.publisher_lane) + "->" +
132 std::to_string(key.receiver_node_id) + ":" +
133 std::to_string(key.receiver_lane));
134 }
135
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (owner_->stop_requested_.load(std::memory_order_relaxed)) {
136 return MakeUnavailable("control-plane stopping");
137 }
138
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
4 response->set_meta(EncodeMetaBytes(owner_->metadata_.at(key)));
139
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 return grpc::Status::OK;
140 2 }
141
142 4 grpc::Status PublishServerReady(grpc::ServerContext*,
143 const PublishServerReadyRequest* request,
144 PublishServerReadyResponse*) override {
145 {
146
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 std::lock_guard<std::mutex> guard(owner_->mu_);
147
2/4
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
4 owner_->ready_servers_.insert(request->server_id());
148 4 }
149 4 owner_->cv_.notify_all();
150 4 return grpc::Status::OK;
151 }
152
153 4 grpc::Status WaitServer(grpc::ServerContext*,
154 const WaitServerRequest* request,
155 WaitServerResponse*) override {
156 const int timeout_ms =
157
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 request->timeout_ms() > 0
158
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
4 ? request->timeout_ms()
159 : owner_->endpoint_.timeout_ms;
160
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 std::unique_lock<std::mutex> lock(owner_->mu_);
161 const bool ready =
162
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 owner_->cv_.wait_for(lock, std::chrono::milliseconds(timeout_ms), [&] {
163
3/4
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 4 times.
12 return owner_->stop_requested_.load(std::memory_order_relaxed) ||
164
2/4
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6 times.
✗ Branch 5 not taken.
6 owner_->ready_servers_.find(request->server_id()) !=
165 12 owner_->ready_servers_.end();
166 });
167
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 if (!ready) {
168
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
4 return MakeDeadlineExceeded("wait_server timeout server_id=" +
169
3/6
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
6 std::to_string(request->server_id()));
170 }
171
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (owner_->stop_requested_.load(std::memory_order_relaxed)) {
172 return MakeUnavailable("control-plane stopping");
173 }
174
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 return grpc::Status::OK;
175 4 }
176
177 4 grpc::Status WaitServerReady(grpc::ServerContext*,
178 const WaitServerReadyRequest* request,
179 WaitServerReadyResponse*) override {
180 const int timeout_ms =
181
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 request->timeout_ms() > 0
182
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
4 ? request->timeout_ms()
183 : owner_->endpoint_.timeout_ms;
184
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 std::unique_lock<std::mutex> lock(owner_->mu_);
185 const bool ready =
186
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 owner_->cv_.wait_for(lock, std::chrono::milliseconds(timeout_ms), [&] {
187
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
20 return owner_->stop_requested_.load(std::memory_order_relaxed) ||
188 10 static_cast<int>(owner_->ready_servers_.size()) >=
189
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 8 times.
20 request->num_servers();
190 });
191
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 if (!ready) {
192 return MakeDeadlineExceeded(
193
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 "wait_server_ready timeout ready=" +
194
3/6
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2 times.
✗ Branch 9 not taken.
6 std::to_string(owner_->ready_servers_.size()) + "/" +
195
3/6
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
6 std::to_string(request->num_servers()));
196 }
197
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (owner_->stop_requested_.load(std::memory_order_relaxed)) {
198 return MakeUnavailable("control-plane stopping");
199 }
200
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 return grpc::Status::OK;
201 4 }
202
203 grpc::Status
204 Probe(grpc::ServerContext*, const ProbeRequest*, ProbeResponse*) override {
205 if (owner_->stop_requested_.load(std::memory_order_relaxed)) {
206 return MakeUnavailable("control-plane stopping");
207 }
208 return grpc::Status::OK;
209 }
210
211 private:
212 RdmaControlPlaneServer* owner_;
213 };
214
215 6 RdmaControlPlaneClient::RdmaControlPlaneClient(
216 6 RdmaControlPlaneEndpoint endpoint)
217 6 : endpoint_(std::move(endpoint)) {}
218
219 2 void RdmaControlPlaneClient::PublishMeta(
220 int publisher_node_id,
221 int publisher_lane,
222 int receiver_node_id,
223 int receiver_lane,
224 const RawVerbsNodeMeta& meta) const {
225 auto channel = grpc::CreateChannel(
226
3/6
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
4 EndpointString(endpoint_), grpc::InsecureChannelCredentials());
227
1/2
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
2 auto stub = RdmaControlPlane::NewStub(channel);
228
229
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 PublishMetaRequest request;
230
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request.set_publisher_node_id(publisher_node_id);
231
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request.set_publisher_lane(publisher_lane);
232
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request.set_receiver_node_id(receiver_node_id);
233
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request.set_receiver_lane(receiver_lane);
234
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
4 request.set_meta(EncodeMetaBytes(meta));
235
236
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 PublishMetaResponse response;
237
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 grpc::ClientContext context;
238
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
2 context.set_deadline(DeadlineFromNow(endpoint_.timeout_ms));
239
3/6
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
2 ThrowIfNotOk(stub->PublishMeta(&context, request, &response), "publish_meta");
240 2 }
241
242 2 RawVerbsNodeMeta RdmaControlPlaneClient::GetMeta(
243 int publisher_node_id,
244 int publisher_lane,
245 int receiver_node_id,
246 int receiver_lane,
247 int timeout_ms) const {
248 2 const int effective_timeout_ms =
249
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 timeout_ms > 0 ? timeout_ms : endpoint_.timeout_ms;
250 auto channel = grpc::CreateChannel(
251
3/6
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
4 EndpointString(endpoint_), grpc::InsecureChannelCredentials());
252
1/2
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
2 auto stub = RdmaControlPlane::NewStub(channel);
253
254
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 GetMetaRequest request;
255
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request.set_publisher_node_id(publisher_node_id);
256
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request.set_publisher_lane(publisher_lane);
257
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request.set_receiver_node_id(receiver_node_id);
258
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request.set_receiver_lane(receiver_lane);
259
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 request.set_timeout_ms(effective_timeout_ms);
260
261
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 GetMetaResponse response;
262
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 grpc::ClientContext context;
263
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
2 context.set_deadline(DeadlineFromNow(effective_timeout_ms));
264
3/6
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
2 ThrowIfNotOk(stub->GetMeta(&context, request, &response), "get_meta");
265
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
4 return DecodeMetaBytes(response.meta());
266 2 }
267
268 4 void RdmaControlPlaneClient::PublishServerReady(int server_id) const {
269 auto channel = grpc::CreateChannel(
270
3/6
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 4 times.
✗ Branch 8 not taken.
8 EndpointString(endpoint_), grpc::InsecureChannelCredentials());
271
1/2
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
4 auto stub = RdmaControlPlane::NewStub(channel);
272
273
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 PublishServerReadyRequest request;
274
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 request.set_server_id(server_id);
275
276
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 PublishServerReadyResponse response;
277
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 grpc::ClientContext context;
278
2/4
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
4 context.set_deadline(DeadlineFromNow(endpoint_.timeout_ms));
279
3/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
4 ThrowIfNotOk(stub->PublishServerReady(&context, request, &response),
280 "server_ready");
281 4 }
282
283 4 void RdmaControlPlaneClient::WaitServer(int server_id, int timeout_ms) const {
284 4 const int effective_timeout_ms =
285
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 timeout_ms > 0 ? timeout_ms : endpoint_.timeout_ms;
286 auto channel = grpc::CreateChannel(
287
3/6
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 4 times.
✗ Branch 8 not taken.
8 EndpointString(endpoint_), grpc::InsecureChannelCredentials());
288
1/2
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
4 auto stub = RdmaControlPlane::NewStub(channel);
289
290
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 WaitServerRequest request;
291
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 request.set_server_id(server_id);
292
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 request.set_timeout_ms(effective_timeout_ms);
293
294
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 WaitServerResponse response;
295
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 grpc::ClientContext context;
296
2/4
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
4 context.set_deadline(DeadlineFromNow(effective_timeout_ms));
297
4/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 2 times.
10 ThrowIfNotOk(stub->WaitServer(&context, request, &response), "wait_server");
298 12 }
299
300 4 void RdmaControlPlaneClient::WaitServerReady(int num_servers,
301 int timeout_ms) const {
302 4 const int effective_timeout_ms =
303
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 timeout_ms > 0 ? timeout_ms : endpoint_.timeout_ms;
304 auto channel = grpc::CreateChannel(
305
3/6
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 4 times.
✗ Branch 8 not taken.
8 EndpointString(endpoint_), grpc::InsecureChannelCredentials());
306
1/2
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
4 auto stub = RdmaControlPlane::NewStub(channel);
307
308
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 WaitServerReadyRequest request;
309
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 request.set_num_servers(num_servers);
310
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 request.set_timeout_ms(effective_timeout_ms);
311
312
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 WaitServerReadyResponse response;
313
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 grpc::ClientContext context;
314
2/4
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
4 context.set_deadline(DeadlineFromNow(effective_timeout_ms));
315
4/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 2 times.
10 ThrowIfNotOk(stub->WaitServerReady(&context, request, &response),
316 "wait_server_ready");
317 12 }
318
319 6 RdmaControlPlaneServer::RdmaControlPlaneServer(
320 6 RdmaControlPlaneEndpoint endpoint)
321 6 : endpoint_(std::move(endpoint)) {}
322
323 6 RdmaControlPlaneServer::~RdmaControlPlaneServer() { Stop(); }
324
325 std::size_t
326 8 RdmaControlPlaneServer::MetaKeyHash::operator()(const MetaKey& key) const {
327 8 std::size_t hash = static_cast<std::size_t>(key.publisher_node_id);
328 8 hash = hash * 1315423911u + static_cast<std::size_t>(key.publisher_lane);
329 8 hash = hash * 1315423911u + static_cast<std::size_t>(key.receiver_node_id);
330 8 hash = hash * 1315423911u + static_cast<std::size_t>(key.receiver_lane);
331 8 return hash;
332 }
333
334 6 void RdmaControlPlaneServer::Start() {
335
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 6 times.
6 if (server_ != nullptr) {
336 return;
337 }
338 6 stop_requested_.store(false, std::memory_order_relaxed);
339
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 service_ = std::make_unique<RdmaControlPlaneService>(this);
340
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 grpc::ServerBuilder builder;
341
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 builder.AddListeningPort(
342
2/4
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6 times.
✗ Branch 5 not taken.
12 EndpointString(endpoint_), grpc::InsecureServerCredentials());
343
1/2
✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
6 builder.RegisterService(service_.get());
344
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 server_ = builder.BuildAndStart();
345
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 6 times.
6 if (server_ == nullptr) {
346 throw std::runtime_error("control-plane gRPC server failed to listen on " +
347 EndpointString(endpoint_));
348 }
349 6 }
350
351 12 void RdmaControlPlaneServer::Stop() {
352
2/2
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 6 times.
12 if (server_ == nullptr) {
353 6 return;
354 }
355 6 stop_requested_.store(true, std::memory_order_relaxed);
356 {
357
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 std::lock_guard<std::mutex> guard(mu_);
358 6 cv_.notify_all();
359 6 }
360 6 server_->Shutdown();
361 6 server_.reset();
362 6 service_.reset();
363 }
364
365 } // namespace petps
366