GCC Code Coverage Report


Directory: src/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 36.4% 71 / 0 / 195
Functions: 46.7% 7 / 0 / 15
Branches: 25.6% 67 / 0 / 262

ps/base/cache_ps_impl.h
Line Branch Exec Source
1 #pragma once
2
3 #include <algorithm>
4 #include <atomic>
5 #include <boost/coroutine2/all.hpp>
6 #include <cstring>
7 #include <cstdint>
8 #include <experimental/filesystem>
9 #include <random>
10 #include <vector>
11
12 #include "base/array.h"
13 #include "base/factory.h"
14 #include "base/log.h" // NOLINT
15 #include "base/timer.h"
16 #include "parameters.h"
17 #include "storage/kv_engine/base_kv.h"
18 #include "storage/kv_engine/engine_factory.h"
19 #include "storage/kv_engine/engine_selector.h"
20 #include "optimizer/optimizer.h"
21 #include "ps/local_shm/local_shm_stage_report.h"
22
23 #ifdef ENABLE_PERF_REPORT
24 # include <chrono>
25 # include "base/report/report_client.h"
26 #endif
27
28 using boost::coroutines2::coroutine;
29
30 static const int KEY_CNT = 12543670;
31
32 template <typename key_t>
33 struct TaskElement {
34 TaskElement(const base::ConstArray<key_t>& keys,
35 const base::MutableArray<ParameterPack>& packs,
36 std::atomic_bool* promise)
37 : keys(keys), packs(packs), promise(promise) {}
38
39 TaskElement() {}
40
41 base::ConstArray<key_t> keys;
42 base::MutableArray<ParameterPack> packs;
43 std::atomic_bool* promise;
44 };
45
46 class CachePS {
47 public:
48 using key_t = uint64_t;
49
50 struct FlatGetProfile {
51 std::uint64_t batch_get_ns = 0;
52 std::uint64_t index_lookup_ns = 0;
53 std::uint64_t zero_fill_ns = 0;
54 std::uint64_t row_copy_ns = 0;
55 std::uint64_t rows = 0;
56 std::uint64_t value_bytes = 0;
57 std::uint64_t missing_rows = 0;
58 };
59
60 using DirectFixedRow = BaseKV::DirectFixedRow;
61 using RDMABackingRegion = BaseKV::RDMABackingRegion;
62
63 30 CachePS(json config) {
64
5/10
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 30 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 30 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 30 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 30 times.
✗ Branch 14 not taken.
30 LOG(INFO) << "cache ps config: " << config.dump(2);
65 30 BaseKVConfig kv_config;
66
2/4
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 30 times.
✗ Branch 5 not taken.
30 kv_config.num_threads_ = config["num_threads"].get<int>();
67
2/4
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 30 times.
✗ Branch 5 not taken.
30 kv_config.json_config_ = config["base_kv_config"];
68
2/4
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 30 times.
✗ Branch 5 not taken.
30 auto r = base::ResolveEngine(kv_config);
69
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 base_kv_.reset(base::Factory<BaseKV, const BaseKVConfig&>::NewInstance(
70 r.engine, r.cfg));
71 30 }
72
73 30 ~CachePS() {}
74
75 bool Initialize(const std::vector<std::string>& model_config_path,
76 const std::vector<std::string>& emb_file_path) {
77 LOG(INFO) << "Before Load CKPT";
78 LoadCkpt(model_config_path, emb_file_path);
79 LOG(INFO) << "After Load CKPT";
80 return true;
81 }
82
83 void Clear() { base_kv_->clear(); }
84
85 void LoadFakeData(int64_t key_capacity, int value_size) {
86 base_kv_->LoadFakeData(key_capacity, value_size);
87 }
88
89 bool LoadCkpt(const std::vector<std::string>& model_config_path,
90 const std::vector<std::string>& emb_file_path) {
91 // base_kv_->loadCkpt();
92 // LoadFakeData(KEY_CNT);
93 return true;
94 }
95
96 void PutSingleParameter(
97 const uint64_t key, const void* data, const int dim, const int tid) {
98 base_kv_->Put(key, std::string_view((char*)data, dim * sizeof(float)), tid);
99 }
100
101 22 void PutDenseParameterBatch(
102 const uint64_t* keys,
103 const float* values,
104 int key_count,
105 int embedding_dim,
106 int tid) {
107
2/4
✓ Branch 0 taken 22 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 22 times.
22 if (key_count <= 0 || embedding_dim <= 0) {
108 return;
109 }
110 22 base::ConstArray<uint64_t> key_array(keys, key_count);
111 22 std::vector<base::ConstArray<float>> value_slices;
112
1/2
✓ Branch 1 taken 22 times.
✗ Branch 2 not taken.
22 value_slices.reserve(static_cast<std::size_t>(key_count));
113
2/2
✓ Branch 0 taken 38 times.
✓ Branch 1 taken 22 times.
60 for (int i = 0; i < key_count; ++i) {
114
1/2
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
38 value_slices.emplace_back(values + i * embedding_dim, embedding_dim);
115 }
116
1/2
✓ Branch 2 taken 22 times.
✗ Branch 3 not taken.
22 base_kv_->BatchPut(key_array, &value_slices, tid);
117 22 }
118
119 void PutSingleParameter(const ParameterCompressItem* item, int tid) {
120 auto key = item->key;
121 auto dim = item->dim;
122 base_kv_->Put(
123 key, std::string_view((char*)item->data(), dim * sizeof(float)), tid);
124 }
125
126 void PutParameter(coroutine<void>::push_type& sink,
127 const ParameterCompressReader* reader,
128 int tid) {
129 std::vector<uint64_t> keys_vec;
130 std::vector<base::ConstArray<float>> values;
131 for (int i = 0; i < reader->item_size(); i++) {
132 keys_vec.emplace_back(reader->item(i)->key);
133 values.emplace_back(
134 (float*)reader->item(i)->data(), reader->item(i)->dim);
135 }
136 base::ConstArray<uint64_t> keys(keys_vec);
137
138 base_kv_->BatchPut(sink, keys, &values, tid);
139 }
140
141 void PutParameter(const ParameterCompressReader* reader, int tid) {
142 std::vector<uint64_t> keys_vec;
143 std::vector<base::ConstArray<float>> values;
144 for (int i = 0; i < reader->item_size(); i++) {
145 keys_vec.emplace_back(reader->item(i)->key);
146 values.emplace_back(
147 (float*)reader->item(i)->data(), reader->item(i)->dim);
148 }
149 base::ConstArray<uint64_t> keys(keys_vec);
150
151 base_kv_->BatchPut(keys, &values, tid);
152 }
153
154 bool GetParameterRun2Completion(key_t key, ParameterPack& pack, int tid) {
155 std::vector<uint64_t> keys = {key};
156 base::ConstArray<uint64_t> keys_array(keys);
157 std::vector<base::ConstArray<float>> values;
158
159 base_kv_->BatchGet(keys_array, &values, tid);
160 base::ConstArray<float> value = values[0];
161
162 if (value.Size() == 0) {
163 pack.key = key;
164 pack.dim = 0;
165 pack.emb_data = nullptr;
166 RECSTORE_LOG_EVERY_MS(ERROR, 1000) << "key " << key << " not existing";
167 return false;
168 }
169 pack.key = key;
170 pack.dim = value.Size();
171 pack.emb_data = value.Data();
172 // LOG(ERROR) << "Get key " << key << " dim " << pack.dim;
173 return true;
174 }
175
176 10 bool GetParameterRun2Completion(base::ConstArray<uint64_t> keys,
177 std::vector<ParameterPack>& packs,
178 int tid) {
179 #ifdef ENABLE_PERF_REPORT
180 auto start_time = std::chrono::high_resolution_clock::now();
181 #endif
182 10 const auto batch_get_start = std::chrono::steady_clock::now();
183 10 std::vector<base::ConstArray<float>> values;
184
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
10 base_kv_->BatchGet(keys, &values, tid);
185
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 recstore::ReportLocalShmStageMetric(
186 "cache_ps_get_batch_get_us",
187 recstore::LocalShmElapsedUs(batch_get_start));
188
189 10 const auto pack_build_start = std::chrono::steady_clock::now();
190
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 10 times.
30 for (int i = 0; i < keys.Size(); i++) {
191
1/2
✓ Branch 6 taken 20 times.
✗ Branch 7 not taken.
20 packs.emplace_back(keys[i], values[i].Size(), values[i].Data());
192 }
193
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 recstore::ReportLocalShmStageMetric(
194 "cache_ps_get_pack_us", recstore::LocalShmElapsedUs(pack_build_start));
195
196 #ifdef ENABLE_PERF_REPORT
197 auto end_time = std::chrono::high_resolution_clock::now();
198 double start_us =
199 std::chrono::duration_cast<std::chrono::microseconds>(
200 start_time.time_since_epoch())
201 .count();
202 auto duration =
203 std::chrono::duration_cast<std::chrono::microseconds>(
204 end_time - start_time)
205 .count();
206
207 std::string report_id = "cache_ps::GetParameterRun2Completion|" +
208 std::to_string(static_cast<uint64_t>(start_us));
209
210 report("embread_stages",
211 report_id.c_str(),
212 "duration_us",
213 static_cast<double>(duration));
214
215 report("embread_stages",
216 report_id.c_str(),
217 "request_size",
218 static_cast<double>(keys.Size()));
219
220 std::string unique_id =
221 "embread_debug|" + std::to_string(static_cast<uint64_t>(start_us));
222 FlameGraphData fg_data = {
223 "cache_ps::GetParameterRun2Completion",
224 start_us,
225 3, // level
226 static_cast<double>(duration),
227 static_cast<double>(duration)};
228 report_flame_graph("emb_read_flame_map", unique_id.c_str(), fg_data);
229 #endif
230 10 return true;
231 10 }
232
233 14 bool GetParameterFlat(
234 base::ConstArray<uint64_t> keys,
235 float* values,
236 int64_t num_rows,
237 int64_t embedding_dim,
238 int tid,
239 FlatGetProfile* profile = nullptr) {
240
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14 times.
14 if (values == nullptr) {
241 LOG(ERROR) << "GetParameterFlat values pointer is null";
242 return false;
243 }
244
2/4
✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 14 times.
14 if (num_rows < 0 || embedding_dim <= 0) {
245 LOG(ERROR) << "GetParameterFlat invalid shape rows=" << num_rows
246 << " dim=" << embedding_dim;
247 return false;
248 }
249
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 14 times.
14 if (keys.Size() != static_cast<size_t>(num_rows)) {
250 LOG(ERROR) << "GetParameterFlat keys size mismatch " << keys.Size()
251 << " vs " << num_rows;
252 return false;
253 }
254
255 14 const auto batch_get_start = std::chrono::steady_clock::now();
256 14 BaseKV::BatchGetFlatStats flat_stats;
257 14 BaseKV::BatchGetFlatStats* flat_stats_ptr =
258
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14 times.
14 profile != nullptr ? &flat_stats : nullptr;
259
1/2
✓ Branch 2 taken 14 times.
✗ Branch 3 not taken.
14 const bool flat_ok = base_kv_->BatchGetFlat(
260 keys, values, num_rows, embedding_dim, tid, flat_stats_ptr);
261
2/2
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 2 times.
14 if (flat_ok) {
262
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 if (profile != nullptr) {
263 profile->batch_get_ns = static_cast<std::uint64_t>(
264 std::chrono::duration_cast< std::chrono::nanoseconds>(
265 std::chrono::steady_clock::now() - batch_get_start)
266 .count());
267 profile->rows = static_cast<std::uint64_t>(num_rows);
268 profile->value_bytes =
269 static_cast<std::uint64_t>(num_rows) *
270 static_cast<std::uint64_t>(embedding_dim) * sizeof(float);
271 profile->zero_fill_ns = flat_stats.zero_fill_ns;
272 profile->index_lookup_ns = flat_stats.index_lookup_ns;
273 profile->row_copy_ns = flat_stats.row_copy_ns;
274 profile->missing_rows = flat_stats.missing_rows;
275 }
276
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 recstore::ReportLocalShmStageMetric(
277 "cache_ps_get_batch_get_us",
278 recstore::LocalShmElapsedUs(batch_get_start));
279 12 recstore::ReportLocalShmStageMetric("cache_ps_get_copy_us", 0);
280 12 return true;
281 }
282
283 2 std::vector<base::ConstArray<float>> value_slices;
284
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 value_slices.reserve(static_cast<std::size_t>(num_rows));
285
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 base_kv_->BatchGet(keys, &value_slices, tid);
286
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (profile != nullptr) {
287 profile->batch_get_ns = static_cast<std::uint64_t>(
288 std::chrono::duration_cast< std::chrono::nanoseconds>(
289 std::chrono::steady_clock::now() - batch_get_start)
290 .count());
291 profile->rows = static_cast<std::uint64_t>(num_rows);
292 profile->value_bytes =
293 static_cast<std::uint64_t>(num_rows) *
294 static_cast<std::uint64_t>(embedding_dim) * sizeof(float);
295 }
296
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 recstore::ReportLocalShmStageMetric(
297 "cache_ps_get_batch_get_us",
298 recstore::LocalShmElapsedUs(batch_get_start));
299
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (value_slices.size() != static_cast<size_t>(num_rows)) {
300 LOG(ERROR) << "GetParameterFlat BatchGet returned " << value_slices.size()
301 << " rows, expected " << num_rows;
302 return false;
303 }
304
305
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 for (int64_t row = 0; row < num_rows; ++row) {
306 2 const auto& slice = value_slices[static_cast<size_t>(row)];
307
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
4 if (slice.Size() != 0 &&
308
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 static_cast<int64_t>(slice.Size()) != embedding_dim) {
309
4/8
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 2 times.
✗ Branch 11 not taken.
4 LOG(ERROR) << "GetParameterFlat embedding_dim mismatch at row=" << row
310
3/6
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2 times.
✗ Branch 9 not taken.
2 << " key=" << keys[static_cast<size_t>(row)] << " expected="
311
3/6
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 8 taken 2 times.
✗ Branch 9 not taken.
2 << embedding_dim << " actual=" << slice.Size();
312 2 return false;
313 }
314 }
315
316 const auto row_copy_start = std::chrono::steady_clock::now();
317 std::uint64_t missing_zero_fill_ns = 0;
318 for (int64_t row = 0; row < num_rows; ++row) {
319 const auto& slice = value_slices[static_cast<size_t>(row)];
320 if (slice.Size() > 0) {
321 std::memcpy(values + row * embedding_dim,
322 slice.Data(),
323 static_cast<std::size_t>(embedding_dim) * sizeof(float));
324 } else {
325 const auto missing_zero_start = std::chrono::steady_clock::now();
326 std::memset(values + row * embedding_dim,
327 0,
328 static_cast<std::size_t>(embedding_dim) * sizeof(float));
329 if (profile != nullptr) {
330 missing_zero_fill_ns += static_cast<std::uint64_t>(
331 std::chrono::duration_cast< std::chrono::nanoseconds>(
332 std::chrono::steady_clock::now() - missing_zero_start)
333 .count());
334 ++profile->missing_rows;
335 }
336 }
337 }
338 if (profile != nullptr) {
339 profile->zero_fill_ns = missing_zero_fill_ns;
340 profile->row_copy_ns = static_cast<std::uint64_t>(
341 std::chrono::duration_cast< std::chrono::nanoseconds>(
342 std::chrono::steady_clock::now() - row_copy_start)
343 .count());
344 }
345 recstore::ReportLocalShmStageMetric(
346 "cache_ps_get_copy_us", recstore::LocalShmElapsedUs(row_copy_start));
347 return true;
348 2 }
349
350 bool ProbeParameterIndex(base::ConstArray<uint64_t> keys,
351 int tid,
352 FlatGetProfile* profile = nullptr) {
353 const auto batch_get_start = std::chrono::steady_clock::now();
354 BaseKV::BatchGetFlatStats flat_stats;
355 BaseKV::BatchGetFlatStats* flat_stats_ptr =
356 profile != nullptr ? &flat_stats : nullptr;
357 const bool ok = base_kv_->BatchGetIndexOnly(keys, tid, flat_stats_ptr);
358 if (profile != nullptr) {
359 profile->batch_get_ns = static_cast<std::uint64_t>(
360 std::chrono::duration_cast< std::chrono::nanoseconds>(
361 std::chrono::steady_clock::now() - batch_get_start)
362 .count());
363 profile->rows = static_cast<std::uint64_t>(keys.Size());
364 profile->value_bytes = 0;
365 profile->missing_rows = flat_stats.missing_rows;
366 }
367 return ok;
368 }
369
370 bool GetParameterDirectFixedRows(
371 base::ConstArray<uint64_t> keys,
372 int64_t num_rows,
373 int64_t embedding_dim,
374 int tid,
375 std::vector<DirectFixedRow>* rows,
376 FlatGetProfile* profile = nullptr) {
377 const auto batch_get_start = std::chrono::steady_clock::now();
378 BaseKV::BatchGetFlatStats flat_stats;
379 BaseKV::BatchGetFlatStats* flat_stats_ptr =
380 profile != nullptr ? &flat_stats : nullptr;
381 const bool ok = base_kv_->BatchGetDirectFixedRows(
382 keys, num_rows, embedding_dim, tid, rows, flat_stats_ptr);
383 if (profile != nullptr) {
384 profile->batch_get_ns = static_cast<std::uint64_t>(
385 std::chrono::duration_cast< std::chrono::nanoseconds>(
386 std::chrono::steady_clock::now() - batch_get_start)
387 .count());
388 profile->rows = static_cast<std::uint64_t>(num_rows);
389 profile->value_bytes =
390 static_cast<std::uint64_t>(num_rows) *
391 static_cast<std::uint64_t>(embedding_dim) * sizeof(float);
392 profile->zero_fill_ns = flat_stats.zero_fill_ns;
393 profile->index_lookup_ns = flat_stats.index_lookup_ns;
394 profile->row_copy_ns = 0;
395 profile->missing_rows = flat_stats.missing_rows;
396 }
397 return ok;
398 }
399
400 RDMABackingRegion GetRDMABackingRegion() const {
401 return base_kv_->GetRDMABackingRegion();
402 }
403
404 bool GetParameterRun2Completion(
405 coroutine<void>::push_type& sink,
406 base::ConstArray<uint64_t> keys,
407 std::vector<ParameterPack>& pack,
408 int tid) {
409 std::vector<base::ConstArray<float>> values;
410
411 base_kv_->BatchGet(sink, keys, &values, tid);
412
413 for (int i = 0; i < keys.Size(); i++) {
414 pack.emplace_back(keys[i], values[i].Size(), values[i].Data());
415 }
416 return true;
417 }
418
419 /// optimizer interface
420
421 24 bool InitTable(const std::string& table_name,
422 uint64_t num_embeddings,
423 uint64_t embedding_dim) {
424
1/2
✓ Branch 1 taken 24 times.
✗ Branch 2 not taken.
24 if (!optimizer_) {
425 // TODO: optimizer type from config
426
1/2
✓ Branch 1 taken 24 times.
✗ Branch 2 not taken.
24 optimizer_ = std::make_unique<SGD>(0.01);
427 }
428
429 24 EmbeddingTableConfig config{num_embeddings, embedding_dim};
430
5/10
✓ Branch 3 taken 24 times.
✗ Branch 4 not taken.
✓ Branch 7 taken 24 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 24 times.
✗ Branch 11 not taken.
✓ Branch 14 taken 24 times.
✓ Branch 15 taken 24 times.
✗ Branch 19 not taken.
✗ Branch 20 not taken.
48 optimizer_->Init({table_name}, config, base_kv_.get());
431 24 return true;
432 }
433
434 bool UpdateParameter(const std::string& table_name,
435 const ParameterCompressReader* reader,
436 unsigned tid) {
437 if (!optimizer_) {
438 LOG(ERROR) << "Optimizer not initialized. Please call InitTable first.";
439 return false;
440 }
441
442 optimizer_->Update(table_name, reader, tid);
443 return true;
444 }
445
446 6 bool UpdateParameterFlat(
447 const std::string& table_name,
448 const base::ConstArray<uint64_t>& keys,
449 const float* grads,
450 int64_t num_rows,
451 int64_t embedding_dim,
452 unsigned tid) {
453
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (grads == nullptr) {
454 LOG(ERROR) << "UpdateParameterFlat grads pointer is null";
455 return false;
456 }
457
2/4
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6 times.
6 if (num_rows < 0 || embedding_dim <= 0) {
458 LOG(ERROR) << "UpdateParameterFlat invalid shape rows=" << num_rows
459 << " dim=" << embedding_dim;
460 return false;
461 }
462
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 6 times.
6 if (keys.Size() != static_cast<size_t>(num_rows)) {
463 LOG(ERROR) << "UpdateParameterFlat keys size mismatch " << keys.Size()
464 << " vs " << num_rows;
465 return false;
466 }
467
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 6 times.
6 if (!optimizer_) {
468 LOG(ERROR) << "Optimizer not initialized. Please call InitTable first.";
469 return false;
470 }
471
1/2
✓ Branch 3 taken 6 times.
✗ Branch 4 not taken.
6 optimizer_->UpdateFlat(
472 table_name, keys, grads, num_rows, embedding_dim, tid);
473 6 return true;
474 }
475
476 private:
477 std::unique_ptr<BaseKV> base_kv_;
478 std::unique_ptr<Optimizer> optimizer_;
479 std::atomic<bool> stopFlag_{false};
480 };
481