GCC Code Coverage Report


Directory: src/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 57.1% 20 / 0 / 35
Functions: 55.6% 5 / 0 / 9
Branches: 50.0% 14 / 0 / 28

storage/io_backend/io_backend.h
Line Branch Exec Source
1 #pragma once
2
3 #include <atomic>
4 #include <boost/coroutine2/all.hpp>
5 #include <cstdint>
6 #include <cstdio>
7 #include <cstring>
8 #include <glog/logging.h>
9 #include <sys/user.h>
10 #include <vector>
11 #include "../index/index.h"
12
13 using boost::coroutines2::coroutine;
14 extern thread_local int pending;
15 extern thread_local std::vector<std::unique_ptr<coroutine<void>::pull_type>>
16 coros;
17
18 typedef uint64_t PageID_t;
19 const PageID_t INVALID_PAGE = -1;
20
21 class IOBackend {
22 public:
23 658 IOBackend(const BaseKVConfig& config) {
24
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 658 times.
658 if (!config.json_config_.contains("page_id_offset"))
25 LOG(WARNING) << "IOBackend config missing 'page_id_offset'";
26
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 658 times.
658 if (!config.json_config_.contains("queue_cnt"))
27 LOG(WARNING) << "IOBackend config missing 'queue_cnt'";
28
1/2
✓ Branch 1 taken 658 times.
✗ Branch 2 not taken.
658 next_page_id.store(config.json_config_.value("page_id_offset", (PageID_t)1),
29 std::memory_order_relaxed);
30
1/2
✓ Branch 1 taken 658 times.
✗ Branch 2 not taken.
658 queue_cnt = config.json_config_.value("queue_cnt", 512);
31 658 }
32 658 virtual ~IOBackend() {}
33 virtual void init() = 0;
34
35 PageID_t AllocatePage(coroutine<void>::push_type& sink, uint64_t index) {
36 PageID_t new_page_id = next_page_id.fetch_add(1, std::memory_order_relaxed);
37 WritePageAsync(sink, index, new_page_id, empty_page);
38 return new_page_id;
39 }
40 PageID_t AllocatePage() {
41 PageID_t new_page_id = next_page_id.fetch_add(1, std::memory_order_relaxed);
42 WritePageSync(new_page_id, empty_page);
43 return new_page_id;
44 }
45 PageID_t GetNextPageID() const {
46 return next_page_id.load(std::memory_order_relaxed);
47 }
48 void SetNextPageID(PageID_t page_id) {
49 next_page_id.store(page_id, std::memory_order_relaxed);
50 }
51
52 void ReadPage(coroutine<void>::push_type& sink,
53 uint64_t index,
54 PageID_t page_id,
55 char* buffer) {
56 ReadPageAsync(sink, index, page_id, buffer);
57 }
58 211212 void ReadPage(PageID_t page_id, char* buffer) {
59 211212 ReadPageSync(page_id, buffer);
60 211212 }
61
62 void WritePage(coroutine<void>::push_type& sink,
63 uint64_t index,
64 PageID_t page_id,
65 char* buffer) {
66 WritePageAsync(sink, index, page_id, buffer);
67 }
68 void WritePage(PageID_t page_id, char* buffer) {
69 WritePageSync(page_id, buffer);
70 }
71
72 virtual void* GetPage(
73 coroutine<void>::push_type& sink, uint64_t index, PageID_t page_id) = 0;
74 virtual void* GetPage(PageID_t page_id) = 0;
75
76 // Unpin a page, if dirty, write it back
77 virtual void
78 Unpin(coroutine<void>::push_type& sink,
79 uint64_t index,
80 PageID_t page_id,
81 void* page_data,
82 bool is_dirty) = 0;
83 virtual void Unpin(PageID_t page_id, void* page_data, bool is_dirty) = 0;
84
85 virtual char* AllocateBuffer() = 0;
86 // Allocate a contiguous buffer of page_count pages (zero-filled).
87 virtual char* AllocateBuffer(uint64_t page_count) = 0;
88 virtual void FreeBuffer(char* buf) = 0;
89
90 virtual void submit() = 0;
91 virtual void PollCompletion() = 0;
92
93 struct IOEntry {
94 PageID_t page_id; // starting page
95 char* buffer; // contiguous buffer
96 uint64_t page_count; // number of pages in this buffer
97 };
98
99 // Batch write using sliding-window async IO (no coroutines).
100 // Buffers are NOT freed by this method; caller is responsible.
101 10 virtual void BatchWritePages(const std::vector<IOEntry>& entries) {
102
2/2
✓ Branch 5 taken 10 times.
✓ Branch 6 taken 10 times.
20 for (auto& e : entries) {
103
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 10 times.
26 for (uint64_t i = 0; i < e.page_count; i++) {
104
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 WritePageSync(e.page_id + i, e.buffer + i * PAGE_SIZE);
105 }
106 }
107 10 }
108
109 // Batch read using sliding-window async IO (no coroutines).
110 // Buffers are NOT freed by this method; caller is responsible.
111 20 virtual void BatchReadPages(const std::vector<IOEntry>& entries) {
112
2/2
✓ Branch 5 taken 20 times.
✓ Branch 6 taken 20 times.
40 for (auto& e : entries) {
113
2/2
✓ Branch 0 taken 32 times.
✓ Branch 1 taken 20 times.
52 for (uint64_t i = 0; i < e.page_count; i++) {
114
1/2
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
32 ReadPageSync(e.page_id + i, e.buffer + i * PAGE_SIZE);
115 }
116 }
117 20 }
118
119 protected:
120 std::atomic<PageID_t> next_page_id{1};
121 char* empty_page = nullptr;
122 int queue_cnt = 512;
123
124 virtual void ReadPageAsync(coroutine<void>::push_type& sink,
125 uint64_t index,
126 PageID_t page_id,
127 char* buffer) = 0;
128 virtual void ReadPageSync(PageID_t page_id, char* buffer) = 0;
129
130 virtual void WritePageAsync(coroutine<void>::push_type& sink,
131 uint64_t index,
132 PageID_t page_id,
133 char* buffer) = 0;
134 virtual void WritePageSync(PageID_t page_id, char* buffer) = 0;
135 };
136