The Sparta Modeling Framework
Loading...
Searching...
No Matches
DatabaseCheckpointer.hpp
1// <DatabaseCheckpointer> -*- C++ -*-
2
3#pragma once
4
5#include "sparta/serialization/checkpoint/Checkpointer.hpp"
6#include "sparta/serialization/checkpoint/DatabaseCheckpoint.hpp"
7#include "simdb/apps/App.hpp"
8#include "simdb/pipeline/Pipeline.hpp"
9#include <unordered_set>
10
11namespace sparta::serialization::checkpoint
12{
13
14class DatabaseCheckpointer;
15
22class DatabaseCheckpointer final : public simdb::App, public Checkpointer
23{
24public:
25 static constexpr auto NAME = "db-checkpointer";
26
28 using checkpoint_ptr = std::shared_ptr<checkpoint_type>;
29 using checkpoint_ptrs = std::vector<checkpoint_ptr>;
30 using window_id_t = uint64_t;
31
48 DatabaseCheckpointer(simdb::DatabaseManager* db_mgr, const std::vector<sparta::TreeNode*> & roots,
49 sparta::Scheduler* sched = nullptr);
50
54 static void defineSchema(simdb::Schema& schema);
55
59 void createPipeline(simdb::pipeline::PipelineManager* pipeline_mgr) override;
60
64 void preTeardown() override;
65
80 uint32_t getSnapshotThreshold() const;
81
88 void setSnapshotThreshold(uint32_t thresh);
89
96 void setMaxCachedWindows(uint32_t max_windows);
97
108 uint64_t getTotalMemoryUse() const noexcept override;
109
117 uint64_t getContentMemoryUse() const noexcept override;
118
124 void deleteCheckpoint(chkpt_id_t) override final;
125
138 void loadCheckpoint(chkpt_id_t id) override;
139
148 std::vector<chkpt_id_t> getCheckpointsAt(tick_t t) override;
149
156 std::vector<chkpt_id_t> getCheckpoints() override;
157
161 uint32_t getNumCheckpoints() const noexcept override;
162
166 uint32_t getNumSnapshots() const noexcept;
167
171 uint32_t getNumDeltas() const noexcept;
172
184 std::deque<chkpt_id_t> getCheckpointChain(chkpt_id_t id) override;
185
194 std::shared_ptr<DatabaseCheckpoint> findCheckpoint(chkpt_id_t id, bool must_exist=false);
195
214
221 bool hasCheckpoint(chkpt_id_t id) noexcept override;
222
229 void dumpRestoreChain(std::ostream& o, chkpt_id_t id);
230
238
245
251 std::vector<chkpt_id_t> getNextIDs(chkpt_id_t id) override;
252
261
266 bool isSnapshot(chkpt_id_t id) noexcept;
267
271 std::string stringize() const override;
272
278 void dumpList(std::ostream& o) override;
279
285 void dumpData(std::ostream& o) override;
286
293 void dumpAnnotatedData(std::ostream& o) override;
294
310 void traceValue(std::ostream& o, chkpt_id_t id, const ArchData* container, uint32_t offset, uint32_t size) override;
311
319 bool isCheckpointCached(chkpt_id_t id) const noexcept;
320
321private:
322
333 void createHead_() override;
334
345 chkpt_id_t createCheckpoint_(bool force_snapshot=false) override;
346
357 void deleteCheckpoint_(chkpt_id_t id);
358
362 void dumpCheckpointNode_(const chkpt_id_t id, std::ostream& o) override;
363
371 void setHead_(DatabaseCheckpoint* head);
372
379 void setCurrent_(DatabaseCheckpoint* current);
380
385 void addToCache_(std::shared_ptr<checkpoint_type> chkpt);
386
390 window_id_t getWindowID_(chkpt_id_t id) const;
391
395 void touchWindow_(window_id_t id);
396
401 void evictWindowsIfNeeded_(bool force_flush=false);
402
408 bool ensureWindowLoaded_(chkpt_id_t id, bool must_succeed=true);
409
414 std::unique_ptr<ChkptWindow> deserializeWindow_(
415 const std::vector<char>& window_bytes,
416 bool requires_decompression = true) const;
417
425 bool loadWindowIntoCache_(window_id_t win_id, bool must_succeed=true);
426
432 simdb::SnooperCallbackOutcome handleLoadWindowIntoCacheSnooper_(
433 std::deque<ChkptWindow>& queue);
434
440 simdb::SnooperCallbackOutcome handleLoadWindowIntoCacheSnooper_(
441 std::deque<ChkptWindowBytes>& queue,
442 bool requires_decompression);
443
449 simdb::SnooperCallbackOutcome handleDeleteCheckpointSnooper_(
450 std::deque<ChkptWindow>& queue);
451
457 simdb::SnooperCallbackOutcome handleDeleteCheckpointSnooper_(
458 std::deque<ChkptWindowBytes>& queue,
459 bool requires_decompression);
460
466 void forEachCheckpoint_(const std::function<void(const DatabaseCheckpoint*)>& cb);
467
469 chkpt_id_t head_id_ = checkpoint_type::UNIDENTIFIED_CHECKPOINT;
470
472 chkpt_id_t current_id_ = checkpoint_type::UNIDENTIFIED_CHECKPOINT;
473
475 simdb::ConcurrentQueue<ChkptWindow>* pipeline_head_ = nullptr;
476
478 simdb::ConcurrentQueue<chkpt_id_t>* pending_deletions_head_ = nullptr;
479
481 std::unordered_map<window_id_t, checkpoint_ptrs> chkpts_cache_;
482
484 std::list<window_id_t> lru_list_;
485
487 std::unordered_map<window_id_t, std::list<window_id_t>::iterator> lru_map_;
488
490 utils::ValidValue<uint32_t> max_cached_windows_;
491
493 mutable std::recursive_mutex cache_mutex_;
494
496 simdb::DatabaseManager* db_mgr_ = nullptr;
497
499 simdb::pipeline::PipelineManager* pipeline_mgr_ = nullptr;
500
502 simdb::pipeline::AsyncDatabaseAccessor* db_accessor_ = nullptr;
503
505 std::unique_ptr<simdb::pipeline::RunnableFlusher> pipeline_flusher_;
506
511 utils::ValidValue<window_id_t> snoop_win_id_for_retrieval_;
512
516 utils::ValidValue<chkpt_id_t> snoop_chkpt_id_for_deletion_;
517
520 utils::ValidValue<uint32_t> snap_thresh_;
521
523 chkpt_id_t next_chkpt_id_;
524};
525
526} // namespace sparta::serialization::checkpoint
527
528namespace simdb
529{
530
536template <>
537class AppFactory<sparta::serialization::checkpoint::DatabaseCheckpointer> : public AppFactoryBase
538{
539public:
541
547 void setArchDataRoots(size_t instance_num, const std::vector<sparta::TreeNode*>& roots)
548 {
549 roots_by_inst_num_[instance_num] = roots;
550 }
551
556 {
557 scheduler_ = &sched;
558 }
559
560 AppT* createApp(DatabaseManager* db_mgr, size_t instance_num = 0) override
561 {
562 auto it = roots_by_inst_num_.find(instance_num);
563 if (it == roots_by_inst_num_.end()) {
565 "No TreeNode (ArchData root) set for DatabaseCheckpointer instance number ")
566 << instance_num << ". Did you forget to call setArchDataRoot()?";
567 }
568
569 const auto & roots = it->second;
570
571 // Make the ctor call that the default AppFactory cannot make.
572 return new AppT(db_mgr, roots, scheduler_);
573 }
574
575 void defineSchema(Schema& schema) const override
576 {
577 AppT::defineSchema(schema);
578 }
579
580private:
581 sparta::Scheduler* scheduler_ = nullptr;
582 std::map<size_t, std::vector<sparta::TreeNode*>> roots_by_inst_num_;
583};
584
585} // namespace simdb
void setArchDataRoots(size_t instance_num, const std::vector< sparta::TreeNode * > &roots)
Sets the ArchData root(s) for a given instance of the checkpointer.
void setScheduler(sparta::Scheduler &sched)
Sets the Scheduler for all instances of the checkpointer.
Contains a set of contiguous line of architectural data which can be referred to by any architected o...
Definition ArchData.hpp:39
A class that lets you schedule events now and in the future.
Used to construct and throw a standard C++ exception. Inherits from std::exception.
Checkpointer interface. Defines an ID-based checkpointing API for tree of related checkpoints which c...
Checkpoint::chkpt_id_t chkpt_id_t
tick_t Tick type to which checkpoints will refer
Checkpoint::tick_t tick_t
tick_t Tick type to which checkpoints will refer
Checkpoint class optimized for use with database-backed checkpointers.
Implementation of the FastCheckpointer which only holds a subset of checkpoints in memory at any give...
uint32_t getNumSnapshots() const noexcept
Gets the current number of snapshots with valid IDs.
std::shared_ptr< DatabaseCheckpoint > findCheckpoint(chkpt_id_t id, bool must_exist=false)
Finds a checkpoint by its ID.
static void defineSchema(simdb::Schema &schema)
Define the SimDB schema for this checkpointer.
std::shared_ptr< DatabaseCheckpoint > findLatestCheckpointAtOrBefore(tick_t tick, chkpt_id_t from)
Finds the latest checkpoint at or before the given tick starting at the from checkpoint and working b...
std::stack< chkpt_id_t > getRestoreChain(chkpt_id_t id)
Returns a stack of checkpoints that must be restored from top-to-bottom to fully restore the state as...
uint32_t getNumCheckpoints() const noexcept override
Gets the current number of checkpoints with valid IDs.
std::stack< chkpt_id_t > getHistoryChain(chkpt_id_t id)
Returns a stack of checkpoints from this checkpoint as far back as possible until no previous link is...
void deleteCheckpoint(chkpt_id_t) override final
Explicit checkpoint deletion is NOT supported by this checkpointer.
std::vector< chkpt_id_t > getNextIDs(chkpt_id_t id) override
Returns IDs of the checkpoints immediately following the given checkpoint.
void dumpRestoreChain(std::ostream &o, chkpt_id_t id)
Dumps the restore chain for this checkpoint.
DatabaseCheckpointer(simdb::DatabaseManager *db_mgr, const std::vector< sparta::TreeNode * > &roots, sparta::Scheduler *sched=nullptr)
DatabaseCheckpointer constructor.
void preTeardown() override
Flush all cached windows down the pipeline before threads are shut down.
void dumpAnnotatedData(std::ostream &o) override
Dumps this checkpointer's data to an ostream with annotations between each ArchData and a newline fol...
uint64_t getContentMemoryUse() const noexcept override
Computes and returns the memory usage by this checkpointer at this moment purely for the checkpoint s...
void setSnapshotThreshold(uint32_t thresh)
Sets the snapshot threshold.
std::vector< chkpt_id_t > getCheckpointsAt(tick_t t) override
Gets all checkpoints taken at tick t.
uint64_t getTotalMemoryUse() const noexcept override
Computes and returns the memory usage by this checkpointer at this moment including any framework ove...
void createPipeline(simdb::pipeline::PipelineManager *pipeline_mgr) override
Instantiate the async processing pipeline to save/load checkpoints.
void dumpData(std::ostream &o) override
Dumps this checkpointer's data to an ostream with a newline following each checkpoint.
uint32_t getNumDeltas() const noexcept
Gets the current number of delta checkpoints with valid IDs.
void loadCheckpoint(chkpt_id_t id) override
Loads state from a specific checkpoint by ID.
bool isCheckpointCached(chkpt_id_t id) const noexcept
Check if the given checkpoint is currently cached in memory.
uint32_t getDistanceToPrevSnapshot(chkpt_id_t id) noexcept
Determines how many checkpoints away the closest, earlier snapshot is.
std::deque< chkpt_id_t > getCheckpointChain(chkpt_id_t id) override
Debugging utility which gets a deque of checkpoints representing a chain starting at the checkpoint h...
std::string stringize() const override
Returns a string describing this object.
uint32_t getSnapshotThreshold() const
Returns the next-shapshot threshold.
void setMaxCachedWindows(uint32_t max_windows)
Sets the max number of cached windows (LRU).
void traceValue(std::ostream &o, chkpt_id_t id, const ArchData *container, uint32_t offset, uint32_t size) override
Debugging utility which dumps values in some bytes across a chain of checkpoints. The intent is to sh...
bool isSnapshot(chkpt_id_t id) noexcept
Check if the given checkpoint is a snapshot (not a delta).
void dumpList(std::ostream &o) override
Dumps this checkpointer's flat list of checkpoints to an ostream with a newline following each checkp...
std::vector< chkpt_id_t > getCheckpoints() override
Gets all checkpoint IDs sorted by tick (or equivalently checkpoint ID).
bool hasCheckpoint(chkpt_id_t id) noexcept override
Tests whether this checkpoint manager has a checkpoint with the given id in the cache or in the datab...
Macros for handling exponential backoff.
Compressed version of ChkptWindow to be stored in the database.
A window of checkpoints to be sent to/from the database as a unit.