storage/kv_engine/engine_petkv.h
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | #pragma once | ||
| 2 | #include <algorithm> | ||
| 3 | #include <memory> | ||
| 4 | #include <string> | ||
| 5 | #include <vector> | ||
| 6 | |||
| 7 | #include "base/factory.h" | ||
| 8 | #include "base_kv.h" | ||
| 9 | #include "storage/nvm/pet_kv/pet_kv.h" | ||
| 10 | |||
| 11 | DECLARE_int32(prefetch_method); /* int32 flag declared here and defined elsewhere; KVEnginePetKV::BatchGet reads FLAGS_prefetch_method: 0 = one Get per key, 1 = PetMultiKV::BatchGet */ | ||
| 12 | |||
| 13 | class KVEnginePetKV : public BaseKV { | ||
| 14 | public: | ||
| 15 | ✗ | explicit KVEnginePetKV(const BaseKVConfig& config) : BaseKV(config) { | |
| 16 | ✗ | const std::string shm_path = config.json_config_["path"]; | |
| 17 | ✗ | const int shard_num = config.json_config_.value("shard_num", 16); | |
| 18 | ✗ | const int value_size = config.json_config_.at("value_size").get<int>(); | |
| 19 | ✗ | const int capacity = config.json_config_.at("capacity").get<int>(); | |
| 20 | ✗ | constexpr int64_t kMinShardMemory = 2LL * 1024 * 1024; | |
| 21 | const int64_t value_capacity = | ||
| 22 | ✗ | config.json_config_.at("value_capacity").get<int64_t>(); | |
| 23 | const int shard_capacity = | ||
| 24 | ✗ | std::max(1, (capacity + shard_num - 1) / shard_num); | |
| 25 | const int64_t shard_memory = | ||
| 26 | ✗ | std::max<int64_t>(kMinShardMemory, value_capacity / shard_num); | |
| 27 | ✗ | shm_kv = std::make_unique<base::PetMultiKV>( | |
| 28 | ✗ | shm_path, shard_num, shard_memory, shard_capacity, value_size); | |
| 29 | ✗ | } | |
| 30 | |||
| 31 | ✗ | void Get(const uint64_t key, std::string& value, unsigned t) override { | |
| 32 | ✗ | auto kv_data = shm_kv->Get(key); | |
| 33 | ✗ | if (kv_data.data) | |
| 34 | ✗ | value = std::string(kv_data.data, kv_data.size); | |
| 35 | ✗ | } | |
| 36 | |||
| 37 | ✗ | void BatchGet(base::ConstArray<uint64> keys, | |
| 38 | std::vector<base::ConstArray<float>>* values, | ||
| 39 | unsigned t) override { | ||
| 40 | ✗ | values->clear(); | |
| 41 | ✗ | if (FLAGS_prefetch_method == 0) { | |
| 42 | ✗ | for (auto k : keys) { | |
| 43 | ✗ | auto kv_data = shm_kv->Get(k); | |
| 44 | #ifdef RPC_DEBUG | ||
| 45 | CHECK_NE(kv_data.size, 0) << "empty kv, key is " << k; | ||
| 46 | #endif | ||
| 47 | ✗ | values->emplace_back( | |
| 48 | ✗ | (float*)kv_data.data, kv_data.size / sizeof(float)); | |
| 49 | } | ||
| 50 | ✗ | } else if (FLAGS_prefetch_method == 1) { | |
| 51 | ✗ | shm_kv->BatchGet(keys, values); | |
| 52 | } | ||
| 53 | ✗ | } | |
| 54 | |||
| 55 | void | ||
| 56 | ✗ | Put(const uint64_t key, const std::string_view& value, unsigned t) override { | |
| 57 | ✗ | CHECK(shm_kv->Update(key, value.data(), value.size())); | |
| 58 | ✗ | } | |
| 59 | |||
| 60 | std::pair<uint64_t, uint64_t> RegisterPMAddr() const { | ||
| 61 | return base::PMMmapRegisterCenter::GetInstance()->ForRDMAMemoryRegion(); | ||
| 62 | } | ||
| 63 | |||
| 64 | ✗ | void DebugInfo() const override { shm_kv->GetInfo(); } | |
| 65 | |||
| 66 | private: | ||
| 67 | base::ScopedTempDir dir; | ||
| 68 | std::unique_ptr<base::PetMultiKV> shm_kv; | ||
| 69 | }; | ||
| 70 | |||
| 71 | FACTORY_REGISTER(BaseKV, KVEnginePetKV, KVEnginePetKV, const BaseKVConfig&); /* registers KVEnginePetKV for the BaseKV base type; NOTE(review): arguments are presumably (base, registered name, class, constructor argument types) -- confirm against base/factory.h */ | ||
| 72 |