GCC Code Coverage Report


Directory: src/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 0.4% 1 / 0 / 262
Functions: 3.2% 1 / 0 / 31
Branches: 0.0% 0 / 0 / 446

storage/io_backend/spdk/spdk_backend.cpp
Line Branch Exec Source
#include "../io_backend.h"

#include <fcntl.h>

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <mutex>
#include <stdexcept>
#include <string>
#include <vector>

#include <fmt/core.h>
#include "spdk/env.h"
#include "spdk/nvme.h"
#include "spdk/env.h"
#include "spdk/nvme.h"

#include "base/factory.h"
#include "storage/kv_engine/base_kv.h"
12
// PCIe address (domain:bus:device.function) of the NVMe controller this
// backend attaches to; SpdkBackend::probe_cb rejects every other controller.
// constexpr so the pointer itself cannot be reassigned at runtime.
// NOTE(review): hard-coded for one machine — consider sourcing it from config.
static constexpr const char* pcie_address = "0000:c2:00.0";
14
15 class SpdkBackend : public IOBackend {
16 private:
17 inline static std::atomic<bool> controller_active_{false};
18 inline static std::mutex env_mutex_;
19 inline static bool env_initialized_ = false;
20 struct ThreadQpair;
21 inline static std::mutex thread_qpair_mutex_;
22 inline static std::vector<ThreadQpair*> active_thread_qpairs_;
23 spdk_env_opts opts;
24 struct spdk_nvme_transport_id trid = {};
25 struct ns_entry {
26 struct spdk_nvme_ctrlr* ctrlr;
27 struct spdk_nvme_ns* ns;
28 } ns_entry;
29 inline static struct ns_entry shared_ns_entry_ = {};
30 inline static char* shared_empty_page_ = nullptr;
31 inline static std::atomic<int> instance_count_{0};
32
33 struct ThreadQpair {
34 struct spdk_nvme_qpair* qpair = NULL;
35 bool initialized = false;
36 bool registered = false;
37 ThreadQpair() = default;
38 ~ThreadQpair() {
39 if (registered)
40 SpdkBackend::unregister_thread_qpair(this);
41 if (SpdkBackend::controller_active_.load(std::memory_order_acquire))
42 release();
43 }
44
45 static ThreadQpair& instance() {
46 static thread_local ThreadQpair thread_qpair;
47 return thread_qpair;
48 }
49
50 struct spdk_nvme_qpair* get(struct spdk_nvme_ctrlr* ctrlr, int queue_size) {
51 if (!initialized) {
52 CHECK_NE(ctrlr, nullptr) << "No NVMe controller";
53 struct spdk_nvme_io_qpair_opts opts;
54 spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
55 opts.io_queue_size = queue_size + 1; // 你想要的队列深度
56 opts.io_queue_requests = queue_size + 1; // 一般和队列深度一致
57 qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
58 CHECK_NE(qpair, nullptr) << "Failed to allocate IO qpair";
59 initialized = true;
60 SpdkBackend::register_thread_qpair(this);
61 }
62 return qpair;
63 }
64
65 void release() {
66 if (initialized && qpair != nullptr) {
67 spdk_nvme_ctrlr_free_io_qpair(qpair);
68 qpair = nullptr;
69 initialized = false;
70 }
71 }
72 };
73
74 struct spdk_nvme_qpair* get_thread_qpair() {
75 ThreadQpair& thread_qpair = ThreadQpair::instance();
76 return thread_qpair.get(ns_entry.ctrlr, queue_cnt);
77 }
78
79 static void register_thread_qpair(ThreadQpair* thread_qpair) {
80 std::lock_guard<std::mutex> lock(thread_qpair_mutex_);
81 if (!thread_qpair->registered) {
82 active_thread_qpairs_.push_back(thread_qpair);
83 thread_qpair->registered = true;
84 }
85 }
86
87 static void unregister_thread_qpair(ThreadQpair* thread_qpair) {
88 std::lock_guard<std::mutex> lock(thread_qpair_mutex_);
89 auto it = std::find(active_thread_qpairs_.begin(),
90 active_thread_qpairs_.end(),
91 thread_qpair);
92 if (it != active_thread_qpairs_.end())
93 active_thread_qpairs_.erase(it);
94 thread_qpair->registered = false;
95 }
96
97 static void release_all_thread_qpairs() {
98 std::vector<ThreadQpair*> to_release;
99 {
100 std::lock_guard<std::mutex> lock(thread_qpair_mutex_);
101 to_release.swap(active_thread_qpairs_);
102 for (ThreadQpair* thread_qpair : to_release)
103 thread_qpair->registered = false;
104 }
105 for (ThreadQpair* thread_qpair : to_release)
106 thread_qpair->release();
107 }
108
109 static bool probe_cb(void* cb_ctx,
110 const struct spdk_nvme_transport_id* trid,
111 struct spdk_nvme_ctrlr_opts* opts) {
112 if (strcmp(pcie_address, trid->traddr) != 0)
113 return false;
114 LOG(INFO) << "Attaching to " << trid->traddr;
115 return true;
116 }
117
118 static void attach_cb(void* cb_ctx,
119 const struct spdk_nvme_transport_id* trid,
120 struct spdk_nvme_ctrlr* ctrlr,
121 const struct spdk_nvme_ctrlr_opts* opts) {
122 SpdkBackend* ptr = (SpdkBackend*)(cb_ctx);
123 LOG(INFO) << fmt::format("Attached to {}", trid->traddr);
124 for (uint32_t i = spdk_nvme_ctrlr_get_first_active_ns(ctrlr); i != 0;
125 i = spdk_nvme_ctrlr_get_next_active_ns(ctrlr, i)) {
126 struct spdk_nvme_ns* ns = spdk_nvme_ctrlr_get_ns(ctrlr, i);
127 if (ns && spdk_nvme_ns_is_active(ns)) {
128 ptr->ns_entry.ctrlr = ctrlr;
129 ptr->ns_entry.ns = ns;
130 LOG(INFO) << fmt::format(
131 "Using NS {} size {}\n",
132 i,
133 (unsigned long)spdk_nvme_ns_get_size(ptr->ns_entry.ns));
134 break;
135 }
136 }
137 }
138
139 static void io_complete(void* arg, const struct spdk_nvme_cpl* cpl) {
140 int64_t t = (int64_t)(intptr_t)arg;
141 if (!spdk_nvme_cpl_is_success(cpl))
142 fprintf(stderr, "I/O error!\n");
143 pending--;
144 if (t >= 0)
145 (*coros[t])();
146 }
147
148 static void batch_write_complete(void* arg, const struct spdk_nvme_cpl* cpl) {
149 int* inflight = reinterpret_cast<int*>(arg);
150 if (!spdk_nvme_cpl_is_success(cpl))
151 LOG(ERROR) << "BatchWritePages: I/O error!";
152 (*inflight)--;
153 }
154
155 public:
156 SpdkBackend(const BaseKVConfig& config) : IOBackend(config){};
157 ~SpdkBackend() {
158 if (controller_active_.load(std::memory_order_acquire)) {
159 int remaining =
160 instance_count_.fetch_sub(1, std::memory_order_acq_rel) - 1;
161 if (remaining == 0) {
162 release_all_thread_qpairs();
163 controller_active_.store(false, std::memory_order_release);
164 }
165 }
166 }
167 void init() override {
168 LOG(INFO) << "init spdk backend";
169 std::lock_guard<std::mutex> lock(env_mutex_);
170 if (!env_initialized_) {
171 LOG(INFO) << "Initializing NVMe Controllers";
172 opts = {};
173 opts.opts_size = sizeof(spdk_env_opts);
174 spdk_env_opts_init(&opts);
175 trid = {};
176 spdk_nvme_trid_populate_transport(&trid, SPDK_NVME_TRANSPORT_PCIE);
177 snprintf(trid.subnqn, sizeof(trid.subnqn), "%s", SPDK_NVMF_DISCOVERY_NQN);
178 LOG(INFO) << "pcie_address: " << pcie_address;
179 LOG(INFO) << "sizeof(trid.traddr): " << sizeof(trid.traddr);
180 snprintf(trid.traddr, sizeof(trid.traddr), "%s", pcie_address);
181 LOG(INFO) << "opts.iova_mode: " << opts.iova_mode;
182 LOG(INFO) << "trid.traddr: " << trid.traddr;
183 LOG(INFO) << "trid.subnqn: " << trid.subnqn;
184 LOG(INFO) << "sizeof(spdk_env_opts): " << sizeof(spdk_env_opts);
185
186 CHECK(spdk_env_init(&opts) >= 0) << "Unable to initialize SPDK env\n";
187 CHECK_EQ(spdk_nvme_probe(&trid, this, probe_cb, attach_cb, NULL), 0)
188 << "Failed to probe NVMe device";
189 CHECK_NE(ns_entry.ns, nullptr) << "Namespace is not initialized";
190 CHECK_NE(ns_entry.ctrlr, nullptr) << "Controller is not initialized";
191 shared_ns_entry_ = ns_entry;
192 shared_empty_page_ = (char*)spdk_zmalloc(
193 PAGE_SIZE, 64, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
194 CHECK_NE(shared_empty_page_, nullptr) << "Failed to allocate empty page";
195 empty_page = shared_empty_page_;
196 controller_active_.store(true, std::memory_order_release);
197 env_initialized_ = true;
198 LOG(INFO) << "env initialized successfully" << std::endl;
199 } else {
200 ns_entry = shared_ns_entry_;
201 empty_page = shared_empty_page_;
202 CHECK_NE(empty_page, nullptr) << "Shared empty page is not initialized";
203 controller_active_.store(true, std::memory_order_release);
204 LOG(INFO) << "env initialized successfully" << std::endl;
205 }
206 instance_count_.fetch_add(1, std::memory_order_acq_rel);
207 }
208
209 void* GetPage(coroutine<void>::push_type& sink,
210 uint64_t index,
211 PageID_t page_id) override {
212 char* buffer = (char*)spdk_zmalloc(
213 PAGE_SIZE, 64, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
214 CHECK_NE(buffer, nullptr) << "Failed to allocate memory for page";
215 ReadPage(sink, index, page_id, buffer);
216 return reinterpret_cast<void*>(buffer);
217 }
218 void* GetPage(PageID_t page_id) override {
219 char* buffer = (char*)spdk_zmalloc(
220 PAGE_SIZE, 64, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
221 CHECK_NE(buffer, nullptr) << "Failed to allocate memory for page";
222 ReadPage(page_id, buffer);
223 return reinterpret_cast<void*>(buffer);
224 }
225
226 char* AllocateBuffer() override {
227 char* buf = (char*)spdk_zmalloc(
228 PAGE_SIZE, 64, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
229 CHECK_NE(buf, nullptr) << "Failed to allocate DMA buffer";
230 return buf;
231 }
232 char* AllocateBuffer(uint64_t page_count) override {
233 char* buf = (char*)spdk_zmalloc(
234 PAGE_SIZE * page_count,
235 64,
236 NULL,
237 SPDK_ENV_SOCKET_ID_ANY,
238 SPDK_MALLOC_DMA);
239 CHECK_NE(buf, nullptr) << "Failed to allocate DMA buffer (" << page_count
240 << " pages)";
241 return buf;
242 }
243 void FreeBuffer(char* buf) override { spdk_free(buf); }
244
245 // Unpin a page, if dirty, write it back
246 void Unpin(coroutine<void>::push_type& sink,
247 uint64_t index,
248 PageID_t page_id,
249 void* page_data,
250 bool is_dirty) override {
251 if (is_dirty)
252 WritePage(sink, index, page_id, reinterpret_cast<char*>(page_data));
253 spdk_free(page_data);
254 }
255 void Unpin(PageID_t page_id, void* page_data, bool is_dirty) override {
256 if (is_dirty)
257 WritePage(page_id, reinterpret_cast<char*>(page_data));
258 spdk_free(page_data);
259 }
260
261 void ReadPageAsync(coroutine<void>::push_type& sink,
262 uint64_t index,
263 PageID_t page_id,
264 char* buffer) override {
265 struct spdk_nvme_qpair* qpair = get_thread_qpair();
266 pending++;
267 int ret = spdk_nvme_ns_cmd_read(
268 ns_entry.ns,
269 qpair,
270 buffer,
271 page_id * (PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns)),
272 PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns),
273 io_complete,
274 (void*)index,
275 0);
276 if (ret != 0) {
277 pending--;
278 throw std::runtime_error("Failed to read page:" + std::to_string(ret));
279 }
280 sink();
281 }
282 void ReadPageSync(PageID_t page_id, char* buffer) override {
283 struct spdk_nvme_qpair* qpair = get_thread_qpair();
284 pending++;
285 int ret = spdk_nvme_ns_cmd_read(
286 ns_entry.ns,
287 qpair,
288 buffer,
289 page_id * (PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns)),
290 PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns),
291 io_complete,
292 (void*)(-1),
293 0);
294 if (ret != 0) {
295 pending--;
296 throw std::runtime_error("Failed to read page:" + std::to_string(ret));
297 }
298 while (pending > 0) {
299 spdk_nvme_qpair_process_completions(qpair, 0);
300 }
301 }
302
303 void WritePageAsync(coroutine<void>::push_type& sink,
304 uint64_t index,
305 PageID_t page_id,
306 char* buffer) override {
307 struct spdk_nvme_qpair* qpair = get_thread_qpair();
308 pending++;
309 int ret = spdk_nvme_ns_cmd_write(
310 ns_entry.ns,
311 qpair,
312 buffer,
313 page_id * (PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns)),
314 PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns),
315 io_complete,
316 (void*)index,
317 0);
318 if (ret != 0) {
319 pending--;
320 throw std::runtime_error("Failed to write page:" + std::to_string(ret));
321 }
322 sink();
323 }
324 void WritePageSync(PageID_t page_id, char* buffer) override {
325 // LOG(INFO) << "WritePageSync: page_id=" << page_id
326 // << " qpair_initialized=" << ThreadQpair::instance().initialized
327 // << " pending=" << pending
328 // << " thread=" << std::this_thread::get_id();
329 struct spdk_nvme_qpair* qpair = get_thread_qpair();
330 // LOG(INFO) << "WritePageSync: qpair=" << (void*)qpair;
331 pending++;
332 int ret = spdk_nvme_ns_cmd_write(
333 ns_entry.ns,
334 qpair,
335 buffer,
336 page_id * (PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns)),
337 PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns),
338 io_complete,
339 (void*)(-1),
340 0);
341 // LOG(INFO) << "WritePageSync: cmd_write ret=" << ret;
342 if (ret != 0) {
343 pending--;
344 throw std::runtime_error("Failed to write page:" + std::to_string(ret));
345 }
346 int poll_count = 0;
347 while (pending > 0) {
348 int n = spdk_nvme_qpair_process_completions(qpair, 0);
349 poll_count++;
350 if (poll_count % 1000000 == 0)
351 LOG(WARNING) << "WritePageSync: stuck polling, pending=" << pending
352 << " poll_count=" << poll_count << " completions=" << n;
353 }
354 }
355
356 void submit() override {
357 // SPDK submits requests during spdk_nvme_ns_cmd_{read,write} calls.
358 }
359
360 void PollCompletion() override {
361 struct spdk_nvme_qpair* qpair = get_thread_qpair();
362 spdk_nvme_qpair_process_completions(qpair, 0);
363 }
364
365 void BatchWritePages(const std::vector<IOEntry>& entries) override {
366 if (entries.empty())
367 return;
368 struct spdk_nvme_qpair* qpair = get_thread_qpair();
369 int max_inflight = std::max(queue_cnt / 2, 1);
370 size_t submitted = 0;
371 int inflight = 0;
372 uint32_t sectors_per_page =
373 PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns);
374
375 while (submitted < entries.size() || inflight > 0) {
376 while (submitted < entries.size() && inflight < max_inflight) {
377 auto& e = entries[submitted];
378 int ret = spdk_nvme_ns_cmd_write(
379 ns_entry.ns,
380 qpair,
381 e.buffer,
382 e.page_id * sectors_per_page,
383 e.page_count * sectors_per_page,
384 batch_write_complete,
385 &inflight,
386 0);
387 if (ret != 0)
388 throw std::runtime_error(
389 "BatchWritePages: write failed: " + std::to_string(ret));
390 inflight++;
391 submitted++;
392 }
393 spdk_nvme_qpair_process_completions(qpair, 0);
394 }
395 }
396
397 void BatchReadPages(const std::vector<IOEntry>& entries) override {
398 if (entries.empty())
399 return;
400 struct spdk_nvme_qpair* qpair = get_thread_qpair();
401 int max_inflight = std::max(queue_cnt / 2, 1);
402 size_t submitted = 0;
403 int inflight = 0;
404 uint32_t sectors_per_page =
405 PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns);
406
407 while (submitted < entries.size() || inflight > 0) {
408 while (submitted < entries.size() && inflight < max_inflight) {
409 auto& e = entries[submitted];
410 int ret = spdk_nvme_ns_cmd_read(
411 ns_entry.ns,
412 qpair,
413 e.buffer,
414 e.page_id * sectors_per_page,
415 e.page_count * sectors_per_page,
416 batch_write_complete,
417 &inflight,
418 0);
419 if (ret != 0)
420 throw std::runtime_error(
421 "BatchReadPages: read failed: " + std::to_string(ret));
422 inflight++;
423 submitted++;
424 }
425 spdk_nvme_qpair_process_completions(qpair, 0);
426 }
427 }
428 };
429
// Empty by design: the function has no effect when called.
// NOTE(review): judging by its name it presumably exists so that some other
// translation unit can reference it, forcing the linker to keep this object
// file (and the FACTORY_REGISTER below) — confirm against its callers.
extern "C" void RecStoreForceLinkSpdkBackend() {
  // no-op
}
431
432 FACTORY_REGISTER(IOBackend, SPDK, SpdkBackend, const BaseKVConfig&);
433