GCC Code Coverage Report


Directory: src/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 40.5% 79 / 0 / 195
Functions: 59.1% 13 / 0 / 22
Branches: 25.0% 78 / 0 / 312

storage/value_store/hybrid_value_store.h
Line Branch Exec Source
1 #pragma once
2
3 #include <atomic>
4 #include <iomanip>
5 #include <memory>
6 #include <sstream>
7 #include <stdexcept>
8
9 #include "base/factory.h"
10 #include "storage/value_store/dram_value_store.h"
11 #include "storage/value_store/ssd_value_store.h"
12
13 class HybridValueStore : public ValueStore {
14 public:
15 316 explicit HybridValueStore(const BaseKVConfig& config)
16
1/2
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
316 : dram_store_(BuildDramConfig(config)),
17
3/6
✓ Branch 2 taken 316 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 316 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 316 times.
✗ Branch 9 not taken.
632 ssd_store_(BuildSsdConfig(config)) {
18
1/2
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
316 const auto& v = config.json_config_.at("value");
19
1/2
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
316 const auto& dram = v.at("dram_allocator");
20
2/4
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
316 dram_capacity_bytes_ = dram.at("capacity_bytes").get<uint64_t>();
21 const auto& tiering =
22
4/10
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 316 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 316 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 316 times.
✗ Branch 10 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
316 v.contains("tiering") ? v.at("tiering") : json::object();
23
1/2
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
316 high_watermark_ratio_ = tiering.value("high_watermark_ratio", 0.85);
24 316 }
25
26 98608 uint64_t Alloc(size_t size) override {
27
1/2
✓ Branch 1 taken 98608 times.
✗ Branch 2 not taken.
98608 if (ShouldUseDram(size)) {
28 98608 const uint64_t raw = dram_store_.Alloc(size);
29
1/2
✓ Branch 0 taken 98608 times.
✗ Branch 1 not taken.
98608 if (raw != kValueHandleNone) {
30 98608 dram_bytes_reserved_.fetch_add(
31 dram_store_.SlotCapacity(raw), std::memory_order_relaxed);
32 98608 dram_live_allocs_.fetch_add(1, std::memory_order_relaxed);
33 98608 dram_total_allocs_.fetch_add(1, std::memory_order_relaxed);
34 98608 return MakeDramHandle(raw);
35 }
36 }
37 const uint64_t raw = ssd_store_.Alloc(size);
38 if (raw == kValueHandleNone) {
39 return kValueHandleNone;
40 }
41 ssd_live_allocs_.fetch_add(1, std::memory_order_relaxed);
42 ssd_total_allocs_.fetch_add(1, std::memory_order_relaxed);
43 return MakeSsdHandle(raw);
44 }
45
46 98608 void Write(uint64_t handle, const void* data, size_t size) override {
47
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98608 times.
98608 if (handle == kValueHandleNone) {
48 return;
49 }
50
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 98608 times.
98608 if (IsOnSSD(handle)) {
51 ssd_store_.Write(SsdRawHandle(handle), data, size);
52 return;
53 }
54 98608 dram_store_.Write(DramRawHandle(handle), data, size);
55 }
56
57 98608 uint64_t AllocAndWrite(const void* data, size_t size) override {
58 98608 const uint64_t handle = Alloc(size);
59
1/2
✓ Branch 0 taken 98608 times.
✗ Branch 1 not taken.
98608 if (handle != kValueHandleNone) {
60 98608 Write(handle, data, size);
61 }
62 98608 return handle;
63 }
64
65 size_t Read(uint64_t handle, void* out_buf, size_t buf_size) override {
66 if (handle == kValueHandleNone) {
67 return 0;
68 }
69 if (IsOnSSD(handle)) {
70 return ssd_store_.Read(SsdRawHandle(handle), out_buf, buf_size);
71 }
72 return dram_store_.Read(DramRawHandle(handle), out_buf, buf_size);
73 }
74
75 41788 void Free(uint64_t handle) override {
76
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41788 times.
41788 if (handle == kValueHandleNone) {
77 return;
78 }
79
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 41788 times.
41788 if (IsOnSSD(handle)) {
80 ssd_store_.Free(SsdRawHandle(handle));
81 ssd_live_allocs_.fetch_sub(1, std::memory_order_relaxed);
82 return;
83 }
84 41788 const uint64_t raw = DramRawHandle(handle);
85
1/2
✓ Branch 1 taken 41788 times.
✗ Branch 2 not taken.
41788 const size_t cap = dram_store_.SlotCapacity(raw);
86
1/2
✓ Branch 1 taken 41788 times.
✗ Branch 2 not taken.
41788 dram_store_.Free(raw);
87 41788 dram_live_allocs_.fetch_sub(1, std::memory_order_relaxed);
88 41788 uint64_t cur = dram_bytes_reserved_.load(std::memory_order_relaxed);
89
3/4
✓ Branch 0 taken 43458 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1670 times.
✓ Branch 3 taken 41788 times.
85246 while (cur != 0 &&
90
2/2
✓ Branch 0 taken 1670 times.
✓ Branch 1 taken 41788 times.
43458 !dram_bytes_reserved_.compare_exchange_weak(
91
1/2
✓ Branch 0 taken 43458 times.
✗ Branch 1 not taken.
43458 cur, cur > cap ? cur - cap : 0, std::memory_order_relaxed)) {
92 }
93 }
94
95 73814 const char* DirectPtr(uint64_t handle) const override {
96
3/6
✓ Branch 0 taken 73814 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 73814 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 73814 times.
73814 if (handle == kValueHandleNone || IsOnSSD(handle)) {
97 return nullptr;
98 }
99 73814 return dram_store_.DirectPtr(DramRawHandle(handle));
100 }
101
102 char* RDMABackingData() const override {
103 return dram_store_.RDMABackingData();
104 }
105
106 size_t RDMABackingSize() const override {
107 return dram_store_.RDMABackingSize();
108 }
109
110 73814 size_t SlotCapacity(uint64_t handle) const override {
111
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 73814 times.
73814 if (handle == kValueHandleNone) {
112 return 0;
113 }
114
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 73814 times.
73814 if (IsOnSSD(handle)) {
115 return ssd_store_.SlotCapacity(SsdRawHandle(handle));
116 }
117 73814 return dram_store_.SlotCapacity(DramRawHandle(handle));
118 }
119
120 void BatchWrite(const std::vector<uint64_t>& handles,
121 const std::vector<WriteSpec>& specs) override {
122 if (handles.size() != specs.size()) {
123 throw std::invalid_argument("HybridValueStore::BatchWrite size mismatch");
124 }
125 std::vector<uint64_t> dram_handles;
126 std::vector<WriteSpec> dram_specs;
127 std::vector<uint64_t> ssd_handles;
128 std::vector<WriteSpec> ssd_specs;
129 dram_handles.reserve(handles.size());
130 dram_specs.reserve(handles.size());
131 ssd_handles.reserve(handles.size());
132 ssd_specs.reserve(handles.size());
133 for (size_t i = 0; i < handles.size(); ++i) {
134 if (handles[i] == kValueHandleNone) {
135 continue;
136 }
137 if (IsOnSSD(handles[i])) {
138 ssd_handles.push_back(SsdRawHandle(handles[i]));
139 ssd_specs.push_back(specs[i]);
140 } else {
141 dram_handles.push_back(DramRawHandle(handles[i]));
142 dram_specs.push_back(specs[i]);
143 }
144 }
145 if (!dram_handles.empty()) {
146 dram_store_.BatchWrite(dram_handles, dram_specs);
147 }
148 if (!ssd_handles.empty()) {
149 ssd_store_.BatchWrite(ssd_handles, ssd_specs);
150 }
151 }
152
153 void BatchRead(const std::vector<uint64_t>& handles,
154 std::vector<ReadResult>& out_results) override {
155 out_results.clear();
156 out_results.resize(handles.size());
157
158 std::vector<uint64_t> ssd_handles;
159 std::vector<size_t> ssd_indices;
160 ssd_handles.reserve(handles.size());
161 ssd_indices.reserve(handles.size());
162
163 for (size_t i = 0; i < handles.size(); ++i) {
164 const uint64_t handle = handles[i];
165 if (handle == kValueHandleNone) {
166 continue;
167 }
168 if (IsOnSSD(handle)) {
169 ssd_handles.push_back(SsdRawHandle(handle));
170 ssd_indices.push_back(i);
171 continue;
172 }
173
174 const uint64_t raw = DramRawHandle(handle);
175 out_results[i].data.resize(dram_store_.SlotCapacity(raw));
176 const size_t actual = dram_store_.Read(
177 raw, out_results[i].data.data(), out_results[i].data.size());
178 out_results[i].data.resize(actual);
179 }
180
181 if (ssd_handles.empty()) {
182 return;
183 }
184
185 std::vector<ReadResult> ssd_results;
186 ssd_store_.BatchRead(ssd_handles, ssd_results);
187 if (ssd_results.size() != ssd_indices.size()) {
188 throw std::runtime_error(
189 "HybridValueStore::BatchRead result size mismatch");
190 }
191 for (size_t i = 0; i < ssd_indices.size(); ++i) {
192 out_results[ssd_indices[i]] = std::move(ssd_results[i]);
193 }
194 }
195
196 std::string GetInfo() const override {
197 std::ostringstream os;
198 os << "HybridValueStore(dram_reserved="
199 << dram_bytes_reserved_.load(std::memory_order_relaxed)
200 << ", dram_capacity=" << dram_capacity_bytes_ << ", dram="
201 << dram_store_.GetInfo() << ", ssd=" << ssd_store_.GetInfo() << ")";
202 return os.str();
203 }
204
205 std::string ExtraResultFields() const override {
206 const uint64_t dram_live =
207 dram_live_allocs_.load(std::memory_order_relaxed);
208 const uint64_t ssd_live = ssd_live_allocs_.load(std::memory_order_relaxed);
209 const uint64_t live = dram_live + ssd_live;
210 const uint64_t dram_total =
211 dram_total_allocs_.load(std::memory_order_relaxed);
212 const uint64_t ssd_total =
213 ssd_total_allocs_.load(std::memory_order_relaxed);
214 const uint64_t total = dram_total + ssd_total;
215 const double live_ratio =
216 live == 0 ? 0.0 : static_cast<double>(ssd_live) / live;
217 const double total_ratio =
218 total == 0 ? 0.0 : static_cast<double>(ssd_total) / total;
219 std::ostringstream os;
220 os << std::fixed << std::setprecision(6)
221 << " tiered_dram_capacity_bytes=" << dram_capacity_bytes_
222 << " tiered_high_watermark_ratio=" << high_watermark_ratio_
223 << " tiered_dram_live_allocs=" << dram_live << " tiered_ssd_live_allocs="
224 << ssd_live << " tiered_live_allocs=" << live
225 << " tiered_ssd_live_ratio=" << live_ratio
226 << " tiered_dram_total_allocs=" << dram_total
227 << " tiered_ssd_total_allocs=" << ssd_total << " tiered_total_allocs="
228 << total << " tiered_ssd_total_ratio=" << total_ratio;
229 return os.str();
230 }
231
232 private:
233 static constexpr uint64_t kSsdFlag = 1ULL << 63;
234
235 288024 static bool IsOnSSD(uint64_t handle) { return (handle & kSsdFlag) != 0; }
236
237 98608 static uint64_t MakeDramHandle(uint64_t raw) { return raw; }
238
239 static uint64_t MakeSsdHandle(uint64_t raw) { return raw | kSsdFlag; }
240
241 288024 static uint64_t DramRawHandle(uint64_t handle) { return handle; }
242
243 static uint64_t SsdRawHandle(uint64_t handle) { return handle & ~kSsdFlag; }
244
245 316 static BaseKVConfig BuildDramConfig(const BaseKVConfig& config) {
246
1/2
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
316 BaseKVConfig out = config;
247 316 auto& j = out.json_config_;
248
6/12
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 316 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 316 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 316 times.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✓ Branch 12 taken 316 times.
✗ Branch 13 not taken.
✓ Branch 14 taken 316 times.
316 if (!j.contains("value") || !j.at("value").contains("dram_allocator")) {
249 return out;
250 }
251
2/4
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
316 json value = j.at("value");
252
2/4
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
316 value["type"] = "DRAM_VALUE_STORE";
253
3/6
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 316 times.
✗ Branch 7 not taken.
316 if (value.at("dram_allocator").contains("path")) {
254
4/8
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 316 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 316 times.
✗ Branch 11 not taken.
316 value["path"] = value.at("dram_allocator").at("path");
255 }
256
1/2
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
316 value.erase("ssd_allocator");
257
1/2
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
316 value.erase("tiering");
258
2/4
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
316 j["value"] = value;
259 316 return out;
260 316 }
261
262 316 static BaseKVConfig BuildSsdConfig(const BaseKVConfig& config) {
263
1/2
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
316 BaseKVConfig out = config;
264 316 auto& j = out.json_config_;
265
6/12
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 316 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 316 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 316 times.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✓ Branch 12 taken 316 times.
✗ Branch 13 not taken.
✓ Branch 14 taken 316 times.
316 if (!j.contains("value") || !j.at("value").contains("ssd_allocator")) {
266 return out;
267 }
268
2/4
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
316 json value = j.at("value");
269
2/4
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
316 value["type"] = "SSD_VALUE_STORE";
270
3/6
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 316 times.
✗ Branch 7 not taken.
316 if (value.at("ssd_allocator").contains("path")) {
271
4/8
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 316 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 316 times.
✗ Branch 11 not taken.
316 value["path"] = value.at("ssd_allocator").at("path");
272 }
273
1/2
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
316 value.erase("dram_allocator");
274
1/2
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
316 value.erase("tiering");
275
2/4
✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
316 j["value"] = value;
276 316 return out;
277 316 }
278
279 98608 bool ShouldUseDram(size_t size) const {
280
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98608 times.
98608 if (dram_capacity_bytes_ == 0) {
281 return false;
282 }
283 const double next =
284 98608 static_cast<double>(
285 98608 dram_bytes_reserved_.load(std::memory_order_relaxed) + size) /
286 98608 static_cast<double>(dram_capacity_bytes_);
287 98608 return next <= high_watermark_ratio_;
288 }
289
290 DramValueStore dram_store_;
291 SsdValueStore ssd_store_;
292 uint64_t dram_capacity_bytes_ = 0;
293 double high_watermark_ratio_ = 0.85;
294 std::atomic<uint64_t> dram_bytes_reserved_{0};
295 std::atomic<uint64_t> dram_live_allocs_{0};
296 std::atomic<uint64_t> ssd_live_allocs_{0};
297 std::atomic<uint64_t> dram_total_allocs_{0};
298 std::atomic<uint64_t> ssd_total_allocs_{0};
299 };
300
301 FACTORY_REGISTER(
302 ValueStore, TIERED_VALUE_STORE, HybridValueStore, const BaseKVConfig&);
303