The Sparta Modeling Framework
Loading...
Searching...
No Matches
StreamNode.hpp
1// <StreamNode> -*- C++ -*-
2
3#pragma once
4
5#include "sparta/report/Report.hpp"
7
8#include <queue>
9#include <mutex>
10
11#include <vector>
12#include <string>
13#include <sstream>
14#include <algorithm>
15
16namespace sparta {
17
18class Report;
19class StatisticInstance;
20
21namespace statistics {
22
23class StreamController;
24
36{
37public:
38 virtual ~StreamNode() {}
39 StreamNode(const StreamNode &) = delete;
40 StreamNode & operator=(const StreamNode &) = delete;
41
43 const std::string & getName() const {
44 return name_;
45 }
46
58 const std::string & getFullPath() {
59 if (!full_path_.empty()) {
60 return full_path_;
61 }
62 resolveFullPath_();
63 return full_path_;
64 }
65
67 std::vector<std::shared_ptr<StreamNode>> & getChildren() {
68 return children_;
69 }
70
72 const std::vector<std::shared_ptr<StreamNode>> & getChildren() const {
73 return children_;
74 }
75
80 void setParent(StreamNode * parent) {
81 sparta_assert(parent_ == nullptr || parent_ == parent,
82 "Cannot reassign a StreamNode's parent node");
83 parent_ = parent;
84 }
85
89 if (cached_root_ != nullptr) {
90 return cached_root_;
91 }
92
93 //Walk to the top node, and cache it for faster
94 //access later on
95 StreamNode * parent = this;
96 while (parent != nullptr) {
97 cached_root_ = parent;
98 parent = cached_root_->parent_;
99 }
100 return cached_root_;
101 }
102
104 void initialize() {
105 if (is_initialized_) {
106 return;
107 }
108
109 //Subclasses now turn their vector/scalar SI(s) into a single
110 //vector<double>, with all of their SI's connected to that
111 //data vector via SnapshotLogger's. When anyone asks those
112 //SI(s) what their current value is, the double value will
113 //be written into our vector. It also goes into that vector
114 //at the same index every time, so we can safely send the
115 //entire vector to any sink for faster processing than if
116 //we had to process just one point at a time.
117 initialize_();
118
119 //The reporting infrastructure will notify our root node
120 //whenever a report write/update was made. We need the root
121 //to forward that call to us so we can put the SI data vector
122 //into a buffer queue for asynchronous processing.
123 getRoot()->addStreamListener_(this);
124
125 is_initialized_ = true;
126 }
127
134
143
150 void getBufferedStreamData(std::queue<std::vector<double>> & data_queue);
151
155 void setStreamController(const std::shared_ptr<StreamController> & controller) {
156 sparta_assert(controller_ == nullptr,
157 "You cannot reset a StreamNode's stream controller");
158 controller_ = controller;
159 }
160
161protected:
162 explicit StreamNode(const std::string & name) :
163 name_(name)
164 {
165 sparta_assert(!name_.empty(), "You may not create a StreamNode without a name");
166 }
167
168private:
169 //When a report update occurs, the root stream node will be notified.
170 //This root node contains all of the child nodes that have a streaming
171 //client attached to them. We call these children "listeners".
172 //
173 // Report update -> Root StreamNode
174 // |
175 // ---------------------
176 // | |
177 // ChildA ChildB
178 // (no client) (has clients)
179 //
180 //In this example, listeners_ = {ChildB*}
181 void addStreamListener_(StreamNode * listener) {
182 std::lock_guard<std::mutex> guard(listeners_mutex_);
183 listeners_.emplace_back(listener);
184 }
185
186 //The simulation synchronously pushes packets of data into
187 //the root StreamNode, and we keep that data organized in
188 //a map of child StreamNode* -> queue<packet>
189 //
190 //This data can be consumed on a separate thread if desired.
191 //
192 // ** TEMPORARY: While asynchronous C++/Python communication
193 // ** is developed, we will process this buffered data from
194 // ** the main thread.
195 void appendDataValuesForListener_(StreamNode * listener,
196 const std::vector<double> & data)
197 {
198 std::lock_guard<std::mutex> guard(listeners_mutex_);
199 listeners_data_[listener].push(data);
200 }
201
202 //The consumer thread (or the main thread during a forced
203 //synchronous flush) is requesting all buffered data for
204 //a particular client. We do not do any bookkeeping to
205 //account for that released data. As far as the StreamNode
206 //is concerned, the data is gone forever.
207 void releaseDataBufferForListener_(
208 StreamNode * listener,
209 std::queue<std::vector<double>> & data_queue)
210 {
211 std::lock_guard<std::mutex> guard(listeners_mutex_);
212 auto iter = listeners_data_.find(listener);
213 if (iter != listeners_data_.end()) {
214 std::swap(data_queue, iter->second);
215 }
216 }
217
218 //Walk up to the topmost node in our hierarchy, and create
219 //a string for the full node path.
220 void resolveFullPath_() {
221 std::vector<std::string> path;
222 path.push_back(name_);
223
224 const StreamNode * parent = parent_;
225 while (parent) {
226 path.emplace_back(parent->name_);
227 parent = parent->parent_;
228 }
229
230 //We now have a vector that looks like:
231 // "ipc.rob.core0.top"
232 //
233 //So flip it around and dot-delimit it like TreeNode's do
234 std::reverse(path.begin(), path.end());
235 std::ostringstream oss;
236 for (const auto & p : path) {
237 oss << p << ".";
238 }
239
240 full_path_ = oss.str();
241 if (!full_path_.empty() && full_path_.back() == '.') {
242 full_path_.pop_back();
243 }
244 }
245
249 virtual void initialize_() = 0;
250
254 virtual const std::vector<double> & readFromStream_() = 0;
255
257 const std::string name_;
258 std::string full_path_;
259
260 StreamNode * parent_ = nullptr;
261 StreamNode * cached_root_ = nullptr;
262
263 std::vector<std::shared_ptr<StreamNode>> children_;
264 bool is_initialized_ = false;
265
291 std::vector<StreamNode*> listeners_;
292
293 std::unordered_map<
294 StreamNode*,
295 std::queue<std::vector<double>>> listeners_data_;
296
297 std::mutex listeners_mutex_;
298
319 std::shared_ptr<StreamController> controller_;
320};
321
330{
331public:
332 ReportStreamNode(const std::string & name,
333 const Report * r) :
334 StreamNode(name),
335 report_(r)
336 {
337 sparta_assert(report_, "Null Report given to a ReportStreamNode");
338 }
339
340 ReportStreamNode(const ReportStreamNode &) = delete;
341 ReportStreamNode & operator=(const ReportStreamNode &) = delete;
342
343private:
346 void initialize_() override;
347
351 const std::vector<double> & readFromStream_() override;
352
356 const Report * report_ = nullptr;
357 std::vector<const StatisticInstance*> stat_insts_;
358 std::vector<double> aggregated_si_values_;
359};
360
368{
369public:
370 StatisticInstStreamNode(const std::string & name,
371 const StatisticInstance * si) :
372 StreamNode(name),
373 stat_inst_(si)
374 {
375 sparta_assert(stat_inst_, "Null StatisticInstance given to a "
376 "StatisticInstStreamNode");
377 }
378
380 StatisticInstStreamNode & operator=(const StatisticInstStreamNode &) = delete;
381
382private:
385 void initialize_() override;
386
390 const std::vector<double> & readFromStream_() override;
391
393 const StatisticInstance * stat_inst_ = nullptr;
394 std::vector<double> one_si_value_;
395};
396
397} // namespace statistics
398} // namespace sparta
399
Set of macros for Sparta assertions. Caught by the framework.
#define sparta_assert(...)
Simple variadic assertion that will throw a sparta_exception if the condition fails.
Instance of either a StatisticDef or CounterBase or an Expression. Has a sample window (simulator tic...
In the stream node hierarchy, this class is used wherever we encounter a sparta::Report node,...
In the stream node hierarchy, this class is used wherever we encounter a sparta::StatisticInstance le...
When a simulation is configured to stream its statistics values for asynchronous processing,...
std::vector< std::shared_ptr< StreamNode > > & getChildren()
Direct descendants of this node, if any.
void getBufferedStreamData(std::queue< std::vector< double > > &data_queue)
const std::string & getName() const
StreamNode name - similar to a TreeNode's name.
void initialize()
Streaming interface.
const std::vector< std::shared_ptr< StreamNode > > & getChildren() const
Direct descendants of this node, if any.
const std::string & getFullPath()
void setStreamController(const std::shared_ptr< StreamController > &controller)
void setParent(StreamNode *parent)
Macros for handling exponential backoff.