Commit b9f0ff0d authored by Marco Perronet's avatar Marco Perronet
Browse files

Refactor model matcher, remove binary search

parent c3f6b488
Pipeline #57668 failed with stages
in 1 minute and 1 second
use structopt::StructOpt;
use serde_yaml;
use std::fs;
// use structopt::StructOpt;
// use serde_yaml;
// use std::fs;
fn main() {
......
......@@ -7,20 +7,22 @@ use rbf_trace::event_generation::evg::{
EventsGenerator,
TraceEvent,
};
use rbf_trace::real_time::model_matching::mm::{
ScalarMM,
CurveMM,
};
fn main() {
// let args = Opt::from_args();
// let source: YamlSource;
let args = Opt::from_args();
let source: YamlSource;
// let mut trace_acycles: HashMap<Pid, InvocationCycle> = HashMap::new();
// if is_json_file(&args.source_path) {
// source = YamlSource::new(&args.source_path);
// } else {
// // Binary
// }
// // TODO define tx_matcher
// if !crate::params::rbf_model_only() {
// }
}
#[derive(Debug)]
......@@ -65,30 +67,13 @@ impl EventsGenerator for YamlSource {
}
}
// #[derive(Debug)]
// struct BinarySource {
// rx: Receiver<EventUpdate>,
// }
// impl BinarySource {
// pub fn new(rx: Receiver<EventUpdate>) -> Self {
// BinarySource {
// rx: rx,
// }
// }
// }
// impl EventsGenerator for BinarySource {
// fn next_event() -> Option<TraceEvent>
// }
#[derive(Debug, StructOpt)]
pub struct Opt {
/// Specify the event source. Can be a JSON file or a pipe with binary data.
/// Specify the event source (YAML file).
#[structopt(short = "s", long)]
pub source_path: String,
/// Specify the output. Can be a JSON file or a pipe with binary data.
/// Specify the output (YAML file).
#[structopt(short = "o", long)]
pub output_path: String,
......@@ -113,9 +98,3 @@ pub struct Opt {
#[structopt(short = "w", long)]
pub max_window: Option<usize>,
}
/* Helpers */
fn is_json_file(s: &String) -> bool {
s.ends_with(".json")
}
......@@ -8,7 +8,6 @@ use crate::event_generation::evg;
use crate::event_processing::invocation_cycle::*;
use crate::util::{export};
use crate::real_time::{
rbf_curve::RbfCurve,
arrival_curve::ArrivalCurve,
arrival_sequence::ArrivalSequence,
arrival::* };
......@@ -22,12 +21,11 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp:
let mut arrival_curves: HashMap<Pid, ArrivalCurve> = HashMap::new();
let mut arrival_seqs_updates: HashMap<Pid, ArrivalSequence> = HashMap::new();
let mut arrival_seqs_whole: HashMap<Pid, ArrivalSequence> = HashMap::new(); // Never trim, used for the replay
let mut rbfs: HashMap<Pid, RbfCurve> = HashMap::new();
let mut new_arrivals_cnt = 0;
let mut curr_update_id = 0;
let mut last_update_time = 0;
// Hot loop in which events from EVG are consumed and curves are built
// Hot loop in which events from EVG are consumed
while let Some(event) = evg.next_event() {
// Update the trace loggers
if crate::params::logger_replay() || crate::params::logger_event_seq() {
......@@ -40,12 +38,8 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp:
let acycle = trace_acycles.entry(event.pid).or_insert(InvocationCycle::new(event.pid, crate::params::ic_heuristic(), crate::params::ic_timeout()));
if let Some(arrival) = acycle.update_activation_cycle(event) {
let rbf_of_pid = rbfs.entry(event.pid).or_insert(RbfCurve::new(event.pid));
let arrivals_of_pid = arrival_seqs_updates.entry(event.pid).or_insert(ArrivalSequence::new(event.pid, true));
arrivals_of_pid.add_arrival(arrival);
if crate::params::extract_rbf_curve() {
rbf_of_pid.add_arrival(arrival);
}
new_arrivals_cnt += 1;
if crate::params::extract_arr_curve() {
......@@ -67,10 +61,9 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp:
if (last_update_elapsed > crate::params::update_interval() ||
new_arrivals_cnt >= crate::params::update_threshold()) &&
new_arrivals_cnt > 0 && !rbfs.is_empty() && !arrival_seqs_updates.is_empty() {
new_arrivals_cnt > 0 && !arrival_seqs_updates.is_empty() {
let mut update = EventUpdate::new(curr_update_id);
update.rbfs = rbfs.clone();
update.arrival_seqs = arrival_seqs_updates.clone();
if crate::params::print_update_size() {
......@@ -80,20 +73,14 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp:
}
}
if rbfs.len() < target_pids.len() { // There are some pids with arrivals but no rbf
filter_useless_entries(&mut update, &rbfs);
}
match tx_evp.send(update) {
// Matching thread has stopped, this thread should stop as well
Err(_) => { eprintln!("TRACE: matching thread stopped"); break; },
Ok(_) => {},
}
// TODO Should do something similar with the RBFs as well, to not send whole busy window
// Clear all arrivals after sending the update
clear_all_arrivals(&mut arrival_seqs_updates);
reset_dirty_flags(&mut rbfs);
new_arrivals_cnt = 0;
curr_update_id += 1;
last_update_time = event.instant;
......@@ -113,13 +100,11 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp:
for pid in target_pids {
let arrivals_of_pid_whole = arrival_seqs_whole.entry(*pid).or_insert(ArrivalSequence::new(*pid, true));
let arrival_curve_of_pid = arrival_curves.entry(*pid).or_insert(ArrivalCurve::new(*pid));
let rbf_of_pid = rbfs.entry(*pid).or_insert(RbfCurve::new(*pid));
let trace_logger_of_pid = trace_loggers.entry(*pid).or_insert(trace_logger::TraceLogger::new(pid.to_string()));
// Export curves for plotting
if crate::params::replay_file().is_none() {
export::export_arrival_curve(&dir_name, &arrival_curve_of_pid, *pid);
export::export_rbf_curve(&dir_name, &rbf_of_pid, *pid);
export::export_arrivals(&arrivals_of_pid_whole, *pid);
export::export_interarrivals(&arrivals_of_pid_whole, *pid);
}
......@@ -143,21 +128,3 @@ fn clear_all_arrivals(arrival_seqs_updates: &mut HashMap<Pid, ArrivalSequence>)
arr_seq.clear_arrivals();
}
}
fn reset_dirty_flags(rbfs: &mut HashMap<Pid, RbfCurve>) {
for (_pid, rbf) in rbfs {
rbf.is_dirty = false;
}
}
// For a given pid: if there are arrivals, but no RBF, there is no point in doing model matching.
// That's because without a WCET (which is stored in the RBF), there is nothing to match.
fn filter_useless_entries(update: &mut EventUpdate, rbfs: &HashMap<Pid, RbfCurve>) {
assert!(update.rbfs.len() <= update.arrival_seqs.len());
assert!(update.rbfs.len() == rbfs.len());
// Remove entries that have arrivals but no RBF
update.arrival_seqs.retain(|pid, _| {
rbfs.get(&pid).is_some()
});
}
......@@ -93,7 +93,7 @@ pub static mut SPARSIFY_FACTOR: u64 = 5;
pub static mut ANALYSIS_ON: bool = true;
pub static mut PJITTER_IGNORE_OFFSET: bool = false;
// Use the RBF model for every task, ignoring other extracted models
pub static mut RBF_MODEL_ONLY: bool = true;
pub static mut RBF_MODEL_ONLY: bool = false;
// Output a curve with the residual supply instead of a response-time bound
pub static mut RESIDUAL_SUPPLY: bool = true;
......
......@@ -16,6 +16,9 @@ pub fn start_rta_thread(rx_analyzer: Receiver<ModelUpdate>, sys_conf: &SysConf)
rta_loop(rx_analyzer, sys_conf);
}
// TODO check for emptiness in analysis loop (in case of rbf_only)
// TODO change the way models are checked
// TODO add an enum for analyses
fn rta_loop(rx_analyzer: Receiver<ModelUpdate>, sys_conf: &SysConf) {
let mut wc_response_times: HashMap<Pid, Duration> = HashMap::new();
let mut rtb_curves: HashMap<Pid, RtbCurve> = HashMap::new();
......@@ -29,15 +32,21 @@ fn rta_loop(rx_analyzer: Receiver<ModelUpdate>, sys_conf: &SysConf) {
let message = rx_analyzer.recv();
if message.is_ok() {
let update = message.unwrap();
if update.events.tracing_stopped {
if update.tracing_stopped {
eprintln!("ANALYSIS: Tracing thread has stopped");
break; // Stop thread
}
if crate::params::print_extracted_models() {
for (pid, model) in &update.chosen_models {
for (pid, _) in &update.rbfs {
let scalar_model = update.chosen_models.get(pid);
eprintln!("Model of pid {}:", pid);
model.pretty_print();
if scalar_model.is_some() { // && !scalar_model.unwrap().is_nomatch()
scalar_model.unwrap().pretty_print();
} else {
eprintln!("RBF");
}
}
eprintln!("--------------------------");
}
......@@ -72,10 +81,12 @@ fn rta_loop(rx_analyzer: Receiver<ModelUpdate>, sys_conf: &SysConf) {
if crate::params::analysis_on() {
if analysis.is_pjitter() || analysis.is_sporadic() {
scalar_rta(analysis, &analyzed_tasks, &cluster.threads, &update.chosen_models, &mut wc_response_times);
} else if analysis.is_rbf() {
rbf_rta(&analyzed_tasks, &cluster.threads, &update.events.rbfs, &mut rtb_curves);
// eprintln!("{:#?}", rtb_curves); // TODO properly output
}
}
// TODO define separate enum for analysis
// else if analysis.is_rbf() {
// rbf_rta(&analyzed_tasks, &cluster.threads, &update.events.rbfs, &mut rtb_curves);
// // eprintln!("{:#?}", rtb_curves); // TODO properly output
// }
}
/* Save to output */
......@@ -158,9 +169,10 @@ fn pick_analysis(sys_conf: &SysConf, chosen_models: &HashMap<Pid, Model>) -> Mod
}
}
if crate::params::rbf_model_only() {
return Model::Rbf;
}
// TODO define separate analysis enum without parameters?
// if crate::params::rbf_model_only() {
// return Model::Rbf;
// }
return model;
}
......
......@@ -13,6 +13,7 @@ pub struct ArrivalSequence {
pub max_interarrival: Duration,
pub sum_interarrival: Duration,
pub max_window: usize,
pub wcet: Cost,
pub tot_observations: u64, // Number of observed arrivals, accounting also the ones before window got trimmed, or the arrivals were reset through clear_arrivals()
}
......@@ -65,9 +66,11 @@ impl ArrivalSequence {
assert!(self.interarrivals.is_empty() && self.arrivals.is_empty());
self.arrivals.push_back(arrival);
}
self.wcet = self.wcet.max(arrival.cost);
}
pub fn append_arrivals(&mut self, seq: &ArrivalSequence) {
pub fn add_arrivals(&mut self, seq: &ArrivalSequence) {
for arr in &seq.arrivals {
self.add_arrival(*arr);
}
......@@ -88,6 +91,7 @@ impl ArrivalSequence {
max_interarrival: 0,
sum_interarrival: 0,
max_window: max_window,
wcet: 0,
tot_observations: 0,
}
}
......@@ -109,15 +113,19 @@ pub struct ArrivalSequenceSubset {
pub arrivals: Vec<Arrival>,
pub pid: Pid,
pub buf_size: usize,
pub wcet: Cost,
// Current feasible periods range
pub t_interval: PeriodRange
pub t_interval: PeriodRange,
}
impl ArrivalSequenceSubset {
/* Returns false if there are no feasible periods */
pub fn add_arrival(&mut self, mut new_arrival : Arrival) -> bool {
pub fn add_arrival(&mut self, mut new_arrival: Arrival) -> Option<PeriodRange> {
// Update wcet
self.wcet = self.wcet.max(new_arrival.cost);
// We assume this to avoid issues related to underflow
assert!(new_arrival.instant >= crate::params::jitter_bound());
......@@ -134,9 +142,11 @@ impl ArrivalSequenceSubset {
let t_interval_arr = PeriodRange::new(t_min, t_max);
let intersection = self.t_interval.intersect(&t_interval_arr);
if intersection.is_some() {
self.t_interval.is_empty = false;
self.t_interval = intersection.unwrap();
} else {
return false; // TODO is it okay to stop updating and just return?
self.t_interval.is_empty = true;
return None; // TODO is it okay to stop updating and just return?
}
// Update t_avg_max and t_avg_min (also for the new arrival)
......@@ -169,18 +179,18 @@ impl ArrivalSequenceSubset {
self.arrivals.push(new_arrival);
}
return true;
Some(self.t_interval)
}
/* Returns false if there are no feasible periods */
pub fn add_arrivals(&mut self, seq: &ArrivalSequence) -> bool {
pub fn add_arrivals(&mut self, seq: &ArrivalSequence) -> Option<PeriodRange> {
for arr in &seq.arrivals {
if !self.add_arrival(*arr) {
return false;
if self.add_arrival(*arr).is_none() {
return None;
}
}
return true;
Some(self.t_interval)
}
pub fn new(pid: Pid) -> Self {
......@@ -188,6 +198,7 @@ impl ArrivalSequenceSubset {
arrivals: Vec::with_capacity(crate::params::arr_buf_size()),
pid: pid,
buf_size: crate::params::arr_buf_size(),
wcet: 0,
t_interval: PeriodRange::default(),
}
}
......
use std::sync::mpsc::*;
use std::collections::HashMap;
use crate::real_time::model_matching::models::*;
use crate::real_time::model_matching::mm::*;
use crate::real_time::arrival_sequence::*;
use crate::util::helpers::*;
use crate::testing::unit_test_matching::*;
use crate::sync::*;
......@@ -11,18 +7,12 @@ pub fn start_matching_thread(rx_matcher: Receiver<EventUpdate>, tx_matcher: Send
matching_loop(rx_matcher, tx_matcher);
}
// TODO update me
pub fn matching_loop(rx_matcher: Receiver<EventUpdate>, tx_matcher: Sender<ModelUpdate>) {
/* A sliding window with MAX_WINDOW most recent arrivals. */
let mut arrival_seqs: HashMap<Pid, ArrivalSequence> = HashMap::new(); // TODO: not sure if this will be useful
/* A buffer that stores only the most relevant arrivals, based on heuristics. */
/* Contains also arrivals from before the sliding window was shifted. */
let mut arrival_buffers: HashMap<Pid, ArrivalSequenceSubset> = HashMap::new();
/* A thread can match multiple models */
let mut models: HashMap<Pid, MatchedModels> = HashMap::new();
/* But only one is chosen */
let mut chosen_models: HashMap<Pid, Model> = HashMap::new();
/* Period intervals to choose the period from */
let mut t_intervals: HashMap<Pid, PeriodRange> = HashMap::new();
let mut scalar_mm = ScalarMM::new();
let mut curve_mm = CurveMM::new();
let mut curr_update_id = 0;
if crate::params::unit_test_matcher() {
unit_test_matching();
......@@ -33,71 +23,37 @@ pub fn matching_loop(rx_matcher: Receiver<EventUpdate>, tx_matcher: Sender<Model
if message.is_ok() {
let update = message.unwrap();
let mut update_models = ModelUpdate::default();
update_models.events = update.clone();
if update_models.events.tracing_stopped {
if update.tracing_stopped {
// Stop this thread, and RTA thread as well
eprintln!("MATCH: Tracing thread has stopped");
match tx_matcher.send(update_models) {
let mut update_stop = ModelUpdate::default(curr_update_id);
update_stop.tracing_stopped = true;
match tx_matcher.send(update_stop) {
Err(_) => { eprintln!("MATCH: Rta thread has stopped "); break; },
Ok(_) => { break; },
}
}
chosen_models.clear();
for (pid, arrival_seq) in &update.arrival_seqs {
/* Add new arrivals to the sliding window */
let arrivals_of_pid = arrival_seqs.entry(*pid).or_insert(ArrivalSequence::new(*pid, false));
arrivals_of_pid.append_arrivals(&arrival_seq);
}
// Will use RBF model if pjitter is not matched: see disambiguate_models()
/* Match */
if !crate::params::rbf_model_only() {
/* Check for pjitter model */
if crate::params::minimize_hyperperiod() {
match_pjitter(&mut models, &mut arrival_buffers, &update.arrival_seqs, &update.rbfs, true);
} else {
match_pjitter(&mut models, &mut arrival_buffers, &update.arrival_seqs, &update.rbfs, false);
}
/* Check for sporadic model */
match_sporadic(&mut models, &arrival_seqs, &update.rbfs);
}
match_rbf(&mut models, &arrival_seqs);
// After updating the arrival buffers in match_pjitter()
for (pid, arr_buf_of_pid) in arrival_buffers.iter() {
t_intervals.insert(*pid, arr_buf_of_pid.t_interval);
}
if crate::params::print_period_ranges() {
eprintln!("Period intervals: {:#?}", t_intervals);
}
/* Disambiguate */
for (pid, models_of_pid) in &models {
let chosen_model_of_pid = chosen_models.entry(*pid).or_insert(Model::NoMatch);
let arrivals_of_pid = arrival_seqs.entry(*pid).or_insert(ArrivalSequence::new(*pid, false));
if arrivals_of_pid.tot_observations >= crate::params::min_observations() {
*chosen_model_of_pid = disambiguate_models(&models_of_pid);
} else {
/* Not enough observations to match a model */
*chosen_model_of_pid = Model::NoMatch;
}
scalar_mm.update_and_match(&update.arrival_seqs);
}
curve_mm.update_rbfs(&update.arrival_seqs);
/* Send update with matched models and rbfs */
update_models.matched_models = models.clone();
update_models.chosen_models = chosen_models.clone();
update_models.t_intervals = t_intervals.clone();
let mut update_models = ModelUpdate::default(curr_update_id);
update_models.matched_models = scalar_mm.get_models().clone();
update_models.chosen_models = scalar_mm.get_chosen_models().clone();
update_models.t_intervals = scalar_mm.get_period_ranges();
update_models.rbfs = curve_mm.get_rbfs().clone();
update_models.tracing_stopped = false;
match tx_matcher.send(update_models) {
// RTA thread has stopped already, this thread should stop as well
Err(_) => { eprintln!("MATCH: Rta thread has stopped"); break; },
Ok(_) => { },
}
}
else {
curr_update_id += 1;
} else {
eprintln!("MATCH: Tracing thread has stopped (on err msg)"); break;
}
}
......
......@@ -6,153 +6,208 @@ use crate::real_time::rbf_curve::*;
use crate::real_time::arrival_sequence::*;
use crate::util::helpers::*;
pub fn match_sporadic(models: &mut HashMap<Pid, MatchedModels>, arrival_seqs: &HashMap<Pid, ArrivalSequence>, rbfs: &HashMap<Pid, RbfCurve>) {
for (pid, arr_seq) in arrival_seqs {
let mit = arr_seq.min_interarrival; // Just take the mit of all the observations so far
let models_of_pid = models.entry(*pid).or_insert(MatchedModels::default());
let rbf_of_pid = rbfs.get(pid).unwrap();
if mit > 0 {
models_of_pid.sporadic = Some(Model::Sporadic(mit, rbf_of_pid.wcet));
} else {
models_of_pid.sporadic = None;
pub struct CurveMM {
/* Output */
rbfs: HashMap<Pid, RbfCurve>,
}
impl CurveMM {
pub fn update_rbfs(&mut self, new_arrivals: &HashMap<Pid, ArrivalSequence>) -> &HashMap<Pid, RbfCurve> {
for (pid, arrivals) in new_arrivals {
let rbf_of_pid = self.rbfs.entry(*pid).or_insert(RbfCurve::new(*pid));
rbf_of_pid.add_arrivals(&arrivals);
}
&self.rbfs
}
}
pub fn match_rbf(models: &mut HashMap<Pid, MatchedModels>, arrival_seqs: &HashMap<Pid, ArrivalSequence>) {
for (pid, _) in arrival_seqs {
let models_of_pid = models.entry(*pid).or_insert(MatchedModels::default());
models_of_pid.rbf = Some(Model::Rbf);
pub fn get_rbfs(&self) -> &HashMap<Pid, RbfCurve> {
&self.rbfs
}
pub fn new() -> Self {
CurveMM {
rbfs: HashMap::new()
}
}
}
/* Pick nice periods based on hyperperiod minimization */
pub fn match_pjitter(models: &mut HashMap<Pid, MatchedModels>, arrival_buffers: &mut HashMap<Pid, ArrivalSequenceSubset>, new_arrivals: &HashMap<Pid, ArrivalSequence>, rbfs: &HashMap<Pid, RbfCurve>, min_hyperperiod: bool) {
pub struct ScalarMM {
/* Saved state */
arrival_seqs: HashMap<Pid, ArrivalSequence>,
arrival_buffers: HashMap<Pid, ArrivalSequenceSubset>,
/* Update arrival buffers and period ranges for the pids that got an update */
for (pid, new_arrs_of_pid) in new_arrivals {
let arr_buf_of_pid = arrival_buffers.entry(*pid).or_insert(ArrivalSequenceSubset::new(*pid));
// The new observations contradict past observations: no feasible periods found for this pid
if !arr_buf_of_pid.add_arrivals(&new_arrs_of_pid) {
let models_of_pid = models.entry(*pid).or_insert(MatchedModels::default());
models_of_pid.pjitter = None;
/* Output */
matched_models: HashMap<Pid, MatchedModels>,
chosen_models: HashMap<Pid, Model>,
}
impl ScalarMM {
/* Returns the chosen models for each pid */
pub fn update_and_match(&mut self, new_arrivals: &HashMap<Pid, ArrivalSequence>) -> &HashMap<Pid, Model> {
/* Update */
self.matched_models.clear();
self.chosen_models.clear();
for (pid, arrivals) in new_arrivals {
let arrival_seq_of_pid = self.arrival_seqs.entry(*pid).or_insert(ArrivalSequence::new(*pid, false));
let arrival_buf_of_pid = self.arrival_buffers.entry(*pid).or_insert(ArrivalSequenceSubset::new(*pid));
arrival_seq_of_pid.add_arrivals(&arrivals);
arrival_buf_of_pid.add_arrivals(&arrivals);
}
/* Match */
if !crate::params::rbf_model_only() {
self.match_pjitter_all();
self.match_sporadic_all();
}
self.disambiguate_model_all();
&self.chosen_models
}
pub fn get_models(&self) -> &HashMap<Pid, MatchedModels> {
&self.matched_models
}
pub fn get_chosen_models(&self) -> &HashMap<Pid, Model> {
&self.chosen_models
}
// Useful for debugging and plotting
pub fn get_period_ranges(&self) -> HashMap<Pid, PeriodRange> {
let mut ret: HashMap<Pid, PeriodRange> = HashMap::new();
for (pid, arr_buf) in &self.arrival_buffers {
ret.insert(*pid, arr_buf.t_interval);
}
ret
}
/*** Match specific models ***/
fn match_sporadic_all(&mut self) {
for (pid, arr_seq) in &self.arrival_seqs {
let models_of_pid = self.matched_models.entry(*pid).or_insert(MatchedModels::default());
models_of_pid.sporadic = match_sporadic(&arr_seq);
}
}
/* Pick nice periods based on hyperperiod minimization */
fn match_pjitter_all(&mut self) {
let pjitter_offset_models = match_pjitter_offset(&self.arrival_buffers);
for (pid, model) in pjitter_offset_models {