The Sparta Modeling Framework
Loading...
Searching...
No Matches
ReportStatsCollector.hpp
#pragma once

#include "simdb/apps/App.hpp"
#include "simdb/utils/ConcurrentQueue.hpp"

#include <cstdint>
#include <map>
#include <string>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
9
// Forward declarations of the Sparta types this header refers to only through
// pointers, so their full definitions are not needed here.
namespace sparta {
    class Report;
    class Scheduler;
    class StatisticInstance;
} // namespace sparta
15
// Forward declaration of the report header type; this file only uses it by
// reference or pointer, so the full definition is not needed here.
namespace sparta::report::format {
    class ReportHeader;
} // namespace sparta::report::format
19
20namespace sparta::app {
21
22class ReportDescriptor;
23
30
31class ReportStatsCollector : public simdb::App
32{
33public:
34 static constexpr auto NAME = "simdb-reports";
35
36 ReportStatsCollector(simdb::DatabaseManager* db_mgr)
37 : db_mgr_(db_mgr)
38 {}
39
40 static void defineSchema(simdb::Schema&);
41
42 void createPipeline(simdb::pipeline::PipelineManager* pipeline_mgr) override;
43
44 void setScheduler(const Scheduler* scheduler);
45
46 void addDescriptor(const ReportDescriptor* desc);
47
48 int getDescriptorID(const ReportDescriptor* desc,
49 bool must_exist = true) const;
50
51 void setHeader(const ReportDescriptor* desc,
52 const report::format::ReportHeader& header);
53
54 void updateReportMetadata(const ReportDescriptor* desc,
55 const std::string& key,
56 const std::string& value);
57
58 void updateReportStartTime(const ReportDescriptor* desc);
59
60 void updateReportEndTime(const ReportDescriptor* desc);
61
62 void postInit(int argc, char** argv) override;
63
64 void collect(const ReportDescriptor* desc);
65
66 void writeSkipAnnotation(const ReportDescriptor* desc,
67 const std::string& annotation);
68
69 void postTeardown() override;
70
71private:
72 void writeReportInfo_(const ReportDescriptor* desc);
73
74 void writeReportInfo_(const ReportDescriptor* desc,
75 const Report* r,
76 std::unordered_set<std::string>& visited_stats,
77 int parent_report_id);
78
79 using Descriptor = std::tuple<std::string, std::string, std::string, std::string>;
80 std::vector<std::pair<const ReportDescriptor*, Descriptor>> descriptors_;
81 std::unordered_map<const ReportDescriptor*, int> descriptor_ids_;
82 std::unordered_map<const ReportDescriptor*, const report::format::ReportHeader*> descriptor_headers_;
83 std::unordered_map<const ReportDescriptor*, std::vector<int>> descriptor_report_ids_;
84 std::unordered_map<const ReportDescriptor*, std::vector<int>> descriptor_report_style_ids_;
85 std::unordered_map<const ReportDescriptor*, std::vector<int>> descriptor_report_meta_ids_;
86 std::unordered_map<const ReportDescriptor*, std::vector<const StatisticInstance*>> simdb_stats_;
87 std::unordered_map<const ReportDescriptor*, uint64_t> report_start_times_;
88 std::unordered_map<const ReportDescriptor*, uint64_t> report_end_times_;
89 std::unordered_map<const ReportDescriptor*, std::map<std::string, std::string>> report_metadata_;
90 std::unordered_map<const ReportDescriptor*, std::vector<std::pair<uint64_t, std::string>>> report_skip_annotations_;
91 const Scheduler* scheduler_ = nullptr;
92 simdb::DatabaseManager* db_mgr_ = nullptr;
93
94 class ReportAtTick
95 {
96 public:
97 ReportAtTick(const ReportDescriptor* descriptor, uint64_t tick)
98 : descriptor_(descriptor)
99 , tick_(tick)
100 {}
101
102 // Default constructor needed to read these out of simdb::ConcurrentQueue's
103 ReportAtTick() = default;
104
105 const ReportDescriptor* getDescriptor() const
106 {
107 return descriptor_;
108 }
109
110 uint64_t getTick() const
111 {
112 return tick_;
113 }
114
115 private:
116 const ReportDescriptor* descriptor_ = nullptr;
117 uint64_t tick_ = 0;
118 };
119
120 class ReportStatsAtTick : public ReportAtTick
121 {
122 public:
123 ReportStatsAtTick(const ReportDescriptor* descriptor,
124 uint64_t tick,
125 std::vector<double>&& stats)
126 : ReportAtTick(descriptor, tick)
127 , stats_(std::move(stats))
128 {}
129
130 // Default constructor needed to read these out of simdb::ConcurrentQueue's
131 ReportStatsAtTick() = default;
132
133 std::vector<char> compress() const;
134
135 private:
136 std::vector<double> stats_;
137 };
138
139 class CompressedReportStatsAtTick : public ReportAtTick
140 {
141 public:
142 CompressedReportStatsAtTick(ReportStatsAtTick&& uncompressed)
143 : ReportAtTick(uncompressed.getDescriptor(), uncompressed.getTick())
144 , bytes_(uncompressed.compress())
145 {}
146
147 // Default constructor needed to read these out of simdb::ConcurrentQueue's
148 CompressedReportStatsAtTick() = default;
149
150 const std::vector<char>& getBytes() const
151 {
152 return bytes_;
153 }
154
155 private:
156 std::vector<char> bytes_;
157 };
158
159 simdb::ConcurrentQueue<ReportStatsAtTick>* pipeline_queue_ = nullptr;
160};
161
162} // namespace sparta::app
A class that lets you schedule events now and in the future.
Describes one or more reports to instantiate.
Sparta Application framework.
Macros for handling exponential backoff.