storage/nvm/pet_kv/pet_kv.cc
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | #include "pet_kv.h" | ||
| 2 | |||
| 3 | #include <algorithm> | ||
| 4 | #include <atomic> | ||
| 5 | #include <cstdio> | ||
| 6 | #include <fstream> | ||
| 7 | #include <iostream> | ||
| 8 | #include <utility> | ||
| 9 | |||
// Hands `ptr` to the `shm_recycle_` member in scope at the expansion site.
// Intentionally has no trailing semicolon: callers write `FREE_SHM(p);`, which
// must expand to exactly one statement so it stays correct inside an unbraced
// `if (...) FREE_SHM(p); else ...`.
#define FREE_SHM(ptr) shm_recycle_->Recycle(ptr)
| 11 | |||
| 12 | namespace base { | ||
| 13 | |||
| 14 | constexpr bool IGNORE_LOAD_FACTOR = true; | ||
| 15 | |||
| 16 | PetKVData::Config PetKVData::kConfig = PetKVData::Config(); | ||
| 17 | |||
| 18 | ✗ | PetKV::PetKV(const std::string& shm_dir, | |
| 19 | int64 memory_size, | ||
| 20 | int capacity, | ||
| 21 | ✗ | int pre_known_value_size) | |
| 22 | ✗ | : start_ts_(base::GetTimestamp()), | |
| 23 | ✗ | shm_dir_(shm_dir), | |
| 24 | ✗ | pre_known_value_size_(pre_known_value_size) { | |
| 25 | ✗ | ts_getter_ = &AsyncTimeHelper::GetTimestamp; | |
| 26 | #if 1 | ||
| 27 | ✗ | if (pre_known_value_size == 0) | |
| 28 | ✗ | shm_malloc_ = new PersistMemoryPool<false>( | |
| 29 | ✗ | shm_dir + "/value", | |
| 30 | memory_size, | ||
| 31 | ✗ | {8 + 32, 8 + 64, 8 + 128, 8 + 512, 8 + 1024}); | |
| 32 | else | ||
| 33 | ✗ | shm_malloc_ = new PersistMemoryPool<true>( | |
| 34 | ✗ | shm_dir + "/value", memory_size, {pre_known_value_size}); | |
| 35 | #else | ||
| 36 | shm_malloc_ = new PersistLoopShmMalloc(shm_dir + "/value", memory_size); | ||
| 37 | #endif | ||
| 38 | |||
| 39 | // shm_recycle_ = new DirectRecycle (shm_malloc_); | ||
| 40 | // shm_recycle_ = new DelayedRecycle(shm_malloc_); | ||
| 41 | ✗ | shm_recycle_ = new ShmEpochRecycle(shm_malloc_); | |
| 42 | |||
| 43 | ✗ | auto begin_ts = base::GetTimestamp() / 1000; | |
| 44 | ✗ | LOG(INFO) << "PetKV " << shm_dir | |
| 45 | ✗ | << " start initialize, pre_ms: " << (begin_ts - start_ts_ / 1000); | |
| 46 | |||
| 47 | ✗ | uint64_t dict_size = capacity; | |
| 48 | auto dict_memory_size = | ||
| 49 | ✗ | ShmKDoubleDict::MemorySize(dict_size, IGNORE_LOAD_FACTOR); | |
| 50 | ✗ | LOG(INFO) << fmt::format( | |
| 51 | "PetKV allocate {:.2f} GB; capacity={} for dict", | ||
| 52 | ✗ | (double)dict_memory_size / 1024 / 1024 / 1024, | |
| 53 | ✗ | dict_size); | |
| 54 | ✗ | dict_shm_file_ = base::ShmFile::New(base::ShmFile::ConfigForMedium( | |
| 55 | ✗ | "DRAM", shm_dir + "/dict", dict_memory_size)); | |
| 56 | ✗ | if (!dict_shm_file_) { | |
| 57 | ✗ | base::file_util::Delete(shm_dir + "/dict", false); | |
| 58 | ✗ | dict_shm_file_ = base::ShmFile::New(base::ShmFile::ConfigForMedium( | |
| 59 | ✗ | "DRAM", shm_dir + "/dict", dict_memory_size)); | |
| 60 | ✗ | CHECK(dict_shm_file_); | |
| 61 | ✗ | LOG(INFO) << "Reinitialize shm dict size: " << dict_memory_size; | |
| 62 | } | ||
| 63 | |||
| 64 | ✗ | auto dict_file_init_ts = base::GetTimestamp() / 1000; | |
| 65 | |||
| 66 | ✗ | valid_shm_file_ = base::ShmFile::New(base::ShmFile::ConfigForMedium( | |
| 67 | ✗ | "DRAM", shm_dir + "/valid", valid_file_size)); | |
| 68 | ✗ | if (!valid_shm_file_) { | |
| 69 | ✗ | base::file_util::Delete(shm_dir + "/valid", false); | |
| 70 | ✗ | valid_shm_file_ = base::ShmFile::New(base::ShmFile::ConfigForMedium( | |
| 71 | ✗ | "DRAM", shm_dir + "/valid", valid_file_size)); | |
| 72 | ✗ | CHECK(valid_shm_file_); | |
| 73 | ✗ | shm_malloc_->Initialize(); | |
| 74 | } | ||
| 75 | ✗ | auto valid_file_init_ts = base::GetTimestamp() / 1000; | |
| 76 | |||
| 77 | ✗ | dict_ = reinterpret_cast<ShmKDoubleDict*>(dict_shm_file_->Data()); | |
| 78 | ✗ | if (!dict_->Valid(dict_shm_file_->Size())) { | |
| 79 | ✗ | dict_->Initialize(dict_size, IGNORE_LOAD_FACTOR); | |
| 80 | } else { | ||
| 81 | ✗ | LOG(INFO) << "Before recovery: [shm_malloc] " << shm_malloc_->GetInfo(); | |
| 82 | ✗ | dict_->Reload( | |
| 83 | ✗ | dict_size, IGNORE_LOAD_FACTOR, [&](uint64 key, PetKVData value) { | |
| 84 | ✗ | shm_malloc_->AddMallocs4Recovery(value.shm_malloc_offset()); | |
| 85 | ✗ | }); | |
| 86 | ✗ | LOG(INFO) << "After recovery: [Dict] find " << dict_->Size() << " kvs"; | |
| 87 | ✗ | LOG(INFO) << "After recovery: [shm_malloc] " << shm_malloc_->GetInfo(); | |
| 88 | } | ||
| 89 | |||
| 90 | ✗ | CHECK_EQ(dict_->Size(), shm_malloc_->total_malloc()); | |
| 91 | ✗ | auto all_valid_ts = base::GetTimestamp() / 1000; | |
| 92 | ✗ | LOG(INFO) << "PetKV " << shm_dir << " initialize succeed, dict_file_init_ms: " | |
| 93 | ✗ | << (dict_file_init_ts - begin_ts) | |
| 94 | ✗ | << ", dict_init_ms: " << (valid_file_init_ts - dict_file_init_ts) | |
| 95 | ✗ | << ", all_valid_ms: " << (all_valid_ts - valid_file_init_ts); | |
| 96 | ✗ | } | |
| 97 | |||
| 98 | ✗ | PetKV::~PetKV() { | |
| 99 | ✗ | delete shm_recycle_; | |
| 100 | ✗ | LOG(INFO) << "shm kv safe quit, remain dict size = " << dict_->Size(); | |
| 101 | ✗ | LOG(INFO) << shm_malloc_->GetInfo(); | |
| 102 | ✗ | delete shm_malloc_; | |
| 103 | ✗ | } | |
| 104 | |||
| 105 | ✗ | bool PetKV::Valid() { | |
| 106 | ✗ | const int kCheckThreadNum = 3; | |
| 107 | ✗ | auto begin_ts = base::GetTimestamp() / 1000; | |
| 108 | ✗ | if (!dict_->Valid(dict_shm_file_->Size())) { | |
| 109 | ✗ | LOG(ERROR) << "dict load error: " << dict_shm_file_->filename() | |
| 110 | ✗ | << ", size: " << dict_shm_file_->Size(); | |
| 111 | ✗ | return false; | |
| 112 | } | ||
| 113 | ✗ | auto dict_check_ts = base::GetTimestamp() / 1000; | |
| 114 | ✗ | auto check_ts = base::GetTimestamp() / 1000; | |
| 115 | ✗ | auto shm_free_ts = base::GetTimestamp() / 1000; | |
| 116 | ✗ | LOG(INFO) << "shm kv " << shm_dir_ | |
| 117 | ✗ | << " check valid, dict_check_ms: " << (dict_check_ts - begin_ts) | |
| 118 | ✗ | << ", check_ms: " << (check_ts - dict_check_ts) | |
| 119 | ✗ | << ", shm_free_ms: " << (shm_free_ts - check_ts); | |
| 120 | |||
| 121 | ✗ | return true; | |
| 122 | } | ||
| 123 | |||
| 124 | ✗ | bool PetKV::Update(uint64 key, const char* log, int log_size) { | |
| 125 | ✗ | base::AutoLock lock(modify_lock_); | |
| 126 | ✗ | auto [p_cache, exists] = dict_->GetReturnPtr(key); | |
| 127 | |||
| 128 | ✗ | char* value = shm_malloc_->New(log_size); | |
| 129 | ✗ | if (nullptr == value) | |
| 130 | ✗ | return false; | |
| 131 | ✗ | memcpy(value, log, log_size); | |
| 132 | ✗ | base::clflushopt_range(value, log_size); | |
| 133 | ✗ | PetKVData new_cache(shm_malloc_->GetMallocOffset(value)); | |
| 134 | |||
| 135 | ✗ | if (!exists) { | |
| 136 | ✗ | p_cache = dict_->Insert(key, new_cache, nullptr, true); | |
| 137 | ✗ | if (p_cache == nullptr) { | |
| 138 | ✗ | RECSTORE_LOG_EVERY_MS(WARNING, 2000) << "Update fail: " << key; | |
| 139 | ✗ | FREE_SHM(shm_malloc_->GetMallocData(new_cache.shm_malloc_offset())); | |
| 140 | ✗ | return false; | |
| 141 | } | ||
| 142 | ✗ | p_cache->SetShmMallocOffset(new_cache.shm_malloc_offset()); | |
| 143 | ✗ | p_cache->DoFlush(); | |
| 144 | } else { | ||
| 145 | ✗ | FREE_SHM(shm_malloc_->GetMallocData(p_cache->shm_malloc_offset())); | |
| 146 | ✗ | p_cache->SetShmMallocOffset(new_cache.shm_malloc_offset()); | |
| 147 | ✗ | p_cache->DoFlush(); | |
| 148 | } | ||
| 149 | |||
| 150 | // re-read the KV pair | ||
| 151 | ✗ | if (p_cache->shm_malloc_offset() != new_cache.shm_malloc_offset()) | |
| 152 | ✗ | FREE_SHM(shm_malloc_->GetMallocData(new_cache.shm_malloc_offset())); | |
| 153 | ✗ | return true; | |
| 154 | ✗ | } | |
| 155 | |||
| 156 | ✗ | std::string PetKV::GetInfo() { | |
| 157 | ✗ | std::string info; | |
| 158 | ✗ | info.append( | |
| 159 | ✗ | base::SFormat("cache: {}/{}\n", dict_->Size(), dict_->Capacity())); | |
| 160 | ✗ | info.append(shm_malloc_->GetInfo()); | |
| 161 | |||
| 162 | ✗ | LOG(INFO) << base::SFormat( | |
| 163 | "LoadFactor : {}/{}={}", | ||
| 164 | ✗ | dict_->Size(), | |
| 165 | ✗ | dict_->Capacity(), | |
| 166 | ✗ | dict_->Size() / (float)dict_->Capacity()); | |
| 167 | ✗ | LOG(INFO) << "MemoryUtil: " | |
| 168 | ✗ | << dict_->Size() * 16 / | |
| 169 | ✗ | (float)ShmKDoubleDict::MemorySize( | |
| 170 | ✗ | (float)dict_->Capacity(), IGNORE_LOAD_FACTOR); | |
| 171 | |||
| 172 | ✗ | return info; | |
| 173 | ✗ | } | |
| 174 | |||
| 175 | ✗ | PetMultiKV::PetMultiKV(const std::vector<std::string>& shm_dir, | |
| 176 | int shard_num, | ||
| 177 | int64 shard_memory, | ||
| 178 | int shard_cache_capacity, | ||
| 179 | ✗ | int pre_known_value_size) | |
| 180 | ✗ | : shm_dir_(shm_dir), | |
| 181 | ✗ | shard_num_(shard_num), | |
| 182 | ✗ | shard_memory_(shard_memory), | |
| 183 | ✗ | shard_cache_capacity_(shard_cache_capacity), | |
| 184 | ✗ | pre_known_value_size_(pre_known_value_size) { | |
| 185 | ✗ | CHECK(!shm_dir_.empty()); | |
| 186 | ✗ | for (const auto& dir : shm_dir) { | |
| 187 | ✗ | base::file_util::CreateDirectory(dir); | |
| 188 | } | ||
| 189 | |||
| 190 | ✗ | if (shard_memory_ >= (1LL << 35)) { | |
| 191 | ✗ | for (int i = 36; i <= 64; ++i) { | |
| 192 | ✗ | if ((1LL << i) > shard_memory_) { | |
| 193 | ✗ | PetKVData::kConfig = PetKVData::Config(i - 3); | |
| 194 | ✗ | LOG(INFO) << "shard_memory over 32G, change expire_time bit: " | |
| 195 | ✗ | << PetKVData::kConfig.kExpireBit | |
| 196 | ✗ | << ", shm_offset bit: " << (i - 3); | |
| 197 | ✗ | break; | |
| 198 | } | ||
| 199 | } | ||
| 200 | } | ||
| 201 | |||
| 202 | ✗ | shm_kv_.resize(shard_num); | |
| 203 | ✗ | std::vector<std::thread> thread_pool; | |
| 204 | ✗ | for (int i = 0; i < shard_num; ++i) { | |
| 205 | ✗ | thread_pool.emplace_back(&PetMultiKV::LoadShard, this, i); | |
| 206 | } | ||
| 207 | ✗ | for (int i = 0; i < shard_num; ++i) { | |
| 208 | ✗ | thread_pool[i].join(); | |
| 209 | } | ||
| 210 | ✗ | } | |
| 211 | |||
| 212 | ✗ | void PetMultiKV::LoadShard(int shard) { | |
| 213 | ✗ | base::file_util::CreateDirectory(shm_dir(shard)); | |
| 214 | ✗ | LOG(INFO) << "PetMultiKV LoadShard shm_file:" << shm_dir(shard) | |
| 215 | ✗ | << ", shard memory_size:" << shard_memory_ | |
| 216 | ✗ | << ", shard_cache_capacity:" << shard_cache_capacity_ | |
| 217 | ✗ | << ", pre_known_value_size:" << pre_known_value_size_; | |
| 218 | |||
| 219 | ✗ | shm_kv_[shard] = new PetKV( | |
| 220 | ✗ | shm_dir(shard), | |
| 221 | shard_memory_, | ||
| 222 | shard_cache_capacity_, | ||
| 223 | ✗ | pre_known_value_size_); | |
| 224 | ✗ | } | |
| 225 | ✗ | std::string PetMultiKV::GetInfo() { | |
| 226 | ✗ | std::string info; | |
| 227 | ✗ | for (int shard = 0; shard < shard_num_; ++shard) { | |
| 228 | ✗ | info.append("shard " + base::IntToString(shard) + "\n"); | |
| 229 | ✗ | info.append(shm_kv_[shard]->GetInfo()); | |
| 230 | } | ||
| 231 | ✗ | return info; | |
| 232 | ✗ | } | |
| 233 | |||
| 234 | ✗ | std::string PetMultiKV::shm_dir(int shard_id) { | |
| 235 | ✗ | int idx = shard_id % shm_dir_.size(); | |
| 236 | ✗ | return shm_dir_[idx] + "/" + base::IntToString(shard_id); | |
| 237 | } | ||
| 238 | |||
| 239 | } // namespace base | ||
| 240 |