GCC Code Coverage Report


Directory: src/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 56.2% 113 / 0 / 201
Functions: 69.2% 18 / 0 / 26
Branches: 21.7% 82 / 0 / 378

storage/io_backend/iouring/io_uring_backend.cpp
Line Branch Exec Source
1 #include "../io_backend.h"
2 #include <cerrno>
3
4 thread_local int pending = 0;
5 thread_local std::vector<std::unique_ptr<coroutine<void>::pull_type>> coros;
6 #include <cstdlib>
7 #include <fcntl.h>
8 #include <liburing.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #include "base/factory.h"
12 #include "storage/kv_engine/base_kv.h"
13
14 class IoUringBackend : public IOBackend {
15 private:
16 std::string file_path;
17 int fd;
18
19 384406 static char* AllocateAligned(size_t bytes) {
20 384406 void* ptr = nullptr;
21 384406 int rc = posix_memalign(&ptr, PAGE_SIZE, bytes);
22
2/4
✓ Branch 0 taken 384406 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 384406 times.
384406 if (rc != 0 || ptr == nullptr) {
23 throw std::runtime_error(
24 "Failed to allocate aligned buffer, rc=" + std::to_string(rc));
25 }
26 384406 return reinterpret_cast<char*>(ptr);
27 }
28
29 struct ThreadQpair {
30 struct io_uring qpair;
31 bool initialized = false;
32 322 ThreadQpair() = default;
33 322 ~ThreadQpair() { release(); }
34
35 98433016 static ThreadQpair& instance() {
36
2/2
✓ Branch 0 taken 322 times.
✓ Branch 1 taken 98432694 times.
98433016 static thread_local ThreadQpair thread_qpair;
37 98433016 return thread_qpair;
38 }
39
40 98433016 struct io_uring* get(int queue_size) {
41
2/2
✓ Branch 0 taken 322 times.
✓ Branch 1 taken 98432694 times.
98433016 if (!initialized) {
42 322 int ret = io_uring_queue_init(queue_size, &qpair, 0);
43
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 322 times.
322 if (ret < 0)
44 throw std::runtime_error(
45 "Failed to initialize thread-local io_uring: " +
46 std::string(strerror(-ret)));
47 322 initialized = true;
48 }
49 98433016 return &qpair;
50 }
51
52 322 void release() {
53
1/2
✓ Branch 0 taken 322 times.
✗ Branch 1 not taken.
322 if (initialized) {
54 322 io_uring_queue_exit(&qpair);
55 322 initialized = false;
56 }
57 322 }
58 };
59 98433016 struct io_uring* get_thread_ring() {
60 98433016 ThreadQpair& thread_qpair = ThreadQpair::instance();
61 98433016 return thread_qpair.get(queue_cnt);
62 }
63
64 void ReadPageAsync(coroutine<void>::push_type& sink,
65 uint64_t index,
66 PageID_t page_id,
67 char* buffer) override {
68 struct io_uring* ring = get_thread_ring();
69 struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
70 CHECK_NE(sqe, nullptr) << "Failed to get SQE for read operation";
71 pending++;
72 io_uring_prep_read(sqe, fd, buffer, PAGE_SIZE, page_id * PAGE_SIZE);
73 sqe->user_data = index;
74 submit();
75 sink();
76 }
77 211204 void ReadPageSync(PageID_t page_id, char* buffer) override {
78
1/2
✓ Branch 1 taken 211204 times.
✗ Branch 2 not taken.
211204 struct io_uring* ring = get_thread_ring();
79
1/2
✓ Branch 1 taken 211204 times.
✗ Branch 2 not taken.
211204 struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
80
2/10
✓ Branch 3 taken 211204 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 211204 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
211204 CHECK_NE(sqe, nullptr) << "Failed to get SQE for sync read operation";
81 211204 io_uring_prep_read(sqe, fd, buffer, PAGE_SIZE, page_id * PAGE_SIZE);
82
1/2
✓ Branch 1 taken 211204 times.
✗ Branch 2 not taken.
211204 int ret = io_uring_submit_and_wait(ring, 1);
83
2/10
✓ Branch 3 taken 211204 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 211204 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
211204 CHECK_GE(ret, 0) << "Failed to submit sync read operation: " +
84 std::string(strerror(-ret));
85 211204 struct io_uring_cqe* cqe = nullptr;
86
1/2
✓ Branch 1 taken 211204 times.
✗ Branch 2 not taken.
211204 ret = io_uring_wait_cqe(ring, &cqe);
87
2/10
✓ Branch 3 taken 211204 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 211204 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
211204 CHECK_GE(ret, 0) << "Failed to wait CQE for sync read: " +
88 std::string(strerror(-ret));
89
2/8
✓ Branch 3 taken 211204 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 211204 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
211204 CHECK_NE(cqe, nullptr);
90
2/10
✓ Branch 3 taken 211204 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 211204 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
211204 CHECK_GE(cqe->res, 0) << "Sync read failed: " +
91 std::string(strerror(-cqe->res));
92
1/2
✓ Branch 1 taken 211204 times.
✗ Branch 2 not taken.
211204 io_uring_cqe_seen(ring, cqe);
93 211204 }
94
95 void WritePageAsync(coroutine<void>::push_type& sink,
96 uint64_t index,
97 PageID_t page_id,
98 char* buffer) override {
99 struct io_uring* ring = get_thread_ring();
100 struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
101 CHECK_NE(sqe, nullptr) << "Failed to get SQE for write operation";
102 pending++;
103 io_uring_prep_write(sqe, fd, buffer, PAGE_SIZE, page_id * PAGE_SIZE);
104 sqe->user_data = index;
105 submit();
106 sink();
107 }
108 void WritePageSync(PageID_t page_id, char* buffer) override {
109 struct io_uring* ring = get_thread_ring();
110 struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
111 CHECK_NE(sqe, nullptr) << "Failed to get SQE for sync write operation";
112 io_uring_prep_write(sqe, fd, buffer, PAGE_SIZE, page_id * PAGE_SIZE);
113 int ret = io_uring_submit_and_wait(ring, 1);
114 CHECK_GE(ret, 0) << "Failed to submit sync write operation: " +
115 std::string(strerror(-ret));
116 struct io_uring_cqe* cqe = nullptr;
117 ret = io_uring_wait_cqe(ring, &cqe);
118 CHECK_GE(ret, 0) << "Failed to wait CQE for sync write: " +
119 std::string(strerror(-ret));
120 CHECK_NE(cqe, nullptr);
121 CHECK_GE(cqe->res, 0) << "Sync write failed: " +
122 std::string(strerror(-cqe->res));
123 io_uring_cqe_seen(ring, cqe);
124 }
125
126 public:
127 632 IoUringBackend(const BaseKVConfig& config) : IOBackend(config), fd(-1) {
128
2/4
✓ Branch 1 taken 632 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 632 times.
✗ Branch 5 not taken.
632 file_path = config.json_config_.at("file_path").get<std::string>();
129 632 }
130 1264 ~IoUringBackend() {
131
1/2
✓ Branch 0 taken 632 times.
✗ Branch 1 not taken.
632 if (fd >= 0) {
132 632 close(fd);
133 632 fd = -1;
134 }
135
1/2
✓ Branch 0 taken 632 times.
✗ Branch 1 not taken.
632 if (empty_page) {
136 632 std::free(empty_page);
137 632 empty_page = nullptr;
138 }
139 1264 }
140
141 632 void init() override {
142 632 bool exists = (access(file_path.c_str(), F_OK) != -1);
143 632 fd =
144 632 open(file_path.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRUSR | S_IWUSR);
145
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 632 times.
632 if (fd < 0) {
146 const int err = errno;
147 throw std::runtime_error(
148 "Failed to open file: " + file_path + ", errno=" +
149 std::to_string(err) + ", detail=" + std::string(strerror(err)));
150 }
151
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 632 times.
632 if (exists) {
152 struct stat file_stat;
153 fstat(fd, &file_stat);
154 next_page_id = file_stat.st_size / PAGE_SIZE;
155 } else {
156 632 next_page_id = 1;
157 }
158 632 empty_page = AllocateAligned(PAGE_SIZE);
159 632 std::memset(empty_page, 0, PAGE_SIZE);
160 632 }
161
162 211204 char* AllocateBuffer() override { return AllocateAligned(PAGE_SIZE); }
163 172570 char* AllocateBuffer(uint64_t page_count) override {
164 172570 return AllocateAligned(PAGE_SIZE * page_count);
165 }
166 383774 void FreeBuffer(char* buf) override { std::free(buf); }
167
168 void* GetPage(coroutine<void>::push_type& sink,
169 uint64_t index,
170 PageID_t page_id) override {
171 char* buffer = AllocateBuffer();
172 ReadPageAsync(sink, index, page_id, buffer);
173 return buffer;
174 }
175 void* GetPage(PageID_t page_id) override {
176 char* buffer = AllocateBuffer();
177 ReadPageSync(page_id, buffer);
178 return buffer;
179 }
180
181 void Unpin(coroutine<void>::push_type& sink,
182 uint64_t index,
183 PageID_t page_id,
184 void* page_data,
185 bool is_dirty) override {
186 if (is_dirty)
187 WritePage(sink, index, page_id, reinterpret_cast<char*>(page_data));
188 FreeBuffer(reinterpret_cast<char*>(page_data));
189 }
190
191 void Unpin(PageID_t page_id, void* page_data, bool is_dirty) override {
192 if (is_dirty)
193 WritePageSync(page_id, reinterpret_cast<char*>(page_data));
194 FreeBuffer(reinterpret_cast<char*>(page_data));
195 }
196
197 void PollCompletion() override {
198 struct io_uring* ring = get_thread_ring();
199 struct io_uring_cqe* cqe;
200 int ret = io_uring_peek_cqe(ring, &cqe);
201 if (ret == -EAGAIN)
202 return; // No completion events
203 else if (ret < 0)
204 throw std::runtime_error(
205 "Failed to peek CQE: " + std::string(strerror(-ret)));
206 int id = cqe->user_data;
207 pending--;
208 io_uring_cqe_seen(ring, cqe);
209 if (coros[id] && *coros[id])
210 (*coros[id])();
211 return;
212 }
213
214 97950606 void submit() override {
215 97950606 struct io_uring* ring = get_thread_ring();
216 97950606 int ret = io_uring_submit(ring);
217
1/8
✗ Branch 3 not taken.
✓ Branch 4 taken 97950606 times.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✗ Branch 13 not taken.
✗ Branch 14 not taken.
97950606 CHECK_GE(ret, 0) << "Failed to submit io_uring operation: " +
218 std::string(strerror(-ret));
219 97950606 }
220
221 98636 void BatchWritePages(const std::vector<IOEntry>& entries) override {
222
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 98636 times.
98636 if (entries.empty())
223 return;
224 98636 struct io_uring* ring = get_thread_ring();
225 98636 int max_inflight = std::max(queue_cnt / 2, 1);
226 98636 size_t submitted = 0;
227 98636 int inflight = 0;
228
229
6/6
✓ Branch 1 taken 34427326 times.
✓ Branch 2 taken 98636 times.
✓ Branch 3 taken 34328690 times.
✓ Branch 4 taken 98636 times.
✓ Branch 5 taken 34427326 times.
✓ Branch 6 taken 98636 times.
34525962 while (submitted < entries.size() || inflight > 0) {
230
5/6
✓ Branch 1 taken 98636 times.
✓ Branch 2 taken 34427326 times.
✓ Branch 3 taken 98636 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 98636 times.
✓ Branch 6 taken 34427326 times.
34525962 while (submitted < entries.size() && inflight < max_inflight) {
231 98636 auto& e = entries[submitted];
232 98636 uint64_t total_bytes = e.page_count * PAGE_SIZE;
233
1/2
✓ Branch 1 taken 98636 times.
✗ Branch 2 not taken.
98636 struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
234
2/10
✓ Branch 3 taken 98636 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 98636 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
98636 CHECK_NE(sqe, nullptr) << "Failed to get SQE for batch write";
235 98636 io_uring_prep_write(
236 98636 sqe, fd, e.buffer, total_bytes, e.page_id * PAGE_SIZE);
237 98636 sqe->user_data = 0;
238 98636 inflight++;
239 98636 submitted++;
240 }
241
1/2
✓ Branch 1 taken 34427326 times.
✗ Branch 2 not taken.
34427326 submit();
242 struct io_uring_cqe* cqe;
243
7/8
✓ Branch 0 taken 34427326 times.
✓ Branch 1 taken 98636 times.
✓ Branch 3 taken 34427326 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 98636 times.
✓ Branch 6 taken 34328690 times.
✓ Branch 7 taken 98636 times.
✓ Branch 8 taken 34427326 times.
34525962 while (inflight > 0 && io_uring_peek_cqe(ring, &cqe) == 0) {
244
2/8
✓ Branch 3 taken 98636 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 98636 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
98636 CHECK_GE(cqe->res, 0)
245 << "Batch write failed: " + std::string(strerror(-cqe->res));
246
1/2
✓ Branch 1 taken 98636 times.
✗ Branch 2 not taken.
98636 io_uring_cqe_seen(ring, cqe);
247 98636 inflight--;
248 }
249 }
250 }
251
252 172570 void BatchReadPages(const std::vector<IOEntry>& entries) override {
253
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 172570 times.
172570 if (entries.empty())
254 return;
255 172570 struct io_uring* ring = get_thread_ring();
256 172570 int max_inflight = std::max(queue_cnt / 2, 1);
257 172570 size_t submitted = 0;
258 172570 int inflight = 0;
259
260
6/6
✓ Branch 1 taken 63523280 times.
✓ Branch 2 taken 172570 times.
✓ Branch 3 taken 63350710 times.
✓ Branch 4 taken 172570 times.
✓ Branch 5 taken 63523280 times.
✓ Branch 6 taken 172570 times.
63695850 while (submitted < entries.size() || inflight > 0) {
261
5/6
✓ Branch 1 taken 172570 times.
✓ Branch 2 taken 63523280 times.
✓ Branch 3 taken 172570 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 172570 times.
✓ Branch 6 taken 63523280 times.
63695850 while (submitted < entries.size() && inflight < max_inflight) {
262 172570 auto& e = entries[submitted];
263 172570 uint64_t total_bytes = e.page_count * PAGE_SIZE;
264
1/2
✓ Branch 1 taken 172570 times.
✗ Branch 2 not taken.
172570 struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
265
2/10
✓ Branch 3 taken 172570 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 172570 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
172570 CHECK_NE(sqe, nullptr) << "Failed to get SQE for batch read";
266 172570 io_uring_prep_read(
267 172570 sqe, fd, e.buffer, total_bytes, e.page_id * PAGE_SIZE);
268 172570 sqe->user_data = 0;
269 172570 inflight++;
270 172570 submitted++;
271 }
272
1/2
✓ Branch 1 taken 63523280 times.
✗ Branch 2 not taken.
63523280 submit();
273 struct io_uring_cqe* cqe;
274
7/8
✓ Branch 0 taken 63523280 times.
✓ Branch 1 taken 172570 times.
✓ Branch 3 taken 63523280 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 172570 times.
✓ Branch 6 taken 63350710 times.
✓ Branch 7 taken 172570 times.
✓ Branch 8 taken 63523280 times.
63695850 while (inflight > 0 && io_uring_peek_cqe(ring, &cqe) == 0) {
275
2/8
✓ Branch 3 taken 172570 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 172570 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
172570 CHECK_GE(cqe->res, 0)
276 << "Batch read failed: " + std::string(strerror(-cqe->res));
277
1/2
✓ Branch 1 taken 172570 times.
✗ Branch 2 not taken.
172570 io_uring_cqe_seen(ring, cqe);
278 172570 inflight--;
279 }
280 }
281 }
282 };
283
284 10 extern "C" void RecStoreForceLinkIoUringBackend() {}
285
286 FACTORY_REGISTER(IOBackend, IOURING, IoUringBackend, const BaseKVConfig&);
287