storage/io_backend/spdk/spdk_backend.cpp
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | #include "../io_backend.h" | ||
| 2 | #include <algorithm> | ||
| 3 | #include <atomic> | ||
| 4 | #include <fcntl.h> | ||
| 5 | #include <fmt/core.h> | ||
| 6 | #include "spdk/env.h" | ||
| 7 | #include "spdk/nvme.h" | ||
| 8 | #include "spdk/env.h" | ||
| 9 | #include "spdk/nvme.h" | ||
| 10 | #include "base/factory.h" | ||
| 11 | #include "storage/kv_engine/base_kv.h" | ||
| 12 | |||
| 13 | static const char* pcie_address = "0000:c2:00.0"; | ||
| 14 | |||
| 15 | class SpdkBackend : public IOBackend { | ||
| 16 | private: | ||
| 17 | inline static std::atomic<bool> controller_active_{false}; | ||
| 18 | inline static std::mutex env_mutex_; | ||
| 19 | inline static bool env_initialized_ = false; | ||
| 20 | struct ThreadQpair; | ||
| 21 | inline static std::mutex thread_qpair_mutex_; | ||
| 22 | inline static std::vector<ThreadQpair*> active_thread_qpairs_; | ||
| 23 | spdk_env_opts opts; | ||
| 24 | struct spdk_nvme_transport_id trid = {}; | ||
| 25 | struct ns_entry { | ||
| 26 | struct spdk_nvme_ctrlr* ctrlr; | ||
| 27 | struct spdk_nvme_ns* ns; | ||
| 28 | } ns_entry; | ||
| 29 | inline static struct ns_entry shared_ns_entry_ = {}; | ||
| 30 | inline static char* shared_empty_page_ = nullptr; | ||
| 31 | inline static std::atomic<int> instance_count_{0}; | ||
| 32 | |||
| 33 | struct ThreadQpair { | ||
| 34 | struct spdk_nvme_qpair* qpair = NULL; | ||
| 35 | bool initialized = false; | ||
| 36 | bool registered = false; | ||
| 37 | ThreadQpair() = default; | ||
| 38 | ✗ | ~ThreadQpair() { | |
| 39 | ✗ | if (registered) | |
| 40 | ✗ | SpdkBackend::unregister_thread_qpair(this); | |
| 41 | ✗ | if (SpdkBackend::controller_active_.load(std::memory_order_acquire)) | |
| 42 | ✗ | release(); | |
| 43 | ✗ | } | |
| 44 | |||
| 45 | ✗ | static ThreadQpair& instance() { | |
| 46 | ✗ | static thread_local ThreadQpair thread_qpair; | |
| 47 | ✗ | return thread_qpair; | |
| 48 | } | ||
| 49 | |||
| 50 | ✗ | struct spdk_nvme_qpair* get(struct spdk_nvme_ctrlr* ctrlr, int queue_size) { | |
| 51 | ✗ | if (!initialized) { | |
| 52 | ✗ | CHECK_NE(ctrlr, nullptr) << "No NVMe controller"; | |
| 53 | struct spdk_nvme_io_qpair_opts opts; | ||
| 54 | ✗ | spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts)); | |
| 55 | ✗ | opts.io_queue_size = queue_size + 1; // 你想要的队列深度 | |
| 56 | ✗ | opts.io_queue_requests = queue_size + 1; // 一般和队列深度一致 | |
| 57 | ✗ | qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts)); | |
| 58 | ✗ | CHECK_NE(qpair, nullptr) << "Failed to allocate IO qpair"; | |
| 59 | ✗ | initialized = true; | |
| 60 | ✗ | SpdkBackend::register_thread_qpair(this); | |
| 61 | } | ||
| 62 | ✗ | return qpair; | |
| 63 | } | ||
| 64 | |||
| 65 | ✗ | void release() { | |
| 66 | ✗ | if (initialized && qpair != nullptr) { | |
| 67 | ✗ | spdk_nvme_ctrlr_free_io_qpair(qpair); | |
| 68 | ✗ | qpair = nullptr; | |
| 69 | ✗ | initialized = false; | |
| 70 | } | ||
| 71 | ✗ | } | |
| 72 | }; | ||
| 73 | |||
| 74 | ✗ | struct spdk_nvme_qpair* get_thread_qpair() { | |
| 75 | ✗ | ThreadQpair& thread_qpair = ThreadQpair::instance(); | |
| 76 | ✗ | return thread_qpair.get(ns_entry.ctrlr, queue_cnt); | |
| 77 | } | ||
| 78 | |||
| 79 | ✗ | static void register_thread_qpair(ThreadQpair* thread_qpair) { | |
| 80 | ✗ | std::lock_guard<std::mutex> lock(thread_qpair_mutex_); | |
| 81 | ✗ | if (!thread_qpair->registered) { | |
| 82 | ✗ | active_thread_qpairs_.push_back(thread_qpair); | |
| 83 | ✗ | thread_qpair->registered = true; | |
| 84 | } | ||
| 85 | ✗ | } | |
| 86 | |||
| 87 | ✗ | static void unregister_thread_qpair(ThreadQpair* thread_qpair) { | |
| 88 | ✗ | std::lock_guard<std::mutex> lock(thread_qpair_mutex_); | |
| 89 | ✗ | auto it = std::find(active_thread_qpairs_.begin(), | |
| 90 | active_thread_qpairs_.end(), | ||
| 91 | thread_qpair); | ||
| 92 | ✗ | if (it != active_thread_qpairs_.end()) | |
| 93 | ✗ | active_thread_qpairs_.erase(it); | |
| 94 | ✗ | thread_qpair->registered = false; | |
| 95 | ✗ | } | |
| 96 | |||
| 97 | ✗ | static void release_all_thread_qpairs() { | |
| 98 | ✗ | std::vector<ThreadQpair*> to_release; | |
| 99 | { | ||
| 100 | ✗ | std::lock_guard<std::mutex> lock(thread_qpair_mutex_); | |
| 101 | ✗ | to_release.swap(active_thread_qpairs_); | |
| 102 | ✗ | for (ThreadQpair* thread_qpair : to_release) | |
| 103 | ✗ | thread_qpair->registered = false; | |
| 104 | ✗ | } | |
| 105 | ✗ | for (ThreadQpair* thread_qpair : to_release) | |
| 106 | ✗ | thread_qpair->release(); | |
| 107 | ✗ | } | |
| 108 | |||
| 109 | ✗ | static bool probe_cb(void* cb_ctx, | |
| 110 | const struct spdk_nvme_transport_id* trid, | ||
| 111 | struct spdk_nvme_ctrlr_opts* opts) { | ||
| 112 | ✗ | if (strcmp(pcie_address, trid->traddr) != 0) | |
| 113 | ✗ | return false; | |
| 114 | ✗ | LOG(INFO) << "Attaching to " << trid->traddr; | |
| 115 | ✗ | return true; | |
| 116 | } | ||
| 117 | |||
| 118 | ✗ | static void attach_cb(void* cb_ctx, | |
| 119 | const struct spdk_nvme_transport_id* trid, | ||
| 120 | struct spdk_nvme_ctrlr* ctrlr, | ||
| 121 | const struct spdk_nvme_ctrlr_opts* opts) { | ||
| 122 | ✗ | SpdkBackend* ptr = (SpdkBackend*)(cb_ctx); | |
| 123 | ✗ | LOG(INFO) << fmt::format("Attached to {}", trid->traddr); | |
| 124 | ✗ | for (uint32_t i = spdk_nvme_ctrlr_get_first_active_ns(ctrlr); i != 0; | |
| 125 | ✗ | i = spdk_nvme_ctrlr_get_next_active_ns(ctrlr, i)) { | |
| 126 | ✗ | struct spdk_nvme_ns* ns = spdk_nvme_ctrlr_get_ns(ctrlr, i); | |
| 127 | ✗ | if (ns && spdk_nvme_ns_is_active(ns)) { | |
| 128 | ✗ | ptr->ns_entry.ctrlr = ctrlr; | |
| 129 | ✗ | ptr->ns_entry.ns = ns; | |
| 130 | ✗ | LOG(INFO) << fmt::format( | |
| 131 | "Using NS {} size {}\n", | ||
| 132 | i, | ||
| 133 | ✗ | (unsigned long)spdk_nvme_ns_get_size(ptr->ns_entry.ns)); | |
| 134 | ✗ | break; | |
| 135 | } | ||
| 136 | } | ||
| 137 | ✗ | } | |
| 138 | |||
| 139 | ✗ | static void io_complete(void* arg, const struct spdk_nvme_cpl* cpl) { | |
| 140 | ✗ | int64_t t = (int64_t)(intptr_t)arg; | |
| 141 | ✗ | if (!spdk_nvme_cpl_is_success(cpl)) | |
| 142 | ✗ | fprintf(stderr, "I/O error!\n"); | |
| 143 | ✗ | pending--; | |
| 144 | ✗ | if (t >= 0) | |
| 145 | ✗ | (*coros[t])(); | |
| 146 | ✗ | } | |
| 147 | |||
| 148 | ✗ | static void batch_write_complete(void* arg, const struct spdk_nvme_cpl* cpl) { | |
| 149 | ✗ | int* inflight = reinterpret_cast<int*>(arg); | |
| 150 | ✗ | if (!spdk_nvme_cpl_is_success(cpl)) | |
| 151 | ✗ | LOG(ERROR) << "BatchWritePages: I/O error!"; | |
| 152 | ✗ | (*inflight)--; | |
| 153 | ✗ | } | |
| 154 | |||
| 155 | public: | ||
| 156 | ✗ | SpdkBackend(const BaseKVConfig& config) : IOBackend(config){}; | |
| 157 | ✗ | ~SpdkBackend() { | |
| 158 | ✗ | if (controller_active_.load(std::memory_order_acquire)) { | |
| 159 | int remaining = | ||
| 160 | ✗ | instance_count_.fetch_sub(1, std::memory_order_acq_rel) - 1; | |
| 161 | ✗ | if (remaining == 0) { | |
| 162 | ✗ | release_all_thread_qpairs(); | |
| 163 | ✗ | controller_active_.store(false, std::memory_order_release); | |
| 164 | } | ||
| 165 | } | ||
| 166 | ✗ | } | |
| 167 | ✗ | void init() override { | |
| 168 | ✗ | LOG(INFO) << "init spdk backend"; | |
| 169 | ✗ | std::lock_guard<std::mutex> lock(env_mutex_); | |
| 170 | ✗ | if (!env_initialized_) { | |
| 171 | ✗ | LOG(INFO) << "Initializing NVMe Controllers"; | |
| 172 | ✗ | opts = {}; | |
| 173 | ✗ | opts.opts_size = sizeof(spdk_env_opts); | |
| 174 | ✗ | spdk_env_opts_init(&opts); | |
| 175 | ✗ | trid = {}; | |
| 176 | ✗ | spdk_nvme_trid_populate_transport(&trid, SPDK_NVME_TRANSPORT_PCIE); | |
| 177 | ✗ | snprintf(trid.subnqn, sizeof(trid.subnqn), "%s", SPDK_NVMF_DISCOVERY_NQN); | |
| 178 | ✗ | LOG(INFO) << "pcie_address: " << pcie_address; | |
| 179 | ✗ | LOG(INFO) << "sizeof(trid.traddr): " << sizeof(trid.traddr); | |
| 180 | ✗ | snprintf(trid.traddr, sizeof(trid.traddr), "%s", pcie_address); | |
| 181 | ✗ | LOG(INFO) << "opts.iova_mode: " << opts.iova_mode; | |
| 182 | ✗ | LOG(INFO) << "trid.traddr: " << trid.traddr; | |
| 183 | ✗ | LOG(INFO) << "trid.subnqn: " << trid.subnqn; | |
| 184 | ✗ | LOG(INFO) << "sizeof(spdk_env_opts): " << sizeof(spdk_env_opts); | |
| 185 | |||
| 186 | ✗ | CHECK(spdk_env_init(&opts) >= 0) << "Unable to initialize SPDK env\n"; | |
| 187 | ✗ | CHECK_EQ(spdk_nvme_probe(&trid, this, probe_cb, attach_cb, NULL), 0) | |
| 188 | ✗ | << "Failed to probe NVMe device"; | |
| 189 | ✗ | CHECK_NE(ns_entry.ns, nullptr) << "Namespace is not initialized"; | |
| 190 | ✗ | CHECK_NE(ns_entry.ctrlr, nullptr) << "Controller is not initialized"; | |
| 191 | ✗ | shared_ns_entry_ = ns_entry; | |
| 192 | ✗ | shared_empty_page_ = (char*)spdk_zmalloc( | |
| 193 | PAGE_SIZE, 64, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); | ||
| 194 | ✗ | CHECK_NE(shared_empty_page_, nullptr) << "Failed to allocate empty page"; | |
| 195 | ✗ | empty_page = shared_empty_page_; | |
| 196 | ✗ | controller_active_.store(true, std::memory_order_release); | |
| 197 | ✗ | env_initialized_ = true; | |
| 198 | ✗ | LOG(INFO) << "env initialized successfully" << std::endl; | |
| 199 | } else { | ||
| 200 | ✗ | ns_entry = shared_ns_entry_; | |
| 201 | ✗ | empty_page = shared_empty_page_; | |
| 202 | ✗ | CHECK_NE(empty_page, nullptr) << "Shared empty page is not initialized"; | |
| 203 | ✗ | controller_active_.store(true, std::memory_order_release); | |
| 204 | ✗ | LOG(INFO) << "env initialized successfully" << std::endl; | |
| 205 | } | ||
| 206 | instance_count_.fetch_add(1, std::memory_order_acq_rel); | ||
| 207 | ✗ | } | |
| 208 | |||
| 209 | ✗ | void* GetPage(coroutine<void>::push_type& sink, | |
| 210 | uint64_t index, | ||
| 211 | PageID_t page_id) override { | ||
| 212 | ✗ | char* buffer = (char*)spdk_zmalloc( | |
| 213 | ✗ | PAGE_SIZE, 64, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); | |
| 214 | ✗ | CHECK_NE(buffer, nullptr) << "Failed to allocate memory for page"; | |
| 215 | ✗ | ReadPage(sink, index, page_id, buffer); | |
| 216 | ✗ | return reinterpret_cast<void*>(buffer); | |
| 217 | } | ||
| 218 | ✗ | void* GetPage(PageID_t page_id) override { | |
| 219 | ✗ | char* buffer = (char*)spdk_zmalloc( | |
| 220 | ✗ | PAGE_SIZE, 64, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); | |
| 221 | ✗ | CHECK_NE(buffer, nullptr) << "Failed to allocate memory for page"; | |
| 222 | ✗ | ReadPage(page_id, buffer); | |
| 223 | ✗ | return reinterpret_cast<void*>(buffer); | |
| 224 | } | ||
| 225 | |||
| 226 | ✗ | char* AllocateBuffer() override { | |
| 227 | ✗ | char* buf = (char*)spdk_zmalloc( | |
| 228 | ✗ | PAGE_SIZE, 64, NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); | |
| 229 | ✗ | CHECK_NE(buf, nullptr) << "Failed to allocate DMA buffer"; | |
| 230 | ✗ | return buf; | |
| 231 | } | ||
| 232 | ✗ | char* AllocateBuffer(uint64_t page_count) override { | |
| 233 | ✗ | char* buf = (char*)spdk_zmalloc( | |
| 234 | PAGE_SIZE * page_count, | ||
| 235 | 64, | ||
| 236 | NULL, | ||
| 237 | SPDK_ENV_SOCKET_ID_ANY, | ||
| 238 | ✗ | SPDK_MALLOC_DMA); | |
| 239 | ✗ | CHECK_NE(buf, nullptr) << "Failed to allocate DMA buffer (" << page_count | |
| 240 | ✗ | << " pages)"; | |
| 241 | ✗ | return buf; | |
| 242 | } | ||
| 243 | ✗ | void FreeBuffer(char* buf) override { spdk_free(buf); } | |
| 244 | |||
| 245 | // Unpin a page, if dirty, write it back | ||
| 246 | ✗ | void Unpin(coroutine<void>::push_type& sink, | |
| 247 | uint64_t index, | ||
| 248 | PageID_t page_id, | ||
| 249 | void* page_data, | ||
| 250 | bool is_dirty) override { | ||
| 251 | ✗ | if (is_dirty) | |
| 252 | ✗ | WritePage(sink, index, page_id, reinterpret_cast<char*>(page_data)); | |
| 253 | ✗ | spdk_free(page_data); | |
| 254 | ✗ | } | |
| 255 | ✗ | void Unpin(PageID_t page_id, void* page_data, bool is_dirty) override { | |
| 256 | ✗ | if (is_dirty) | |
| 257 | ✗ | WritePage(page_id, reinterpret_cast<char*>(page_data)); | |
| 258 | ✗ | spdk_free(page_data); | |
| 259 | ✗ | } | |
| 260 | |||
| 261 | ✗ | void ReadPageAsync(coroutine<void>::push_type& sink, | |
| 262 | uint64_t index, | ||
| 263 | PageID_t page_id, | ||
| 264 | char* buffer) override { | ||
| 265 | ✗ | struct spdk_nvme_qpair* qpair = get_thread_qpair(); | |
| 266 | ✗ | pending++; | |
| 267 | ✗ | int ret = spdk_nvme_ns_cmd_read( | |
| 268 | ns_entry.ns, | ||
| 269 | qpair, | ||
| 270 | buffer, | ||
| 271 | ✗ | page_id * (PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns)), | |
| 272 | ✗ | PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns), | |
| 273 | io_complete, | ||
| 274 | (void*)index, | ||
| 275 | 0); | ||
| 276 | ✗ | if (ret != 0) { | |
| 277 | ✗ | pending--; | |
| 278 | ✗ | throw std::runtime_error("Failed to read page:" + std::to_string(ret)); | |
| 279 | } | ||
| 280 | ✗ | sink(); | |
| 281 | ✗ | } | |
| 282 | ✗ | void ReadPageSync(PageID_t page_id, char* buffer) override { | |
| 283 | ✗ | struct spdk_nvme_qpair* qpair = get_thread_qpair(); | |
| 284 | ✗ | pending++; | |
| 285 | ✗ | int ret = spdk_nvme_ns_cmd_read( | |
| 286 | ns_entry.ns, | ||
| 287 | qpair, | ||
| 288 | buffer, | ||
| 289 | ✗ | page_id * (PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns)), | |
| 290 | ✗ | PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns), | |
| 291 | io_complete, | ||
| 292 | (void*)(-1), | ||
| 293 | 0); | ||
| 294 | ✗ | if (ret != 0) { | |
| 295 | ✗ | pending--; | |
| 296 | ✗ | throw std::runtime_error("Failed to read page:" + std::to_string(ret)); | |
| 297 | } | ||
| 298 | ✗ | while (pending > 0) { | |
| 299 | ✗ | spdk_nvme_qpair_process_completions(qpair, 0); | |
| 300 | } | ||
| 301 | ✗ | } | |
| 302 | |||
| 303 | ✗ | void WritePageAsync(coroutine<void>::push_type& sink, | |
| 304 | uint64_t index, | ||
| 305 | PageID_t page_id, | ||
| 306 | char* buffer) override { | ||
| 307 | ✗ | struct spdk_nvme_qpair* qpair = get_thread_qpair(); | |
| 308 | ✗ | pending++; | |
| 309 | ✗ | int ret = spdk_nvme_ns_cmd_write( | |
| 310 | ns_entry.ns, | ||
| 311 | qpair, | ||
| 312 | buffer, | ||
| 313 | ✗ | page_id * (PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns)), | |
| 314 | ✗ | PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns), | |
| 315 | io_complete, | ||
| 316 | (void*)index, | ||
| 317 | 0); | ||
| 318 | ✗ | if (ret != 0) { | |
| 319 | ✗ | pending--; | |
| 320 | ✗ | throw std::runtime_error("Failed to write page:" + std::to_string(ret)); | |
| 321 | } | ||
| 322 | ✗ | sink(); | |
| 323 | ✗ | } | |
| 324 | ✗ | void WritePageSync(PageID_t page_id, char* buffer) override { | |
| 325 | // LOG(INFO) << "WritePageSync: page_id=" << page_id | ||
| 326 | // << " qpair_initialized=" << ThreadQpair::instance().initialized | ||
| 327 | // << " pending=" << pending | ||
| 328 | // << " thread=" << std::this_thread::get_id(); | ||
| 329 | ✗ | struct spdk_nvme_qpair* qpair = get_thread_qpair(); | |
| 330 | // LOG(INFO) << "WritePageSync: qpair=" << (void*)qpair; | ||
| 331 | ✗ | pending++; | |
| 332 | ✗ | int ret = spdk_nvme_ns_cmd_write( | |
| 333 | ns_entry.ns, | ||
| 334 | qpair, | ||
| 335 | buffer, | ||
| 336 | ✗ | page_id * (PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns)), | |
| 337 | ✗ | PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns), | |
| 338 | io_complete, | ||
| 339 | (void*)(-1), | ||
| 340 | 0); | ||
| 341 | // LOG(INFO) << "WritePageSync: cmd_write ret=" << ret; | ||
| 342 | ✗ | if (ret != 0) { | |
| 343 | ✗ | pending--; | |
| 344 | ✗ | throw std::runtime_error("Failed to write page:" + std::to_string(ret)); | |
| 345 | } | ||
| 346 | ✗ | int poll_count = 0; | |
| 347 | ✗ | while (pending > 0) { | |
| 348 | ✗ | int n = spdk_nvme_qpair_process_completions(qpair, 0); | |
| 349 | ✗ | poll_count++; | |
| 350 | ✗ | if (poll_count % 1000000 == 0) | |
| 351 | ✗ | LOG(WARNING) << "WritePageSync: stuck polling, pending=" << pending | |
| 352 | ✗ | << " poll_count=" << poll_count << " completions=" << n; | |
| 353 | } | ||
| 354 | ✗ | } | |
| 355 | |||
| 356 | ✗ | void submit() override { | |
| 357 | // SPDK submits requests during spdk_nvme_ns_cmd_{read,write} calls. | ||
| 358 | ✗ | } | |
| 359 | |||
| 360 | ✗ | void PollCompletion() override { | |
| 361 | ✗ | struct spdk_nvme_qpair* qpair = get_thread_qpair(); | |
| 362 | ✗ | spdk_nvme_qpair_process_completions(qpair, 0); | |
| 363 | ✗ | } | |
| 364 | |||
| 365 | ✗ | void BatchWritePages(const std::vector<IOEntry>& entries) override { | |
| 366 | ✗ | if (entries.empty()) | |
| 367 | ✗ | return; | |
| 368 | ✗ | struct spdk_nvme_qpair* qpair = get_thread_qpair(); | |
| 369 | ✗ | int max_inflight = std::max(queue_cnt / 2, 1); | |
| 370 | ✗ | size_t submitted = 0; | |
| 371 | ✗ | int inflight = 0; | |
| 372 | uint32_t sectors_per_page = | ||
| 373 | ✗ | PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns); | |
| 374 | |||
| 375 | ✗ | while (submitted < entries.size() || inflight > 0) { | |
| 376 | ✗ | while (submitted < entries.size() && inflight < max_inflight) { | |
| 377 | ✗ | auto& e = entries[submitted]; | |
| 378 | ✗ | int ret = spdk_nvme_ns_cmd_write( | |
| 379 | ns_entry.ns, | ||
| 380 | qpair, | ||
| 381 | ✗ | e.buffer, | |
| 382 | ✗ | e.page_id * sectors_per_page, | |
| 383 | ✗ | e.page_count * sectors_per_page, | |
| 384 | batch_write_complete, | ||
| 385 | &inflight, | ||
| 386 | 0); | ||
| 387 | ✗ | if (ret != 0) | |
| 388 | ✗ | throw std::runtime_error( | |
| 389 | ✗ | "BatchWritePages: write failed: " + std::to_string(ret)); | |
| 390 | ✗ | inflight++; | |
| 391 | ✗ | submitted++; | |
| 392 | } | ||
| 393 | ✗ | spdk_nvme_qpair_process_completions(qpair, 0); | |
| 394 | } | ||
| 395 | } | ||
| 396 | |||
| 397 | ✗ | void BatchReadPages(const std::vector<IOEntry>& entries) override { | |
| 398 | ✗ | if (entries.empty()) | |
| 399 | ✗ | return; | |
| 400 | ✗ | struct spdk_nvme_qpair* qpair = get_thread_qpair(); | |
| 401 | ✗ | int max_inflight = std::max(queue_cnt / 2, 1); | |
| 402 | ✗ | size_t submitted = 0; | |
| 403 | ✗ | int inflight = 0; | |
| 404 | uint32_t sectors_per_page = | ||
| 405 | ✗ | PAGE_SIZE / spdk_nvme_ns_get_sector_size(ns_entry.ns); | |
| 406 | |||
| 407 | ✗ | while (submitted < entries.size() || inflight > 0) { | |
| 408 | ✗ | while (submitted < entries.size() && inflight < max_inflight) { | |
| 409 | ✗ | auto& e = entries[submitted]; | |
| 410 | ✗ | int ret = spdk_nvme_ns_cmd_read( | |
| 411 | ns_entry.ns, | ||
| 412 | qpair, | ||
| 413 | ✗ | e.buffer, | |
| 414 | ✗ | e.page_id * sectors_per_page, | |
| 415 | ✗ | e.page_count * sectors_per_page, | |
| 416 | batch_write_complete, | ||
| 417 | &inflight, | ||
| 418 | 0); | ||
| 419 | ✗ | if (ret != 0) | |
| 420 | ✗ | throw std::runtime_error( | |
| 421 | ✗ | "BatchReadPages: read failed: " + std::to_string(ret)); | |
| 422 | ✗ | inflight++; | |
| 423 | ✗ | submitted++; | |
| 424 | } | ||
| 425 | ✗ | spdk_nvme_qpair_process_completions(qpair, 0); | |
| 426 | } | ||
| 427 | } | ||
| 428 | }; | ||
| 429 | |||
| 430 | 10 | extern "C" void RecStoreForceLinkSpdkBackend() {} | |
| 431 | |||
| 432 | FACTORY_REGISTER(IOBackend, SPDK, SpdkBackend, const BaseKVConfig&); | ||
| 433 |