GCC Code Coverage Report


Directory: src/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 0.0% 0 / 0 / 161
Functions: 0.0% 0 / 0 / 10
Branches: 0.0% 0 / 0 / 408

storage/nvm/pet_kv/pet_kv.cc
Line Branch Exec Source
1 #include "pet_kv.h"
2
3 #include <algorithm>
4 #include <atomic>
5 #include <cstdio>
6 #include <fstream>
7 #include <iostream>
8 #include <utility>
9
// Hands a shm allocation to shm_recycle_ (must be expanded inside a PetKV
// member, where shm_recycle_ is in scope). Deliberately has no trailing
// semicolon so that `FREE_SHM(p);` is exactly one statement and stays valid
// in an unbraced `if (...) FREE_SHM(p); else ...`.
#define FREE_SHM(ptr) shm_recycle_->Recycle(ptr)
11
12 namespace base {
13
14 constexpr bool IGNORE_LOAD_FACTOR = true;
15
16 PetKVData::Config PetKVData::kConfig = PetKVData::Config();
17
18 PetKV::PetKV(const std::string& shm_dir,
19 int64 memory_size,
20 int capacity,
21 int pre_known_value_size)
22 : start_ts_(base::GetTimestamp()),
23 shm_dir_(shm_dir),
24 pre_known_value_size_(pre_known_value_size) {
25 ts_getter_ = &AsyncTimeHelper::GetTimestamp;
26 #if 1
27 if (pre_known_value_size == 0)
28 shm_malloc_ = new PersistMemoryPool<false>(
29 shm_dir + "/value",
30 memory_size,
31 {8 + 32, 8 + 64, 8 + 128, 8 + 512, 8 + 1024});
32 else
33 shm_malloc_ = new PersistMemoryPool<true>(
34 shm_dir + "/value", memory_size, {pre_known_value_size});
35 #else
36 shm_malloc_ = new PersistLoopShmMalloc(shm_dir + "/value", memory_size);
37 #endif
38
39 // shm_recycle_ = new DirectRecycle (shm_malloc_);
40 // shm_recycle_ = new DelayedRecycle(shm_malloc_);
41 shm_recycle_ = new ShmEpochRecycle(shm_malloc_);
42
43 auto begin_ts = base::GetTimestamp() / 1000;
44 LOG(INFO) << "PetKV " << shm_dir
45 << " start initialize, pre_ms: " << (begin_ts - start_ts_ / 1000);
46
47 uint64_t dict_size = capacity;
48 auto dict_memory_size =
49 ShmKDoubleDict::MemorySize(dict_size, IGNORE_LOAD_FACTOR);
50 LOG(INFO) << fmt::format(
51 "PetKV allocate {:.2f} GB; capacity={} for dict",
52 (double)dict_memory_size / 1024 / 1024 / 1024,
53 dict_size);
54 dict_shm_file_ = base::ShmFile::New(base::ShmFile::ConfigForMedium(
55 "DRAM", shm_dir + "/dict", dict_memory_size));
56 if (!dict_shm_file_) {
57 base::file_util::Delete(shm_dir + "/dict", false);
58 dict_shm_file_ = base::ShmFile::New(base::ShmFile::ConfigForMedium(
59 "DRAM", shm_dir + "/dict", dict_memory_size));
60 CHECK(dict_shm_file_);
61 LOG(INFO) << "Reinitialize shm dict size: " << dict_memory_size;
62 }
63
64 auto dict_file_init_ts = base::GetTimestamp() / 1000;
65
66 valid_shm_file_ = base::ShmFile::New(base::ShmFile::ConfigForMedium(
67 "DRAM", shm_dir + "/valid", valid_file_size));
68 if (!valid_shm_file_) {
69 base::file_util::Delete(shm_dir + "/valid", false);
70 valid_shm_file_ = base::ShmFile::New(base::ShmFile::ConfigForMedium(
71 "DRAM", shm_dir + "/valid", valid_file_size));
72 CHECK(valid_shm_file_);
73 shm_malloc_->Initialize();
74 }
75 auto valid_file_init_ts = base::GetTimestamp() / 1000;
76
77 dict_ = reinterpret_cast<ShmKDoubleDict*>(dict_shm_file_->Data());
78 if (!dict_->Valid(dict_shm_file_->Size())) {
79 dict_->Initialize(dict_size, IGNORE_LOAD_FACTOR);
80 } else {
81 LOG(INFO) << "Before recovery: [shm_malloc] " << shm_malloc_->GetInfo();
82 dict_->Reload(
83 dict_size, IGNORE_LOAD_FACTOR, [&](uint64 key, PetKVData value) {
84 shm_malloc_->AddMallocs4Recovery(value.shm_malloc_offset());
85 });
86 LOG(INFO) << "After recovery: [Dict] find " << dict_->Size() << " kvs";
87 LOG(INFO) << "After recovery: [shm_malloc] " << shm_malloc_->GetInfo();
88 }
89
90 CHECK_EQ(dict_->Size(), shm_malloc_->total_malloc());
91 auto all_valid_ts = base::GetTimestamp() / 1000;
92 LOG(INFO) << "PetKV " << shm_dir << " initialize succeed, dict_file_init_ms: "
93 << (dict_file_init_ts - begin_ts)
94 << ", dict_init_ms: " << (valid_file_init_ts - dict_file_init_ts)
95 << ", all_valid_ms: " << (all_valid_ts - valid_file_init_ts);
96 }
97
98 PetKV::~PetKV() {
99 delete shm_recycle_;
100 LOG(INFO) << "shm kv safe quit, remain dict size = " << dict_->Size();
101 LOG(INFO) << shm_malloc_->GetInfo();
102 delete shm_malloc_;
103 }
104
105 bool PetKV::Valid() {
106 const int kCheckThreadNum = 3;
107 auto begin_ts = base::GetTimestamp() / 1000;
108 if (!dict_->Valid(dict_shm_file_->Size())) {
109 LOG(ERROR) << "dict load error: " << dict_shm_file_->filename()
110 << ", size: " << dict_shm_file_->Size();
111 return false;
112 }
113 auto dict_check_ts = base::GetTimestamp() / 1000;
114 auto check_ts = base::GetTimestamp() / 1000;
115 auto shm_free_ts = base::GetTimestamp() / 1000;
116 LOG(INFO) << "shm kv " << shm_dir_
117 << " check valid, dict_check_ms: " << (dict_check_ts - begin_ts)
118 << ", check_ms: " << (check_ts - dict_check_ts)
119 << ", shm_free_ms: " << (shm_free_ts - check_ts);
120
121 return true;
122 }
123
124 bool PetKV::Update(uint64 key, const char* log, int log_size) {
125 base::AutoLock lock(modify_lock_);
126 auto [p_cache, exists] = dict_->GetReturnPtr(key);
127
128 char* value = shm_malloc_->New(log_size);
129 if (nullptr == value)
130 return false;
131 memcpy(value, log, log_size);
132 base::clflushopt_range(value, log_size);
133 PetKVData new_cache(shm_malloc_->GetMallocOffset(value));
134
135 if (!exists) {
136 p_cache = dict_->Insert(key, new_cache, nullptr, true);
137 if (p_cache == nullptr) {
138 RECSTORE_LOG_EVERY_MS(WARNING, 2000) << "Update fail: " << key;
139 FREE_SHM(shm_malloc_->GetMallocData(new_cache.shm_malloc_offset()));
140 return false;
141 }
142 p_cache->SetShmMallocOffset(new_cache.shm_malloc_offset());
143 p_cache->DoFlush();
144 } else {
145 FREE_SHM(shm_malloc_->GetMallocData(p_cache->shm_malloc_offset()));
146 p_cache->SetShmMallocOffset(new_cache.shm_malloc_offset());
147 p_cache->DoFlush();
148 }
149
150 // re-read the KV pair
151 if (p_cache->shm_malloc_offset() != new_cache.shm_malloc_offset())
152 FREE_SHM(shm_malloc_->GetMallocData(new_cache.shm_malloc_offset()));
153 return true;
154 }
155
156 std::string PetKV::GetInfo() {
157 std::string info;
158 info.append(
159 base::SFormat("cache: {}/{}\n", dict_->Size(), dict_->Capacity()));
160 info.append(shm_malloc_->GetInfo());
161
162 LOG(INFO) << base::SFormat(
163 "LoadFactor : {}/{}={}",
164 dict_->Size(),
165 dict_->Capacity(),
166 dict_->Size() / (float)dict_->Capacity());
167 LOG(INFO) << "MemoryUtil: "
168 << dict_->Size() * 16 /
169 (float)ShmKDoubleDict::MemorySize(
170 (float)dict_->Capacity(), IGNORE_LOAD_FACTOR);
171
172 return info;
173 }
174
175 PetMultiKV::PetMultiKV(const std::vector<std::string>& shm_dir,
176 int shard_num,
177 int64 shard_memory,
178 int shard_cache_capacity,
179 int pre_known_value_size)
180 : shm_dir_(shm_dir),
181 shard_num_(shard_num),
182 shard_memory_(shard_memory),
183 shard_cache_capacity_(shard_cache_capacity),
184 pre_known_value_size_(pre_known_value_size) {
185 CHECK(!shm_dir_.empty());
186 for (const auto& dir : shm_dir) {
187 base::file_util::CreateDirectory(dir);
188 }
189
190 if (shard_memory_ >= (1LL << 35)) {
191 for (int i = 36; i <= 64; ++i) {
192 if ((1LL << i) > shard_memory_) {
193 PetKVData::kConfig = PetKVData::Config(i - 3);
194 LOG(INFO) << "shard_memory over 32G, change expire_time bit: "
195 << PetKVData::kConfig.kExpireBit
196 << ", shm_offset bit: " << (i - 3);
197 break;
198 }
199 }
200 }
201
202 shm_kv_.resize(shard_num);
203 std::vector<std::thread> thread_pool;
204 for (int i = 0; i < shard_num; ++i) {
205 thread_pool.emplace_back(&PetMultiKV::LoadShard, this, i);
206 }
207 for (int i = 0; i < shard_num; ++i) {
208 thread_pool[i].join();
209 }
210 }
211
212 void PetMultiKV::LoadShard(int shard) {
213 base::file_util::CreateDirectory(shm_dir(shard));
214 LOG(INFO) << "PetMultiKV LoadShard shm_file:" << shm_dir(shard)
215 << ", shard memory_size:" << shard_memory_
216 << ", shard_cache_capacity:" << shard_cache_capacity_
217 << ", pre_known_value_size:" << pre_known_value_size_;
218
219 shm_kv_[shard] = new PetKV(
220 shm_dir(shard),
221 shard_memory_,
222 shard_cache_capacity_,
223 pre_known_value_size_);
224 }
225 std::string PetMultiKV::GetInfo() {
226 std::string info;
227 for (int shard = 0; shard < shard_num_; ++shard) {
228 info.append("shard " + base::IntToString(shard) + "\n");
229 info.append(shm_kv_[shard]->GetInfo());
230 }
231 return info;
232 }
233
234 std::string PetMultiKV::shm_dir(int shard_id) {
235 int idx = shard_id % shm_dir_.size();
236 return shm_dir_[idx] + "/" + base::IntToString(shard_id);
237 }
238
239 } // namespace base
240