5#include "sparta/serialization/checkpoint/Checkpointer.hpp"
6#include "sparta/serialization/checkpoint/DatabaseCheckpoint.hpp"
7#include "simdb/apps/App.hpp"
8#include "simdb/pipeline/Pipeline.hpp"
9#include <unordered_set>
11namespace sparta::serialization::checkpoint
14class DatabaseCheckpointer;
25 static constexpr auto NAME =
"db-checkpointer";
28 using checkpoint_ptr = std::shared_ptr<checkpoint_type>;
29 using checkpoint_ptrs = std::vector<checkpoint_ptr>;
30 using window_id_t = uint64_t;
59 void createPipeline(simdb::pipeline::PipelineManager* pipeline_mgr)
override;
333 void createHead_() override;
345 chkpt_id_t createCheckpoint_(
bool force_snapshot=false) override;
362 void dumpCheckpointNode_(const
chkpt_id_t id, std::ostream& o) override;
390 window_id_t getWindowID_(
chkpt_id_t id) const;
395 void touchWindow_(window_id_t
id);
401 void evictWindowsIfNeeded_(
bool force_flush=false);
408 bool ensureWindowLoaded_(
chkpt_id_t id,
bool must_succeed=true);
415 const std::vector<
char>& window_bytes,
416 bool requires_decompression = true) const;
425 bool loadWindowIntoCache_(window_id_t win_id,
bool must_succeed=true);
432 simdb::SnooperCallbackOutcome handleLoadWindowIntoCacheSnooper_(
440 simdb::SnooperCallbackOutcome handleLoadWindowIntoCacheSnooper_(
442 bool requires_decompression);
449 simdb::SnooperCallbackOutcome handleDeleteCheckpointSnooper_(
457 simdb::SnooperCallbackOutcome handleDeleteCheckpointSnooper_(
459 bool requires_decompression);
475 simdb::ConcurrentQueue<
ChkptWindow>* pipeline_head_ =
nullptr;
478 simdb::ConcurrentQueue<
chkpt_id_t>* pending_deletions_head_ =
nullptr;
481 std::unordered_map<window_id_t, checkpoint_ptrs> chkpts_cache_;
484 std::list<window_id_t> lru_list_;
487 std::unordered_map<window_id_t, std::list<window_id_t>::iterator> lru_map_;
490 utils::ValidValue<uint32_t> max_cached_windows_;
493 mutable std::recursive_mutex cache_mutex_;
496 simdb::DatabaseManager* db_mgr_ =
nullptr;
499 simdb::pipeline::PipelineManager* pipeline_mgr_ =
nullptr;
502 simdb::pipeline::AsyncDatabaseAccessor* db_accessor_ =
nullptr;
505 std::unique_ptr<simdb::pipeline::RunnableFlusher> pipeline_flusher_;
511 utils::ValidValue<window_id_t> snoop_win_id_for_retrieval_;
516 utils::ValidValue<
chkpt_id_t> snoop_chkpt_id_for_deletion_;
520 utils::ValidValue<uint32_t> snap_thresh_;
537class AppFactory<
sparta::serialization::checkpoint::DatabaseCheckpointer> :
public AppFactoryBase
549 roots_by_inst_num_[instance_num] = roots;
560 AppT* createApp(DatabaseManager* db_mgr,
size_t instance_num = 0)
override
562 auto it = roots_by_inst_num_.find(instance_num);
563 if (it == roots_by_inst_num_.end()) {
565 "No TreeNode (ArchData root) set for DatabaseCheckpointer instance number ")
566 << instance_num <<
". Did you forget to call setArchDataRoot()?";
569 const auto & roots = it->second;
572 return new AppT(db_mgr, roots, scheduler_);
575 void defineSchema(Schema& schema)
const override
577 AppT::defineSchema(schema);
582 std::map<size_t, std::vector<sparta::TreeNode*>> roots_by_inst_num_;
void setArchDataRoots(size_t instance_num, const std::vector< sparta::TreeNode * > &roots)
Sets the ArchData root(s) for a given instance of the checkpointer.
void setScheduler(sparta::Scheduler &sched)
Sets the Scheduler for all instances of the checkpointer.
Contains a set of contiguous line of architectural data which can be referred to by any architected o...
A class that lets you schedule events now and in the future.
Used to construct and throw a standard C++ exception. Inherits from std::exception.
Checkpointer interface. Defines an ID-based checkpointing API for tree of related checkpoints which c...
Checkpoint::chkpt_id_t chkpt_id_t
tick_t Tick type to which checkpoints will refer
Checkpoint::tick_t tick_t
tick_t Tick type to which checkpoints will refer
Checkpoint class optimized for use with database-backed checkpointers.
Implementation of the FastCheckpointer which only holds a subset of checkpoints in memory at any give...
uint32_t getNumSnapshots() const noexcept
Gets the current number of snapshots with valid IDs.
std::shared_ptr< DatabaseCheckpoint > findCheckpoint(chkpt_id_t id, bool must_exist=false)
Finds a checkpoint by its ID.
static void defineSchema(simdb::Schema &schema)
Define the SimDB schema for this checkpointer.
std::shared_ptr< DatabaseCheckpoint > findLatestCheckpointAtOrBefore(tick_t tick, chkpt_id_t from)
Finds the latest checkpoint at or before the given tick starting at the from checkpoint and working b...
std::stack< chkpt_id_t > getRestoreChain(chkpt_id_t id)
Returns a stack of checkpoints that must be restored from top-to-bottom to fully restore the state as...
uint32_t getNumCheckpoints() const noexcept override
Gets the current number of checkpoints with valid IDs.
std::stack< chkpt_id_t > getHistoryChain(chkpt_id_t id)
Returns a stack of checkpoints from this checkpoint as far back as possible until no previous link is...
void deleteCheckpoint(chkpt_id_t) override final
Explicit checkpoint deletion is NOT supported by this checkpointer.
std::vector< chkpt_id_t > getNextIDs(chkpt_id_t id) override
Returns IDs of the checkpoints immediately following the given checkpoint.
void dumpRestoreChain(std::ostream &o, chkpt_id_t id)
Dumps the restore chain for this checkpoint.
DatabaseCheckpointer(simdb::DatabaseManager *db_mgr, const std::vector< sparta::TreeNode * > &roots, sparta::Scheduler *sched=nullptr)
DatabaseCheckpointer constructor.
void preTeardown() override
Flush all cached windows down the pipeline before threads are shut down.
void dumpAnnotatedData(std::ostream &o) override
Dumps this checkpointer's data to an ostream with annotations between each ArchData and a newline fol...
uint64_t getContentMemoryUse() const noexcept override
Computes and returns the memory usage by this checkpointer at this moment purely for the checkpoint s...
void setSnapshotThreshold(uint32_t thresh)
Sets the snapshot threshold.
std::vector< chkpt_id_t > getCheckpointsAt(tick_t t) override
Gets all checkpoints taken at tick t.
uint64_t getTotalMemoryUse() const noexcept override
Computes and returns the memory usage by this checkpointer at this moment including any framework ove...
void createPipeline(simdb::pipeline::PipelineManager *pipeline_mgr) override
Instantiate the async processing pipeline to save/load checkpoints.
void dumpData(std::ostream &o) override
Dumps this checkpointer's data to an ostream with a newline following each checkpoint.
uint32_t getNumDeltas() const noexcept
Gets the current number of delta checkpoints with valid IDs.
void loadCheckpoint(chkpt_id_t id) override
Loads state from a specific checkpoint by ID.
bool isCheckpointCached(chkpt_id_t id) const noexcept
Check if the given checkpoint is currently cached in memory.
uint32_t getDistanceToPrevSnapshot(chkpt_id_t id) noexcept
Determines how many checkpoints away the closest, earlier snapshot is.
std::deque< chkpt_id_t > getCheckpointChain(chkpt_id_t id) override
Debugging utility which gets a deque of checkpoints representing a chain starting at the checkpoint h...
std::string stringize() const override
Returns a string describing this object.
uint32_t getSnapshotThreshold() const
Returns the next-shapshot threshold.
void setMaxCachedWindows(uint32_t max_windows)
Sets the max number of cached windows (LRU).
void traceValue(std::ostream &o, chkpt_id_t id, const ArchData *container, uint32_t offset, uint32_t size) override
Debugging utility which dumps values in some bytes across a chain of checkpoints. The intent is to sh...
bool isSnapshot(chkpt_id_t id) noexcept
Check if the given checkpoint is a snapshot (not a delta).
void dumpList(std::ostream &o) override
Dumps this checkpointer's flat list of checkpoints to an ostream with a newline following each checkp...
std::vector< chkpt_id_t > getCheckpoints() override
Gets all checkpoint IDs sorted by tick (or equivalently checkpoint ID).
bool hasCheckpoint(chkpt_id_t id) noexcept override
Tests whether this checkpoint manager has a checkpoint with the given id in the cache or in the datab...
Macros for handling exponential backoff.
Compressed version of ChkptWindow to be stored in the database.
A window of checkpoints to be sent to/from the database as a unit.