GCC Code Coverage Report


Directory: src/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 0.0% 0 / 0 / 333
Functions: 0.0% 0 / 0 / 23
Branches: 0.0% 0 / 0 / 1002

storage/kv_engine/benchmark_kv_engine.cc
Line Branch Exec Source
1 #include <unistd.h>
2
3 #include <algorithm>
4 #include <atomic>
5 #include <chrono>
6 #include <cmath>
7 #include <cstdint>
8 #include <cstdio>
9 #include <exception>
10 #include <memory>
11 #include <mutex>
12 #include <stdexcept>
13 #include <string>
14 #include <string_view>
15 #include <thread>
16 #include <vector>
17
18 #include "base/bind_core.h"
19 #include "base/factory.h"
20 #include "base/init.h"
21 #include "base/log.h"
22 #include "memory/shm_file.h"
23 #include "storage/io_backend/force_link.h"
24 #include "storage/kv_engine/base_kv.h"
25 #include "storage/kv_engine/engine_selector.h"
26
27 DEFINE_string(
28 dram_path,
29 "",
30 "DRAM data directory; empty only works with anonymous DRAM allocators");
31 DEFINE_string(ssd_path, "", "SSD data directory");
32 DEFINE_string(index_type, "DRAM_EXTENDIBLE_HASH", "index.type");
33 DEFINE_string(value_store_type, "DRAM_VALUE_STORE", "value.type");
34 DEFINE_string(engine_class, "KVEngineComposite", "BaseKV factory class name");
35 DEFINE_string(fasterkv_storage,
36 "memory",
37 "KVEngineFasterKV storage backend: memory|ssd");
38 DEFINE_string(
39 fasterkv_log_path,
40 "",
41 "KVEngineFasterKV SSD log directory; defaults to ssd_path/fasterkv-log");
42 DEFINE_int64(
43 fasterkv_hlog_memory_bytes,
44 0,
45 "KVEngineFasterKV hybrid log memory bytes; 0 uses backend default");
46 DEFINE_double(
47 fasterkv_mutable_fraction,
48 0.0,
49 "KVEngineFasterKV hybrid log mutable fraction; 0 uses backend default");
50 DEFINE_int64(fasterkv_read_cache_bytes,
51 0,
52 "KVEngineFasterKV read cache bytes; 0 disables read cache");
53 DEFINE_string(dram_allocator,
54 "CONCURRENT_SLAB_MEMORY_POOL",
55 "value.dram_allocator.type");
56 DEFINE_string(ssd_io_backend, "IOURING", "SSD IO backend");
57 DEFINE_int32(ssd_queue_depth, 512, "SSD IO queue depth");
58 DEFINE_int64(dram_capacity_bytes, 0, "override DRAM allocator capacity bytes");
59 DEFINE_int64(ssd_capacity_bytes, 0, "override SSD allocator capacity bytes");
60
61 DEFINE_int64(record_count, 100000000, "YCSB record count");
62 DEFINE_string(workload,
63 "c",
64 "YCSB workload: a/b/c or workloada/workloadb/workloadc");
65 DEFINE_string(distribution, "uniform", "key distribution: uniform/zipfian");
66 DEFINE_double(zipfian_alpha, 0.9, "Zipfian alpha");
67 DEFINE_string(read_mode, "exists", "read mode: exists/get/batch_get_flat");
68 DEFINE_int32(batch_keys, 500, "keys per BatchGetFlat operation");
69 DEFINE_bool(load, true, "run load phase");
70 DEFINE_bool(run, true, "run transaction phase");
71 DEFINE_bool(print_util, false, "print KVEngine utilization after load");
72
73 DEFINE_int32(thread_num, 16, "worker thread count");
74 DEFINE_int32(load_thread_num, 0, "load thread count; 0 uses thread_num");
75 DEFINE_int32(value_size, 128, "value size bytes");
76 DEFINE_int32(running_seconds, 5, "transaction runtime seconds");
77
78 namespace {
79
// Operation counters for one benchmark phase; all counters start at zero.
struct PhaseStats {
  uint64_t total_ops{0};   // all operations counted in the phase
  uint64_t read_ops{0};    // operations counted as reads
  uint64_t update_ops{0};  // operations counted as writes (Put)
};
85
// Read/update split of a workload; the remainder up to 100 is updates.
struct WorkloadMix {
  int read_percent{100};  // percentage of operations issued as reads
};
89
// Small, non-cryptographic 64-bit pseudo random generator (xorshift64* step).
// Not thread-safe: each instance keeps its own mutable state.
class FastRandom {
 public:
  // A zero seed would leave the xorshift state stuck at zero, so it is
  // replaced by a fixed non-zero constant.
  explicit FastRandom(uint64_t seed) : state_(seed) {
    if (state_ == 0) {
      state_ = kFallbackSeed;
    }
  }

  // Advances the state and returns the next 64-bit value.
  uint64_t Next() {
    state_ ^= state_ >> 12;
    state_ ^= state_ << 25;
    state_ ^= state_ >> 27;
    return state_ * kMultiplier;
  }

  // Returns a double in [0, 1) built from the top 53 bits of Next().
  double Uniform01() {
    const uint64_t mantissa = Next() >> 11;
    return mantissa * kInverseTwoPow53;
  }

  // Returns Next() reduced modulo n; n == 0 yields 0 without consuming state.
  uint64_t Uniform(uint64_t n) {
    if (n == 0) {
      return 0;
    }
    return Next() % n;
  }

 private:
  static constexpr uint64_t kFallbackSeed = 0x9e3779b97f4a7c15ULL;
  static constexpr uint64_t kMultiplier = 2685821657736338717ULL;
  static constexpr double kInverseTwoPow53 = 1.0 / 9007199254740992.0;

  uint64_t state_;
};
111
112 class KeyGenerator {
113 public:
114 KeyGenerator(std::string distribution,
115 uint64_t record_count,
116 double zipfian_alpha,
117 uint64_t seed)
118 : distribution_(std::move(distribution)),
119 record_count_(record_count),
120 alpha_(zipfian_alpha),
121 rng_(seed) {
122 if (record_count_ == 0) {
123 throw std::invalid_argument("record_count must be positive");
124 }
125 if (distribution_ == "zipfian") {
126 if (alpha_ < 0.0) {
127 throw std::invalid_argument("zipfian_alpha must be non-negative");
128 }
129 if (std::abs(alpha_ - 1.0) < 1e-9) {
130 log_n_ = std::log(static_cast<double>(record_count_));
131 } else {
132 pow_n_ = std::pow(static_cast<double>(record_count_), 1.0 - alpha_);
133 }
134 } else if (distribution_ != "uniform") {
135 throw std::invalid_argument("distribution must be uniform or zipfian");
136 }
137 }
138
139 uint64_t NextKey() {
140 if (distribution_ == "uniform") {
141 return rng_.Uniform(record_count_) + 1;
142 }
143 return NextZipfian() + 1;
144 }
145
146 uint64_t NextUint(uint64_t n) { return rng_.Uniform(n); }
147
148 private:
149 uint64_t NextZipfian() {
150 const double u = std::max(rng_.Uniform01(), 1e-12);
151 double rank = 1.0;
152 if (std::abs(alpha_ - 1.0) < 1e-9) {
153 rank = std::exp(u * log_n_);
154 } else {
155 rank = std::pow(1.0 + u * (pow_n_ - 1.0), 1.0 / (1.0 - alpha_));
156 }
157 uint64_t key = static_cast<uint64_t>(rank);
158 if (key >= record_count_) {
159 key = record_count_ - 1;
160 }
161 return key;
162 }
163
164 std::string distribution_;
165 uint64_t record_count_;
166 double alpha_;
167 double pow_n_ = 1.0;
168 double log_n_ = 0.0;
169 FastRandom rng_;
170 };
171
// Lower-cases `workload` and maps the long YCSB names
// "workloada"/"workloadb"/"workloadc" to "a"/"b"/"c". Any other input is
// returned lower-cased and otherwise unchanged; validation happens elsewhere.
std::string NormalizeWorkload(std::string workload) {
  // tolower() is only defined for values representable as unsigned char (or
  // EOF); going through unsigned char avoids undefined behaviour for bytes
  // >= 0x80 when plain char is signed.
  std::transform(workload.begin(), workload.end(), workload.begin(),
                 [](unsigned char c) {
                   return static_cast<char>(::tolower(c));
                 });
  if (workload == "workloada")
    return "a";
  if (workload == "workloadb")
    return "b";
  if (workload == "workloadc")
    return "c";
  return workload;
}
182
183 WorkloadMix GetWorkloadMix(const std::string& workload) {
184 if (workload == "a")
185 return WorkloadMix{50};
186 if (workload == "b")
187 return WorkloadMix{95};
188 if (workload == "c")
189 return WorkloadMix{100};
190 throw std::invalid_argument(
191 "workload must be a/b/c or workloada/workloadb/workloadc");
192 }
193
// True when `type` names one of the SSD index types.
bool IsSsdIndexType(const std::string& type) {
  for (const char* candidate : {"SSD", "SSD_EXTENDIBLE_HASH"}) {
    if (type == candidate) {
      return true;
    }
  }
  return false;
}
197
// True when the value store type keeps (some) values in DRAM.
bool HasDramValueStore(const std::string& type) {
  if (type == "DRAM_VALUE_STORE") {
    return true;
  }
  return type == "TIERED_VALUE_STORE";
}
201
// True when the value store type keeps (some) values on SSD.
bool HasSsdValueStore(const std::string& type) {
  const bool neither_ssd_nor_tiered =
      type != "SSD_VALUE_STORE" && type != "TIERED_VALUE_STORE";
  return !neither_ssd_nor_tiered;
}
205
// True for the engine classes that BuildConfig() configures with a flat
// engine_type/capacity/path/value_size JSON instead of the composite layout.
bool IsDirectExternalEngine(const std::string& engine_class) {
  static constexpr const char* kDirectEngines[] = {
      "KVEngineFasterKV",
      "KVEngineHPSHashMap",
      "KVEngineHPSRocksDB",
  };
  for (const char* name : kDirectEngines) {
    if (engine_class == name) {
      return true;
    }
  }
  return false;
}
211
212 bool IsLegacyDirectEngine(const std::string& engine_class) {
213 return IsDirectExternalEngine(engine_class) ||
214 engine_class == "KVEnginePetKV";
215 }
216
217 BaseKVConfig BuildConfig() {
218 BaseKVConfig config;
219 const uint64_t capacity = static_cast<uint64_t>(FLAGS_record_count);
220 const uint64_t value_slot_bytes =
221 static_cast<uint64_t>(std::max(FLAGS_value_size, 1)) + sizeof(uint64_t);
222 const uint64_t value_capacity = capacity * value_slot_bytes * 1.2;
223 constexpr uint64_t kMinSsdCapacityBytes = 256ULL * 1024ULL * 1024ULL;
224 if (FLAGS_engine_class == "KVEngineFasterKV") {
225 if (FLAGS_fasterkv_storage != "memory" && FLAGS_fasterkv_storage != "ssd") {
226 throw std::invalid_argument("fasterkv_storage must be memory or ssd");
227 }
228 if (FLAGS_fasterkv_mutable_fraction < 0.0 ||
229 FLAGS_fasterkv_mutable_fraction > 1.0) {
230 throw std::invalid_argument(
231 "fasterkv_mutable_fraction must be in [0, 1]");
232 }
233 config.json_config_ = {
234 {"engine_type", "KVEngineFasterKV"},
235 {"capacity", capacity},
236 {"path", FLAGS_ssd_path.empty() ? FLAGS_dram_path : FLAGS_ssd_path},
237 {"value_size", FLAGS_value_size},
238 {"fasterkv", {{"storage", FLAGS_fasterkv_storage}}}};
239 if (FLAGS_fasterkv_storage == "ssd") {
240 const std::string log_path =
241 FLAGS_fasterkv_log_path.empty()
242 ? FLAGS_ssd_path + "/fasterkv-log"
243 : FLAGS_fasterkv_log_path;
244 config.json_config_["fasterkv"]["log_path"] = log_path;
245 }
246 if (FLAGS_fasterkv_hlog_memory_bytes > 0) {
247 config.json_config_["fasterkv"]["hlog_memory_bytes"] =
248 static_cast<uint64_t>(FLAGS_fasterkv_hlog_memory_bytes);
249 }
250 if (FLAGS_fasterkv_mutable_fraction > 0.0) {
251 config.json_config_["fasterkv"]["mutable_fraction"] =
252 FLAGS_fasterkv_mutable_fraction;
253 }
254 if (FLAGS_fasterkv_read_cache_bytes > 0) {
255 config.json_config_["fasterkv"]["read_cache_bytes"] =
256 static_cast<uint64_t>(FLAGS_fasterkv_read_cache_bytes);
257 }
258 config.num_threads_ = std::max(FLAGS_thread_num, FLAGS_load_thread_num);
259 return config;
260 } else if (FLAGS_engine_class == "KVEngineHPSHashMap" ||
261 FLAGS_engine_class == "KVEngineHPSRocksDB") {
262 const std::string data_path =
263 FLAGS_ssd_path.empty() ? FLAGS_dram_path : FLAGS_ssd_path;
264 if (data_path.empty()) {
265 throw std::invalid_argument(
266 "dram_path or ssd_path must be set for " + FLAGS_engine_class);
267 }
268 config.json_config_ = {
269 {"engine_type", FLAGS_engine_class},
270 {"capacity", capacity},
271 {"path", data_path},
272 {"value_size", FLAGS_value_size}};
273 config.num_threads_ = std::max(FLAGS_thread_num, FLAGS_load_thread_num);
274 return config;
275 } else if (FLAGS_engine_class == "KVEnginePetKV") {
276 if (FLAGS_dram_path.empty()) {
277 throw std::invalid_argument("dram_path must be set for KVEnginePetKV");
278 }
279 config.json_config_ = {
280 {"capacity", capacity},
281 {"path", FLAGS_dram_path},
282 {"value_size", FLAGS_value_size},
283 {"value_capacity", value_capacity}};
284 config.num_threads_ = std::max(FLAGS_thread_num, FLAGS_load_thread_num);
285 return config;
286 }
287
288 const bool ssd_index = IsSsdIndexType(FLAGS_index_type);
289
290 config.json_config_ = {
291 {"engine_type", "KVEngineComposite"},
292 {"capacity", capacity},
293 {"index", {{"type", FLAGS_index_type}}},
294 {"value",
295 {{"type", FLAGS_value_store_type},
296 {"default_value_size_hint", FLAGS_value_size}}}};
297
298 if (ssd_index) {
299 config.json_config_["index"]["path"] = FLAGS_ssd_path + "/index.db";
300 config.json_config_["index"]["io"] = {
301 {"type", FLAGS_ssd_io_backend},
302 {"queue_depth", FLAGS_ssd_queue_depth},
303 {"base_offset_bytes", 0}};
304 }
305
306 uint64_t dram_capacity =
307 FLAGS_dram_capacity_bytes > 0
308 ? static_cast<uint64_t>(FLAGS_dram_capacity_bytes)
309 : value_capacity;
310 const uint64_t ssd_capacity =
311 FLAGS_ssd_capacity_bytes > 0
312 ? static_cast<uint64_t>(FLAGS_ssd_capacity_bytes)
313 : std::max(value_capacity, kMinSsdCapacityBytes);
314
315 if (FLAGS_value_store_type == "DRAM_VALUE_STORE") {
316 if (!FLAGS_dram_path.empty()) {
317 config.json_config_["value"]["path"] = FLAGS_dram_path + "/value";
318 }
319 config.json_config_["value"]["dram_allocator"] = {
320 {"type", FLAGS_dram_allocator}, {"capacity_bytes", dram_capacity}};
321 // TODO(xieminhui): fix BUG when R2_ALLOC is used, dram_capacity is not
322 // enough, should be 10 times of value_capacity
323 if (FLAGS_dram_allocator == "R2_ALLOC")
324 dram_capacity *= 10;
325 } else if (FLAGS_value_store_type == "SSD_VALUE_STORE") {
326 config.json_config_["value"]["path"] = FLAGS_ssd_path + "/value.db";
327 config.json_config_["value"]["ssd_allocator"] = {
328 {"type", "SSD_SLAB"},
329 {"capacity_bytes", ssd_capacity},
330 {"min_block_size", 128},
331 {"max_block_size", 4096},
332 {"io",
333 {{"type", FLAGS_ssd_io_backend},
334 {"queue_depth", FLAGS_ssd_queue_depth},
335 {"base_offset_bytes", 4096}}}};
336 } else if (FLAGS_value_store_type == "TIERED_VALUE_STORE") {
337 constexpr double dram_ratio = 0.1;
338 dram_capacity = dram_capacity * dram_ratio;
339
340 config.json_config_["value"]["dram_allocator"] = {
341 {"type", FLAGS_dram_allocator},
342 {"capacity_bytes", dram_capacity},
343 {"path", FLAGS_dram_path + "/dram"}};
344 config.json_config_["value"]["ssd_allocator"] = {
345 {"type", "SSD_SLAB"},
346 {"capacity_bytes", ssd_capacity},
347 {"min_block_size", 128},
348 {"max_block_size", 4096},
349 {"path", FLAGS_ssd_path + "/ssd.db"},
350 {"io",
351 {{"type", FLAGS_ssd_io_backend},
352 {"queue_depth", FLAGS_ssd_queue_depth},
353 {"base_offset_bytes", 4096}}}};
354 config.json_config_["value"]["tiering"] = {{"cache_policy", "LRU"}};
355 }
356 LOG(INFO) << config.json_config_.dump(2);
357 config.num_threads_ = std::max(FLAGS_thread_num, FLAGS_load_thread_num);
358 return config;
359 }
360
361 PhaseStats LoadRecords(BaseKV* kv, int load_threads, uint64_t record_count) {
362 std::vector<std::thread> threads;
363 std::vector<uint64_t> counts(load_threads, 0);
364 const uint64_t per_thread = (record_count + load_threads - 1) / load_threads;
365
366 for (int tid = 0; tid < load_threads; ++tid) {
367 threads.emplace_back([kv, tid, per_thread, record_count, &counts]() {
368 // base::auto_bind_core();
369 std::string value(FLAGS_value_size, static_cast<char>('a' + (tid % 26)));
370 const uint64_t begin = static_cast<uint64_t>(tid) * per_thread + 1;
371 const uint64_t end = std::min(record_count + 1, begin + per_thread);
372 for (uint64_t key = begin; key < end; ++key) {
373 kv->Put(key, std::string_view(value.data(), value.size()), tid);
374 ++counts[tid];
375 }
376 });
377 }
378 for (auto& thread : threads) {
379 thread.join();
380 }
381
382 PhaseStats stats;
383 for (uint64_t count : counts) {
384 stats.total_ops += count;
385 stats.update_ops += count;
386 }
387 return stats;
388 }
389
390 PhaseStats RunTransactions(
391 BaseKV* kv,
392 const std::string& workload,
393 const std::string& distribution,
394 int threads_num,
395 uint64_t record_count,
396 int seconds) {
397 const WorkloadMix mix = GetWorkloadMix(workload);
398 const bool use_exists = FLAGS_read_mode == "exists";
399 const bool use_batch_get_flat = FLAGS_read_mode == "batch_get_flat";
400 if (!use_exists && FLAGS_read_mode != "get" && !use_batch_get_flat) {
401 throw std::invalid_argument(
402 "read_mode must be exists, get, or batch_get_flat");
403 }
404 if (FLAGS_batch_keys <= 0) {
405 throw std::invalid_argument("batch_keys must be positive");
406 }
407 if (use_batch_get_flat && FLAGS_value_size % sizeof(float) != 0) {
408 throw std::invalid_argument(
409 "batch_get_flat requires value_size to be a multiple of sizeof(float)");
410 }
411 const int embedding_dim = FLAGS_value_size / sizeof(float);
412
413 std::atomic<bool> start{false};
414 std::atomic<bool> stop{false};
415 std::vector<std::thread> threads;
416 std::vector<PhaseStats> stats(threads_num);
417 std::exception_ptr first_exception;
418 std::mutex exception_mu;
419
420 for (int tid = 0; tid < threads_num; ++tid) {
421 threads.emplace_back([&, tid]() {
422 try {
423 base::auto_bind_core();
424 KeyGenerator key_gen(
425 distribution,
426 record_count,
427 FLAGS_zipfian_alpha,
428 0x9e3779b97f4a7c15ULL + static_cast<uint64_t>(tid));
429 std::string value(
430 FLAGS_value_size, static_cast<char>('A' + (tid % 26)));
431 std::string read_value;
432 std::vector<uint64_t> batch_keys(static_cast<size_t>(FLAGS_batch_keys));
433 std::vector<float> flat_values(static_cast<size_t>(FLAGS_batch_keys) *
434 static_cast<size_t>(embedding_dim));
435 PhaseStats local;
436 while (!start.load(std::memory_order_acquire)) {
437 }
438 while (!stop.load(std::memory_order_relaxed)) {
439 const bool do_read =
440 static_cast<int>(key_gen.NextUint(100)) < mix.read_percent;
441 if (do_read) {
442 if (use_batch_get_flat) {
443 for (int i = 0; i < FLAGS_batch_keys; ++i) {
444 batch_keys[static_cast<size_t>(i)] = key_gen.NextKey();
445 }
446 const bool ok = kv->BatchGetFlat(
447 base::ConstArray<uint64_t>(
448 batch_keys.data(), batch_keys.size()),
449 flat_values.data(),
450 FLAGS_batch_keys,
451 embedding_dim,
452 tid);
453 if (!ok) {
454 throw std::runtime_error(
455 "read_mode=batch_get_flat is unsupported by this KV "
456 "engine");
457 }
458 local.read_ops += static_cast<uint64_t>(FLAGS_batch_keys);
459 local.total_ops += static_cast<uint64_t>(FLAGS_batch_keys);
460 continue;
461 }
462 const uint64_t key = key_gen.NextKey();
463 if (use_exists) {
464 (void)kv->Exists(key, tid);
465 } else {
466 kv->Get(key, read_value, tid);
467 }
468 ++local.read_ops;
469 } else {
470 const uint64_t key = key_gen.NextKey();
471 kv->Put(key, std::string_view(value.data(), value.size()), tid);
472 ++local.update_ops;
473 }
474 ++local.total_ops;
475 }
476 stats[tid] = local;
477 } catch (...) {
478 {
479 std::lock_guard<std::mutex> lock(exception_mu);
480 if (!first_exception) {
481 first_exception = std::current_exception();
482 }
483 }
484 stop.store(true, std::memory_order_relaxed);
485 }
486 });
487 }
488
489 start.store(true, std::memory_order_release);
490 sleep(seconds);
491 stop.store(true, std::memory_order_relaxed);
492 for (auto& thread : threads) {
493 thread.join();
494 }
495 if (first_exception) {
496 std::rethrow_exception(first_exception);
497 }
498
499 PhaseStats total;
500 for (const auto& each : stats) {
501 total.total_ops += each.total_ops;
502 total.read_ops += each.read_ops;
503 total.update_ops += each.update_ops;
504 }
505 return total;
506 }
507
// Returns the time from `start` to `end` in (fractional) seconds; negative
// when `end` precedes `start`.
double SecondsSince(std::chrono::steady_clock::time_point start,
                    std::chrono::steady_clock::time_point end) {
  using FractionalSeconds = std::chrono::duration<double>;
  const auto elapsed = end - start;
  return std::chrono::duration_cast<FractionalSeconds>(elapsed).count();
}
513
514 } // namespace
515
516 int RunBenchmark(int argc, char* argv[]) {
517 ForceLinkIOBackends();
518 base::Init(&argc, &argv);
519
520 if (FLAGS_record_count <= 0) {
521 LOG(FATAL) << "record_count must be positive";
522 }
523 if (FLAGS_thread_num <= 0) {
524 LOG(FATAL) << "thread_num must be positive";
525 }
526 if (FLAGS_value_size <= 0) {
527 LOG(FATAL) << "value_size must be positive";
528 }
529 if (FLAGS_running_seconds <= 0) {
530 LOG(FATAL) << "running_seconds must be positive";
531 }
532 if (!IsLegacyDirectEngine(FLAGS_engine_class) &&
533 IsSsdIndexType(FLAGS_index_type) && FLAGS_ssd_path.empty()) {
534 LOG(FATAL) << "ssd_path must be set for SSD index";
535 }
536 if (!IsLegacyDirectEngine(FLAGS_engine_class) &&
537 HasSsdValueStore(FLAGS_value_store_type) && FLAGS_ssd_path.empty()) {
538 LOG(FATAL) << "ssd_path must be set for SSD/TIERED value store";
539 }
540 if (!IsLegacyDirectEngine(FLAGS_engine_class) &&
541 FLAGS_value_store_type == "TIERED_VALUE_STORE" &&
542 FLAGS_dram_path.empty()) {
543 LOG(FATAL) << "dram_path must be set for TIERED value store";
544 }
545 if (FLAGS_engine_class == "KVEngineFasterKV" &&
546 FLAGS_fasterkv_storage == "ssd" && FLAGS_ssd_path.empty() &&
547 FLAGS_fasterkv_log_path.empty()) {
548 LOG(FATAL) << "ssd_path or fasterkv_log_path must be set for "
549 "KVEngineFasterKV fasterkv_storage=ssd";
550 }
551
552 const std::string workload = NormalizeWorkload(FLAGS_workload);
553 (void)GetWorkloadMix(workload);
554 const int load_threads =
555 FLAGS_load_thread_num > 0 ? FLAGS_load_thread_num : FLAGS_thread_num;
556
557 base::PMMmapRegisterCenter::GetConfig().backend =
558 base::PMMmapRegisterCenter::BackendFromUseDram(
559 HasDramValueStore(FLAGS_value_store_type) ||
560 IsLegacyDirectEngine(FLAGS_engine_class));
561
562 BaseKVConfig config = BuildConfig();
563 std::unique_ptr<BaseKV> kv;
564 if (FLAGS_engine_class == "KVEngine") {
565 auto resolved = base::ResolveEngine(config);
566 kv.reset(base::Factory<BaseKV, const BaseKVConfig&>::NewInstance(
567 resolved.engine, resolved.cfg));
568 } else {
569 kv.reset(base::Factory<BaseKV, const BaseKVConfig&>::NewInstance(
570 FLAGS_engine_class, config));
571 }
572 if (!kv) {
573 LOG(FATAL) << "failed to create KVEngine";
574 }
575
576 if (FLAGS_load) {
577 const auto begin = std::chrono::steady_clock::now();
578 PhaseStats load_stats = LoadRecords(
579 kv.get(), load_threads, static_cast<uint64_t>(FLAGS_record_count));
580 const auto end = std::chrono::steady_clock::now();
581 const double seconds = SecondsSince(begin, end);
582 const double throughput =
583 seconds > 0.0 ? static_cast<double>(load_stats.total_ops) / seconds
584 : 0.0;
585 std::printf(
586 "YCSB_LOAD_RESULT records=%ld threads=%d seconds=%.6f ops=%lu "
587 "throughput_ops_sec=%.6f\n",
588 FLAGS_record_count,
589 load_threads,
590 seconds,
591 load_stats.total_ops,
592 throughput);
593 if (FLAGS_print_util) {
594 kv->Util();
595 }
596 }
597
598 if (FLAGS_run) {
599 const auto begin = std::chrono::steady_clock::now();
600 PhaseStats run_stats = RunTransactions(
601 kv.get(),
602 workload,
603 FLAGS_distribution,
604 FLAGS_thread_num,
605 static_cast<uint64_t>(FLAGS_record_count),
606 FLAGS_running_seconds);
607 const auto end = std::chrono::steady_clock::now();
608 const double seconds = SecondsSince(begin, end);
609 const double throughput =
610 seconds > 0.0 ? static_cast<double>(run_stats.total_ops) / seconds
611 : 0.0;
612 std::printf(
613 "YCSB_RESULT workload=%s distribution=%s zipfian_alpha=%.6f "
614 "read_mode=%s threads=%d records=%ld runtime_s=%.6f ops=%lu "
615 "throughput_ops_sec=%.6f read_ops=%lu update_ops=%lu\n",
616 workload.c_str(),
617 FLAGS_distribution.c_str(),
618 FLAGS_zipfian_alpha,
619 FLAGS_read_mode.c_str(),
620 FLAGS_thread_num,
621 FLAGS_record_count,
622 seconds,
623 run_stats.total_ops,
624 throughput,
625 run_stats.read_ops,
626 run_stats.update_ops);
627 }
628
629 LOG(INFO) << "gracefully exit";
630 return 0;
631 }
632
633 int main(int argc, char* argv[]) {
634 try {
635 return RunBenchmark(argc, argv);
636 } catch (const std::exception& e) {
637 std::fprintf(stderr, "ERROR benchmark_kv_engine: %s\n", e.what());
638 return 1;
639 }
640 }
641