storage/value_store/hybrid_value_store.h
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | #pragma once | ||
| 2 | |||
| 3 | #include <atomic> | ||
| 4 | #include <iomanip> | ||
| 5 | #include <memory> | ||
| 6 | #include <sstream> | ||
| 7 | #include <stdexcept> | ||
| 8 | |||
| 9 | #include "base/factory.h" | ||
| 10 | #include "storage/value_store/dram_value_store.h" | ||
| 11 | #include "storage/value_store/ssd_value_store.h" | ||
| 12 | |||
| 13 | class HybridValueStore : public ValueStore { | ||
| 14 | public: | ||
| 15 | 316 | explicit HybridValueStore(const BaseKVConfig& config) | |
| 16 |
1/2✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
|
316 | : dram_store_(BuildDramConfig(config)), |
| 17 |
3/6✓ Branch 2 taken 316 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 316 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 316 times.
✗ Branch 9 not taken.
|
632 | ssd_store_(BuildSsdConfig(config)) { |
| 18 |
1/2✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
|
316 | const auto& v = config.json_config_.at("value"); |
| 19 |
1/2✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
|
316 | const auto& dram = v.at("dram_allocator"); |
| 20 |
2/4✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
|
316 | dram_capacity_bytes_ = dram.at("capacity_bytes").get<uint64_t>(); |
| 21 | const auto& tiering = | ||
| 22 |
4/10✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 316 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 316 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 316 times.
✗ Branch 10 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
|
316 | v.contains("tiering") ? v.at("tiering") : json::object(); |
| 23 |
1/2✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
|
316 | high_watermark_ratio_ = tiering.value("high_watermark_ratio", 0.85); |
| 24 | 316 | } | |
| 25 | |||
| 26 | 98608 | uint64_t Alloc(size_t size) override { | |
| 27 |
1/2✓ Branch 1 taken 98608 times.
✗ Branch 2 not taken.
|
98608 | if (ShouldUseDram(size)) { |
| 28 | 98608 | const uint64_t raw = dram_store_.Alloc(size); | |
| 29 |
1/2✓ Branch 0 taken 98608 times.
✗ Branch 1 not taken.
|
98608 | if (raw != kValueHandleNone) { |
| 30 | 98608 | dram_bytes_reserved_.fetch_add( | |
| 31 | dram_store_.SlotCapacity(raw), std::memory_order_relaxed); | ||
| 32 | 98608 | dram_live_allocs_.fetch_add(1, std::memory_order_relaxed); | |
| 33 | 98608 | dram_total_allocs_.fetch_add(1, std::memory_order_relaxed); | |
| 34 | 98608 | return MakeDramHandle(raw); | |
| 35 | } | ||
| 36 | } | ||
| 37 | ✗ | const uint64_t raw = ssd_store_.Alloc(size); | |
| 38 | ✗ | if (raw == kValueHandleNone) { | |
| 39 | ✗ | return kValueHandleNone; | |
| 40 | } | ||
| 41 | ✗ | ssd_live_allocs_.fetch_add(1, std::memory_order_relaxed); | |
| 42 | ✗ | ssd_total_allocs_.fetch_add(1, std::memory_order_relaxed); | |
| 43 | ✗ | return MakeSsdHandle(raw); | |
| 44 | } | ||
| 45 | |||
| 46 | 98608 | void Write(uint64_t handle, const void* data, size_t size) override { | |
| 47 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98608 times.
|
98608 | if (handle == kValueHandleNone) { |
| 48 | ✗ | return; | |
| 49 | } | ||
| 50 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 98608 times.
|
98608 | if (IsOnSSD(handle)) { |
| 51 | ✗ | ssd_store_.Write(SsdRawHandle(handle), data, size); | |
| 52 | ✗ | return; | |
| 53 | } | ||
| 54 | 98608 | dram_store_.Write(DramRawHandle(handle), data, size); | |
| 55 | } | ||
| 56 | |||
| 57 | 98608 | uint64_t AllocAndWrite(const void* data, size_t size) override { | |
| 58 | 98608 | const uint64_t handle = Alloc(size); | |
| 59 |
1/2✓ Branch 0 taken 98608 times.
✗ Branch 1 not taken.
|
98608 | if (handle != kValueHandleNone) { |
| 60 | 98608 | Write(handle, data, size); | |
| 61 | } | ||
| 62 | 98608 | return handle; | |
| 63 | } | ||
| 64 | |||
| 65 | ✗ | size_t Read(uint64_t handle, void* out_buf, size_t buf_size) override { | |
| 66 | ✗ | if (handle == kValueHandleNone) { | |
| 67 | ✗ | return 0; | |
| 68 | } | ||
| 69 | ✗ | if (IsOnSSD(handle)) { | |
| 70 | ✗ | return ssd_store_.Read(SsdRawHandle(handle), out_buf, buf_size); | |
| 71 | } | ||
| 72 | ✗ | return dram_store_.Read(DramRawHandle(handle), out_buf, buf_size); | |
| 73 | } | ||
| 74 | |||
| 75 | 41788 | void Free(uint64_t handle) override { | |
| 76 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 41788 times.
|
41788 | if (handle == kValueHandleNone) { |
| 77 | ✗ | return; | |
| 78 | } | ||
| 79 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 41788 times.
|
41788 | if (IsOnSSD(handle)) { |
| 80 | ✗ | ssd_store_.Free(SsdRawHandle(handle)); | |
| 81 | ✗ | ssd_live_allocs_.fetch_sub(1, std::memory_order_relaxed); | |
| 82 | ✗ | return; | |
| 83 | } | ||
| 84 | 41788 | const uint64_t raw = DramRawHandle(handle); | |
| 85 |
1/2✓ Branch 1 taken 41788 times.
✗ Branch 2 not taken.
|
41788 | const size_t cap = dram_store_.SlotCapacity(raw); |
| 86 |
1/2✓ Branch 1 taken 41788 times.
✗ Branch 2 not taken.
|
41788 | dram_store_.Free(raw); |
| 87 | 41788 | dram_live_allocs_.fetch_sub(1, std::memory_order_relaxed); | |
| 88 | 41788 | uint64_t cur = dram_bytes_reserved_.load(std::memory_order_relaxed); | |
| 89 |
3/4✓ Branch 0 taken 43458 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1670 times.
✓ Branch 3 taken 41788 times.
|
85246 | while (cur != 0 && |
| 90 |
2/2✓ Branch 0 taken 1670 times.
✓ Branch 1 taken 41788 times.
|
43458 | !dram_bytes_reserved_.compare_exchange_weak( |
| 91 |
1/2✓ Branch 0 taken 43458 times.
✗ Branch 1 not taken.
|
43458 | cur, cur > cap ? cur - cap : 0, std::memory_order_relaxed)) { |
| 92 | } | ||
| 93 | } | ||
| 94 | |||
| 95 | 73814 | const char* DirectPtr(uint64_t handle) const override { | |
| 96 |
3/6✓ Branch 0 taken 73814 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 73814 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 73814 times.
|
73814 | if (handle == kValueHandleNone || IsOnSSD(handle)) { |
| 97 | ✗ | return nullptr; | |
| 98 | } | ||
| 99 | 73814 | return dram_store_.DirectPtr(DramRawHandle(handle)); | |
| 100 | } | ||
| 101 | |||
| 102 | ✗ | char* RDMABackingData() const override { | |
| 103 | ✗ | return dram_store_.RDMABackingData(); | |
| 104 | } | ||
| 105 | |||
| 106 | ✗ | size_t RDMABackingSize() const override { | |
| 107 | ✗ | return dram_store_.RDMABackingSize(); | |
| 108 | } | ||
| 109 | |||
| 110 | 73814 | size_t SlotCapacity(uint64_t handle) const override { | |
| 111 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 73814 times.
|
73814 | if (handle == kValueHandleNone) { |
| 112 | ✗ | return 0; | |
| 113 | } | ||
| 114 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 73814 times.
|
73814 | if (IsOnSSD(handle)) { |
| 115 | ✗ | return ssd_store_.SlotCapacity(SsdRawHandle(handle)); | |
| 116 | } | ||
| 117 | 73814 | return dram_store_.SlotCapacity(DramRawHandle(handle)); | |
| 118 | } | ||
| 119 | |||
| 120 | ✗ | void BatchWrite(const std::vector<uint64_t>& handles, | |
| 121 | const std::vector<WriteSpec>& specs) override { | ||
| 122 | ✗ | if (handles.size() != specs.size()) { | |
| 123 | ✗ | throw std::invalid_argument("HybridValueStore::BatchWrite size mismatch"); | |
| 124 | } | ||
| 125 | ✗ | std::vector<uint64_t> dram_handles; | |
| 126 | ✗ | std::vector<WriteSpec> dram_specs; | |
| 127 | ✗ | std::vector<uint64_t> ssd_handles; | |
| 128 | ✗ | std::vector<WriteSpec> ssd_specs; | |
| 129 | ✗ | dram_handles.reserve(handles.size()); | |
| 130 | ✗ | dram_specs.reserve(handles.size()); | |
| 131 | ✗ | ssd_handles.reserve(handles.size()); | |
| 132 | ✗ | ssd_specs.reserve(handles.size()); | |
| 133 | ✗ | for (size_t i = 0; i < handles.size(); ++i) { | |
| 134 | ✗ | if (handles[i] == kValueHandleNone) { | |
| 135 | ✗ | continue; | |
| 136 | } | ||
| 137 | ✗ | if (IsOnSSD(handles[i])) { | |
| 138 | ✗ | ssd_handles.push_back(SsdRawHandle(handles[i])); | |
| 139 | ✗ | ssd_specs.push_back(specs[i]); | |
| 140 | } else { | ||
| 141 | ✗ | dram_handles.push_back(DramRawHandle(handles[i])); | |
| 142 | ✗ | dram_specs.push_back(specs[i]); | |
| 143 | } | ||
| 144 | } | ||
| 145 | ✗ | if (!dram_handles.empty()) { | |
| 146 | ✗ | dram_store_.BatchWrite(dram_handles, dram_specs); | |
| 147 | } | ||
| 148 | ✗ | if (!ssd_handles.empty()) { | |
| 149 | ✗ | ssd_store_.BatchWrite(ssd_handles, ssd_specs); | |
| 150 | } | ||
| 151 | ✗ | } | |
| 152 | |||
| 153 | ✗ | void BatchRead(const std::vector<uint64_t>& handles, | |
| 154 | std::vector<ReadResult>& out_results) override { | ||
| 155 | ✗ | out_results.clear(); | |
| 156 | ✗ | out_results.resize(handles.size()); | |
| 157 | |||
| 158 | ✗ | std::vector<uint64_t> ssd_handles; | |
| 159 | ✗ | std::vector<size_t> ssd_indices; | |
| 160 | ✗ | ssd_handles.reserve(handles.size()); | |
| 161 | ✗ | ssd_indices.reserve(handles.size()); | |
| 162 | |||
| 163 | ✗ | for (size_t i = 0; i < handles.size(); ++i) { | |
| 164 | ✗ | const uint64_t handle = handles[i]; | |
| 165 | ✗ | if (handle == kValueHandleNone) { | |
| 166 | ✗ | continue; | |
| 167 | } | ||
| 168 | ✗ | if (IsOnSSD(handle)) { | |
| 169 | ✗ | ssd_handles.push_back(SsdRawHandle(handle)); | |
| 170 | ✗ | ssd_indices.push_back(i); | |
| 171 | ✗ | continue; | |
| 172 | } | ||
| 173 | |||
| 174 | ✗ | const uint64_t raw = DramRawHandle(handle); | |
| 175 | ✗ | out_results[i].data.resize(dram_store_.SlotCapacity(raw)); | |
| 176 | ✗ | const size_t actual = dram_store_.Read( | |
| 177 | ✗ | raw, out_results[i].data.data(), out_results[i].data.size()); | |
| 178 | ✗ | out_results[i].data.resize(actual); | |
| 179 | } | ||
| 180 | |||
| 181 | ✗ | if (ssd_handles.empty()) { | |
| 182 | ✗ | return; | |
| 183 | } | ||
| 184 | |||
| 185 | ✗ | std::vector<ReadResult> ssd_results; | |
| 186 | ✗ | ssd_store_.BatchRead(ssd_handles, ssd_results); | |
| 187 | ✗ | if (ssd_results.size() != ssd_indices.size()) { | |
| 188 | ✗ | throw std::runtime_error( | |
| 189 | ✗ | "HybridValueStore::BatchRead result size mismatch"); | |
| 190 | } | ||
| 191 | ✗ | for (size_t i = 0; i < ssd_indices.size(); ++i) { | |
| 192 | ✗ | out_results[ssd_indices[i]] = std::move(ssd_results[i]); | |
| 193 | } | ||
| 194 | ✗ | } | |
| 195 | |||
| 196 | ✗ | std::string GetInfo() const override { | |
| 197 | ✗ | std::ostringstream os; | |
| 198 | ✗ | os << "HybridValueStore(dram_reserved=" | |
| 199 | ✗ | << dram_bytes_reserved_.load(std::memory_order_relaxed) | |
| 200 | ✗ | << ", dram_capacity=" << dram_capacity_bytes_ << ", dram=" | |
| 201 | ✗ | << dram_store_.GetInfo() << ", ssd=" << ssd_store_.GetInfo() << ")"; | |
| 202 | ✗ | return os.str(); | |
| 203 | ✗ | } | |
| 204 | |||
| 205 | ✗ | std::string ExtraResultFields() const override { | |
| 206 | const uint64_t dram_live = | ||
| 207 | ✗ | dram_live_allocs_.load(std::memory_order_relaxed); | |
| 208 | ✗ | const uint64_t ssd_live = ssd_live_allocs_.load(std::memory_order_relaxed); | |
| 209 | ✗ | const uint64_t live = dram_live + ssd_live; | |
| 210 | const uint64_t dram_total = | ||
| 211 | ✗ | dram_total_allocs_.load(std::memory_order_relaxed); | |
| 212 | const uint64_t ssd_total = | ||
| 213 | ✗ | ssd_total_allocs_.load(std::memory_order_relaxed); | |
| 214 | ✗ | const uint64_t total = dram_total + ssd_total; | |
| 215 | ✗ | const double live_ratio = | |
| 216 | ✗ | live == 0 ? 0.0 : static_cast<double>(ssd_live) / live; | |
| 217 | ✗ | const double total_ratio = | |
| 218 | ✗ | total == 0 ? 0.0 : static_cast<double>(ssd_total) / total; | |
| 219 | ✗ | std::ostringstream os; | |
| 220 | ✗ | os << std::fixed << std::setprecision(6) | |
| 221 | ✗ | << " tiered_dram_capacity_bytes=" << dram_capacity_bytes_ | |
| 222 | ✗ | << " tiered_high_watermark_ratio=" << high_watermark_ratio_ | |
| 223 | ✗ | << " tiered_dram_live_allocs=" << dram_live << " tiered_ssd_live_allocs=" | |
| 224 | ✗ | << ssd_live << " tiered_live_allocs=" << live | |
| 225 | ✗ | << " tiered_ssd_live_ratio=" << live_ratio | |
| 226 | ✗ | << " tiered_dram_total_allocs=" << dram_total | |
| 227 | ✗ | << " tiered_ssd_total_allocs=" << ssd_total << " tiered_total_allocs=" | |
| 228 | ✗ | << total << " tiered_ssd_total_ratio=" << total_ratio; | |
| 229 | ✗ | return os.str(); | |
| 230 | ✗ | } | |
| 231 | |||
| 232 | private: | ||
| 233 | static constexpr uint64_t kSsdFlag = 1ULL << 63; | ||
| 234 | |||
| 235 | 288024 | static bool IsOnSSD(uint64_t handle) { return (handle & kSsdFlag) != 0; } | |
| 236 | |||
| 237 | 98608 | static uint64_t MakeDramHandle(uint64_t raw) { return raw; } | |
| 238 | |||
| 239 | ✗ | static uint64_t MakeSsdHandle(uint64_t raw) { return raw | kSsdFlag; } | |
| 240 | |||
| 241 | 288024 | static uint64_t DramRawHandle(uint64_t handle) { return handle; } | |
| 242 | |||
| 243 | ✗ | static uint64_t SsdRawHandle(uint64_t handle) { return handle & ~kSsdFlag; } | |
| 244 | |||
| 245 | 316 | static BaseKVConfig BuildDramConfig(const BaseKVConfig& config) { | |
| 246 |
1/2✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
|
316 | BaseKVConfig out = config; |
| 247 | 316 | auto& j = out.json_config_; | |
| 248 |
6/12✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 316 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 316 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 316 times.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✓ Branch 12 taken 316 times.
✗ Branch 13 not taken.
✓ Branch 14 taken 316 times.
|
316 | if (!j.contains("value") || !j.at("value").contains("dram_allocator")) { |
| 249 | ✗ | return out; | |
| 250 | } | ||
| 251 |
2/4✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
|
316 | json value = j.at("value"); |
| 252 |
2/4✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
|
316 | value["type"] = "DRAM_VALUE_STORE"; |
| 253 |
3/6✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 316 times.
✗ Branch 7 not taken.
|
316 | if (value.at("dram_allocator").contains("path")) { |
| 254 |
4/8✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 316 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 316 times.
✗ Branch 11 not taken.
|
316 | value["path"] = value.at("dram_allocator").at("path"); |
| 255 | } | ||
| 256 |
1/2✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
|
316 | value.erase("ssd_allocator"); |
| 257 |
1/2✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
|
316 | value.erase("tiering"); |
| 258 |
2/4✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
|
316 | j["value"] = value; |
| 259 | 316 | return out; | |
| 260 | 316 | } | |
| 261 | |||
| 262 | 316 | static BaseKVConfig BuildSsdConfig(const BaseKVConfig& config) { | |
| 263 |
1/2✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
|
316 | BaseKVConfig out = config; |
| 264 | 316 | auto& j = out.json_config_; | |
| 265 |
6/12✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 316 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 316 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 316 times.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✓ Branch 12 taken 316 times.
✗ Branch 13 not taken.
✓ Branch 14 taken 316 times.
|
316 | if (!j.contains("value") || !j.at("value").contains("ssd_allocator")) { |
| 266 | ✗ | return out; | |
| 267 | } | ||
| 268 |
2/4✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
|
316 | json value = j.at("value"); |
| 269 |
2/4✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
|
316 | value["type"] = "SSD_VALUE_STORE"; |
| 270 |
3/6✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 316 times.
✗ Branch 7 not taken.
|
316 | if (value.at("ssd_allocator").contains("path")) { |
| 271 |
4/8✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 316 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 316 times.
✗ Branch 11 not taken.
|
316 | value["path"] = value.at("ssd_allocator").at("path"); |
| 272 | } | ||
| 273 |
1/2✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
|
316 | value.erase("dram_allocator"); |
| 274 |
1/2✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
|
316 | value.erase("tiering"); |
| 275 |
2/4✓ Branch 1 taken 316 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 316 times.
✗ Branch 5 not taken.
|
316 | j["value"] = value; |
| 276 | 316 | return out; | |
| 277 | 316 | } | |
| 278 | |||
| 279 | 98608 | bool ShouldUseDram(size_t size) const { | |
| 280 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98608 times.
|
98608 | if (dram_capacity_bytes_ == 0) { |
| 281 | ✗ | return false; | |
| 282 | } | ||
| 283 | const double next = | ||
| 284 | 98608 | static_cast<double>( | |
| 285 | 98608 | dram_bytes_reserved_.load(std::memory_order_relaxed) + size) / | |
| 286 | 98608 | static_cast<double>(dram_capacity_bytes_); | |
| 287 | 98608 | return next <= high_watermark_ratio_; | |
| 288 | } | ||
| 289 | |||
| 290 | DramValueStore dram_store_; | ||
| 291 | SsdValueStore ssd_store_; | ||
| 292 | uint64_t dram_capacity_bytes_ = 0; | ||
| 293 | double high_watermark_ratio_ = 0.85; | ||
| 294 | std::atomic<uint64_t> dram_bytes_reserved_{0}; | ||
| 295 | std::atomic<uint64_t> dram_live_allocs_{0}; | ||
| 296 | std::atomic<uint64_t> ssd_live_allocs_{0}; | ||
| 297 | std::atomic<uint64_t> dram_total_allocs_{0}; | ||
| 298 | std::atomic<uint64_t> ssd_total_allocs_{0}; | ||
| 299 | }; | ||
| 300 | |||
| 301 | FACTORY_REGISTER( | ||
| 302 | ValueStore, TIERED_VALUE_STORE, HybridValueStore, const BaseKVConfig&); | ||
| 303 |