Commit 2bfeb56c authored by Marco Perronet's avatar Marco Perronet
Browse files

Fix update interval, write matcher unit tests

parent 05025046
Pipeline #58196 failed with stages
in 42 seconds
...@@ -130,7 +130,7 @@ git clone --recurse-submodules ...@@ -130,7 +130,7 @@ git clone --recurse-submodules
You need to have the [Rust toolchain](https://www.rust-lang.org/tools/install) and Python3 installed. Make sure you also have installed all the following python packages (mostly for plotting). You need to have the [Rust toolchain](https://www.rust-lang.org/tools/install) and Python3 installed. Make sure you also have installed all the following python packages (mostly for plotting).
``` ```
pip3 install numpy scipy pandas seaborn matplotlib psutil pip3 install numpy scipy pandas seaborn matplotlib psutil pyyaml
``` ```
For the webserver test script: For the webserver test script:
``` ```
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
task set: task set:
- id: 1 - id: 1
period: 100 period: 1000000000
worst-case execution time: 50 worst-case execution time: 100000000
priority: 1 priority: 1
... ...
\ No newline at end of file
...@@ -3,7 +3,6 @@ use serde_yaml; ...@@ -3,7 +3,6 @@ use serde_yaml;
use std::fs::{ use std::fs::{
OpenOptions, OpenOptions,
remove_file, remove_file,
read_to_string,
}; };
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
...@@ -12,8 +11,8 @@ use std::io::Write; ...@@ -12,8 +11,8 @@ use std::io::Write;
use rbf_trace::util::helpers::*; use rbf_trace::util::helpers::*;
use rbf_trace::event_generation::evg::{ use rbf_trace::event_generation::evg::{
EventsGenerator, EventsGenerator,
TraceEvent,
}; };
use rbf_trace::event_generation::yaml::*;
use rbf_trace::event_processing::invocation_cycle::*; use rbf_trace::event_processing::invocation_cycle::*;
use rbf_trace::real_time::arrival_sequence::*; use rbf_trace::real_time::arrival_sequence::*;
use rbf_trace::real_time::model_matching::mm::{ use rbf_trace::real_time::model_matching::mm::{
...@@ -26,7 +25,7 @@ use rbf_trace::real_time::rbf_curve::*; ...@@ -26,7 +25,7 @@ use rbf_trace::real_time::rbf_curve::*;
fn main() { fn main() {
let args = Opt::from_args(); let args = Opt::from_args();
/* Raw events ("TraceEvent") */ /* Raw events ("TraceEvent") */
let mut trace_source = YamlSource::new(&args.source_path); let mut trace_source = YamlEVG::new(&args.source_path);
/* Automaton that produces arrivals form raw events */ /* Automaton that produces arrivals form raw events */
let mut trace_cycles: HashMap<Pid, InvocationCycle> = HashMap::new(); let mut trace_cycles: HashMap<Pid, InvocationCycle> = HashMap::new();
/* Processed events ("Arrival") */ /* Processed events ("Arrival") */
...@@ -36,6 +35,7 @@ fn main() { ...@@ -36,6 +35,7 @@ fn main() {
let mut curve_models: &HashMap<Pid, RbfCurve> = &HashMap::new(); let mut curve_models: &HashMap<Pid, RbfCurve> = &HashMap::new();
let mut output = dd::Output::new(); let mut output = dd::Output::new();
let mut last_update_time = 0; let mut last_update_time = 0;
let mut first_event_time = 0;
/* Model matcher instances */ /* Model matcher instances */
let mut scalar_mm = ScalarMM::new( let mut scalar_mm = ScalarMM::new(
...@@ -60,6 +60,7 @@ fn main() { ...@@ -60,6 +60,7 @@ fn main() {
/* Perform model matching every update_interval seconds */ /* Perform model matching every update_interval seconds */
if last_update_time == 0 { if last_update_time == 0 {
last_update_time = event.instant; last_update_time = event.instant;
first_event_time = event.instant;
} }
let last_update_elapsed = ns_to_s(event.instant - last_update_time); let last_update_elapsed = ns_to_s(event.instant - last_update_time);
...@@ -81,6 +82,20 @@ fn main() { ...@@ -81,6 +82,20 @@ fn main() {
} }
} }
/* Trace might have been shorter than update_interval */
if last_update_time == first_event_time {
scalar_models = scalar_mm.update_and_match(&new_arrivals);
curve_models = curve_mm.update_rbfs(&new_arrivals);
/* Print current model */
if args.print {
for (pid, model) in scalar_models {
eprintln!("PID {}:", pid);
model.pretty_print();
}
}
}
/* Convert to output format */ /* Convert to output format */
output.scalar_models = scalar_models.clone(); output.scalar_models = scalar_models.clone();
for (pid, rbf) in curve_models { for (pid, rbf) in curve_models {
...@@ -109,57 +124,6 @@ fn main() { ...@@ -109,57 +124,6 @@ fn main() {
} }
} }
#[derive(Debug)]
struct YamlSource {
trace: Vec<TraceEvent>,
curr_event_idx: usize,
path: String,
}
/* Event Generator */
impl YamlSource {
pub fn new(path: &String) -> Self {
let mut ret = YamlSource {
trace: Vec::new(),
curr_event_idx: 0,
path: String::new(),
};
ret.path = path.clone();
ret.setup();
ret
}
}
impl EventsGenerator for YamlSource {
fn setup(&mut self) {
let s = &read_to_string(&self.path).unwrap();
self.trace = serde_yaml::from_str(s).unwrap();
}
fn shutdown(&mut self) {
println!("Trace ended!");
}
fn next_event(&mut self) -> Option<TraceEvent> {
if self.curr_event_idx < self.trace.len() {
// Check monotonicity
// TODO Can fail in non-uniprocessor configurations (clocks in different CPUs are not synchronized)
if self.curr_event_idx > 0 && self.trace[self.curr_event_idx-1].instant > self.trace[self.curr_event_idx].instant {
eprintln!("{} > {} !", self.trace[self.curr_event_idx-1].instant, self.trace[self.curr_event_idx].instant);
panic!();
}
let ret = Some(self.trace[self.curr_event_idx]);
self.curr_event_idx += 1;
return ret;
} else {
return None;
}
}
}
/* Args */ /* Args */
#[derive(Debug, StructOpt)] #[derive(Debug, StructOpt)]
...@@ -168,7 +132,7 @@ pub struct Opt { ...@@ -168,7 +132,7 @@ pub struct Opt {
#[structopt(short = "s", long)] #[structopt(short = "s", long)]
pub source_path: String, pub source_path: String,
/// Specify the output (YAML file). If not specified, will only print human-readable output. /// Specify the output (YAML file). If not specified, will only print human readable output.
#[structopt(short = "o", long, parse(from_os_str))] #[structopt(short = "o", long, parse(from_os_str))]
pub output_path: Option<PathBuf>, pub output_path: Option<PathBuf>,
......
// use crate::util::helpers::*;
// use crate::event_generation::evg::*;
// /* Generate a period constant pattern ARPRD, with only one process */
// use crate::params::PID;
// use crate::params::TIME_BETWEEN_ARRIVALS;
// use crate::params::TIME_BETWEEN_EVENTS;
// use crate::params::LIFETIME;
// pub struct DummyEVG {
// time_limit : Time,
// state : TraceEventType,
// time : Time,
// preempted : bool,
// }
// impl DummyEVG {
// pub fn new() -> DummyEVG {
// DummyEVG {
// time_limit : LIFETIME * 1000000,
// state : TraceEventType::Deactivation,
// time : 0,
// preempted : false }
// }
// }
// impl EventsGenerator for DummyEVG {
// fn next_event (&mut self) -> Option<TraceEvent> {
// let mut next_event = TraceEvent {
// etype : TraceEventType::Activation,
// pid : PID,
// instant : 0
// };
// self.time += TIME_BETWEEN_EVENTS * 1000000;
// next_event.instant = self.time;
// if self.time > self.time_limit {
// println!("Dummy events generator exited; event was requested.");
// return None;
// }
// //Make sequence [ARPRD] while there is time, then E.
// next_event.etype =
// match self.state {
// TraceEventType::Activation => { self.preempted = false; TraceEventType::Resume }
// TraceEventType::Deactivation => TraceEventType::Activation,
// TraceEventType::Preemption => { self.preempted = true; TraceEventType::Resume }
// TraceEventType::Resume => if self.preempted { TraceEventType::Deactivation } else { TraceEventType::Preemption },
// TraceEventType::Exit => TraceEventType::Exit
// };
// if self.time >= self.time_limit { next_event.etype = TraceEventType::Exit; }
// self.state = next_event.etype;
// Some(next_event)
// }
// fn setup (&mut self) {
// println!("Dummy events generator loaded.");
// }
// fn shutdown (&mut self) {
// println!("Dummy events generator shut down.");
// }
// }
\ No newline at end of file
pub mod dummy; pub mod yaml;
pub mod evg; pub mod evg;
pub mod ftrace; pub mod ftrace;
pub mod replay; pub mod replay;
......
// TODO this is outdated
use crate::event_generation::evg::*; use crate::event_generation::evg::*;
use std::fs; use std::fs;
......
use crate::event_generation::evg::*;
use serde_yaml;
use std::fs;
#[derive(Debug)]
pub struct YamlEVG {
trace: Vec<TraceEvent>,
curr_event_idx: usize,
path: String,
}
/* Event Generator */
impl YamlEVG {
pub fn new(path: &String) -> Self {
let mut ret = YamlEVG {
trace: Vec::new(),
curr_event_idx: 0,
path: String::new(),
};
ret.path = path.clone();
ret.setup();
ret
}
}
impl EventsGenerator for YamlEVG {
fn setup(&mut self) {
let s = &fs::read_to_string(&self.path).unwrap();
self.trace = serde_yaml::from_str(s).unwrap();
}
fn shutdown(&mut self) {
println!("Trace ended!");
}
fn next_event(&mut self) -> Option<TraceEvent> {
if self.curr_event_idx < self.trace.len() {
// Check monotonicity
// TODO Can fail in non-uniprocessor configurations (clocks in different CPUs are not synchronized)
if self.curr_event_idx > 0 && self.trace[self.curr_event_idx-1].instant > self.trace[self.curr_event_idx].instant {
eprintln!("{} > {} !", self.trace[self.curr_event_idx-1].instant, self.trace[self.curr_event_idx].instant);
panic!();
}
let ret = Some(self.trace[self.curr_event_idx]);
self.curr_event_idx += 1;
return ret;
} else {
return None;
}
}
}
...@@ -24,6 +24,7 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp: ...@@ -24,6 +24,7 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp:
let mut new_arrivals_cnt = 0; let mut new_arrivals_cnt = 0;
let mut curr_update_id = 0; let mut curr_update_id = 0;
let mut last_update_time = 0; let mut last_update_time = 0;
let mut first_event_time = 0;
// Hot loop in which events from EVG are consumed // Hot loop in which events from EVG are consumed
while let Some(event) = evg.next_event() { while let Some(event) = evg.next_event() {
...@@ -55,7 +56,8 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp: ...@@ -55,7 +56,8 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp:
// Send update // Send update
// TODO should we check new_arrivals_cnt for each pid? // TODO should we check new_arrivals_cnt for each pid?
if last_update_time == 0 { if last_update_time == 0 {
last_update_time = event.instant; last_update_time = event.instant;
first_event_time = event.instant;
} }
let last_update_elapsed = ns_to_s(event.instant - last_update_time); let last_update_elapsed = ns_to_s(event.instant - last_update_time);
...@@ -87,6 +89,25 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp: ...@@ -87,6 +89,25 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp:
} }
} }
/* Trace might have been shorter than update_interval */
if first_event_time == last_update_time &&
new_arrivals_cnt > 0 && !arrival_seqs_updates.is_empty() {
let mut update = EventUpdate::new(curr_update_id);
update.arrival_seqs = arrival_seqs_updates.clone();
if crate::params::print_update_size() {
eprintln!("Sending update:");
for (k, v) in update.arrival_seqs.iter() {
eprintln!("{} new arrivals for pid {}", v.arrivals.len(), k);
}
}
match tx_evp.send(update) {
// Matching thread has stopped, this thread should stop as well
Err(_) => { eprintln!("TRACE: matching thread stopped"); },
Ok(_) => {},
}
}
let mut stop_update = EventUpdate::default(); let mut stop_update = EventUpdate::default();
stop_update.tracing_stopped = true; stop_update.tracing_stopped = true;
match tx_evp.send(stop_update) { match tx_evp.send(stop_update) {
......
...@@ -67,7 +67,7 @@ impl InvocationCycle { ...@@ -67,7 +67,7 @@ impl InvocationCycle {
} }
// Only a Deactivation, Activation, or Exit can mark the completion of an invocation cycle // Only a Deactivation, Activation, or Exit can mark the completion of an invocation cycle
pub fn update_activation_cycle(&mut self, event : TraceEvent) -> Option<Arrival> { pub fn update_activation_cycle(&mut self, event: TraceEvent) -> Option<Arrival> {
match event.etype { match event.etype {
TraceEventType::Activation => { self.activation(event.instant) }, TraceEventType::Activation => { self.activation(event.instant) },
TraceEventType::Deactivation => { self.deactivation(event.instant) }, TraceEventType::Deactivation => { self.deactivation(event.instant) },
...@@ -77,8 +77,13 @@ impl InvocationCycle { ...@@ -77,8 +77,13 @@ impl InvocationCycle {
} }
} }
fn activation(&mut self, instant:Time) -> Option<Arrival> { fn activation(&mut self, instant: Time) -> Option<Arrival> {
assert!(instant > self.last_event_time); // eprintln!("ACTIVATION {}", instant);
// eprintln!("ACTIVATION Last {}", self.last_event_time);
if self.last_event_type.is_some() {
assert!(instant > self.last_event_time);
}
match self.last_event_type match self.last_event_type
{ {
Some (TraceEventType::Deactivation) => { Some (TraceEventType::Deactivation) => {
...@@ -97,7 +102,6 @@ impl InvocationCycle { ...@@ -97,7 +102,6 @@ impl InvocationCycle {
Some(Arrival::new(activation, final_cost, final_ss_time, final_ss_cnt)) Some(Arrival::new(activation, final_cost, final_ss_time, final_ss_cnt))
} else { // Account self-suspension } else { // Account self-suspension
eprintln!("*** SELF-SUSPENSION DETECTED (Timeout at {}) (Pid {}) ***", time_since_last_deactivation, self.pid); // TODO remove me eventually
self.curr_ss_time += time_since_last_deactivation; self.curr_ss_time += time_since_last_deactivation;
self.curr_ss_cnt += 1; self.curr_ss_cnt += 1;
self.last_event_time = instant; self.last_event_time = instant;
...@@ -122,6 +126,8 @@ impl InvocationCycle { ...@@ -122,6 +126,8 @@ impl InvocationCycle {
match self.last_event_type match self.last_event_type
{ {
Some (TraceEventType::Activation) => { Some (TraceEventType::Activation) => {
// eprintln!("RESUME {}", instant);
// eprintln!("RESUME Last {}", self.last_event_time);
assert!(instant >= self.last_event_time); // There can be an activation and resume at the same time assert!(instant >= self.last_event_time); // There can be an activation and resume at the same time
self.last_event_type = Some(TraceEventType::Resume); self.last_event_type = Some(TraceEventType::Resume);
self.last_event_time = instant; self.last_event_time = instant;
...@@ -137,7 +143,10 @@ impl InvocationCycle { ...@@ -137,7 +143,10 @@ impl InvocationCycle {
} }
fn preemption(&mut self, instant:Time) { fn preemption(&mut self, instant:Time) {
assert!(instant > self.last_event_time); if self.last_event_type.is_some() {
assert!(instant > self.last_event_time);
}
match self.last_event_type match self.last_event_type
{ {
Some (TraceEventType::Resume) => { Some (TraceEventType::Resume) => {
...@@ -151,7 +160,10 @@ impl InvocationCycle { ...@@ -151,7 +160,10 @@ impl InvocationCycle {
} }
fn deactivation(&mut self, instant:Time) -> Option<Arrival> { fn deactivation(&mut self, instant:Time) -> Option<Arrival> {
assert!(instant > self.last_event_time); if self.last_event_type.is_some() {
assert!(instant > self.last_event_time);
}
match self.last_event_type match self.last_event_type
{ {
Some (TraceEventType::Resume) => { Some (TraceEventType::Resume) => {
......
...@@ -6,5 +6,5 @@ pub mod real_time; ...@@ -6,5 +6,5 @@ pub mod real_time;
pub mod event_generation; pub mod event_generation;
pub mod event_processing; pub mod event_processing;
pub mod config_detection; pub mod config_detection;
pub mod testing; pub mod unit_tests;
pub mod util; pub mod util;
\ No newline at end of file
...@@ -9,7 +9,6 @@ use crate::config_detection::features::*; ...@@ -9,7 +9,6 @@ use crate::config_detection::features::*;
use crate::util::output::*; use crate::util::output::*;
use crate::util::helpers::*; use crate::util::helpers::*;
use crate::util::export; use crate::util::export;
use crate::testing::unit_test_analysis::*;
use crate::sync::*; use crate::sync::*;
pub fn start_rta_thread(rx_analyzer: Receiver<ModelUpdate>, sys_conf: &SysConf) { pub fn start_rta_thread(rx_analyzer: Receiver<ModelUpdate>, sys_conf: &SysConf) {
...@@ -23,10 +22,6 @@ fn rta_loop(rx_analyzer: Receiver<ModelUpdate>, sys_conf: &SysConf) { ...@@ -23,10 +22,6 @@ fn rta_loop(rx_analyzer: Receiver<ModelUpdate>, sys_conf: &SysConf) {
let mut wc_response_times: HashMap<Pid, Duration> = HashMap::new(); let mut wc_response_times: HashMap<Pid, Duration> = HashMap::new();
let mut rtb_curves: HashMap<Pid, RtbCurve> = HashMap::new(); let mut rtb_curves: HashMap<Pid, RtbCurve> = HashMap::new();
let mut output: Output = Output::default(); let mut output: Output = Output::default();
if crate::params::unit_test_analysis() {
unit_test_analysis();
}
loop { loop {
let message = rx_analyzer.recv(); let message = rx_analyzer.recv();
......
use std::sync::mpsc::*; use std::sync::mpsc::*;
use crate::real_time::model_matching::mm::*; use crate::real_time::model_matching::mm::*;
use crate::testing::unit_test_matching::*;
use crate::sync::*; use crate::sync::*;
pub fn start_matching_thread(rx_matcher: Receiver<EventUpdate>, tx_matcher: Sender<ModelUpdate>) { pub fn start_matching_thread(rx_matcher: Receiver<EventUpdate>, tx_matcher: Sender<ModelUpdate>) {
...@@ -19,10 +18,6 @@ pub fn matching_loop(rx_matcher: Receiver<EventUpdate>, tx_matcher: Sender<Model ...@@ -19,10 +18,6 @@ pub fn matching_loop(rx_matcher: Receiver<EventUpdate>, tx_matcher: Sender<Model
let mut curve_mm = CurveMM::new(crate::params::window_size()); let mut curve_mm = CurveMM::new(crate::params::window_size());
let mut curr_update_id = 0; let mut curr_update_id = 0;
if crate::params::unit_test_matcher() {
unit_test_matching();
}
loop { loop {
let message = rx_matcher.recv(); let message = rx_matcher.recv();
if message.is_ok() { if message.is_ok() {
......
// use std::collections::HashMap;
// use crate::real_time::model_matching::models::*;
// use crate::real_time::rbf_curve::*;
// use crate::real_time::arrival_sequence::*;
// use crate::util::helpers::*;
// use crate::params::JITTER_BOUND;
pub fn unit_test_matching() {
// let mut trace_1 = ArrivalSequence::new_dummy(vec!(0, 5, 10, 15, 20, 25, 30));
// let trace_1_arrs = Vec::from(trace_1.arrivals.clone());
// let mut trace_2 = ArrivalSequence::new_dummy(vec!(3, 6, 8, 12, 15));
// let trace_2_arrs = Vec::from(trace_2.arrivals.clone());
// let mut trace_3 = ArrivalSequence::new_dummy(vec!(10, 21, 29, 40, 50, 60));
// let trace_3_arrs = Vec::from(trace_3.arrivals.clone());
// let mut trace_4 = ArrivalSequence::new_dummy(vec!(234335, 334783, 434773, 534717));
// let trace_4_arrs = Vec::from(trace_4.arrivals.clone());
// let mut trace_5 = ArrivalSequence::new_dummy(vec!(3, 6, 9, 11, 14));
// let trace_5_arrs = Vec::from(trace_5.arrivals.clone());
// let mut trace_6 = ArrivalSequence::new_dummy(vec!(4616124428000, 4616124507000, 4616704632000, 4617301585000));
// let trace_6_arrs = Vec::from(trace_6.arrivals.clone());
// let traces = vec!(&mut trace_1, &mut trace_2, &mut trace_3, &mut trace_4, &mut trace_5, &mut trace_6);
// let dummy_rbf = RbfCurve::new(123);
/* Test fit_period() */
// TODO old tests: won't work now because of offset underflow
// assert!(fit_period(&trace_1_arrs, 6).is_none());
// assert!(fit_period(&trace_1_arrs, 5).unwrap() == (0, 0));
// assert!(fit_period(&trace_2_arrs, 5).is_none());
// assert!(fit_period(&trace_2_arrs, 4).is_none());
// assert!(fit_period(&trace_2_arrs, 3).unwrap() == (1, 2));
// assert!(fit_period(&trace_3_arrs, 5).is_none());
// assert!(fit_period(&trace_3_arrs, 9).unwrap() == (5, 10));
// assert!(fit_period(&trace_3_arrs, 10).unwrap() == (2, 9));
// assert!(fit_period(&trace_5_arrs, 2).is_none());
// assert!(fit_period(&trace_5_arrs, 3).unwrap() == (1, 2));
// assert!(fit_period(&trace_6_arrs, 298516000).is_none());
/* Test match_pjitter() */
// let arrivals: HashMap<Pid, ArrivalSequence> = HashMap::new();
// let arrival_buffers: HashMap<Pid, ArrivalSequenceSubset> = HashMap::new();
// let models: HashMap<Pid, MatchedModels> = HashMap::new();
// let mut rbfs: HashMap<Pid, RbfCurve> = HashMap::new();
// rbfs.insert(123, dummy_rbf);
// // Increase by JITTER_BOUND to avoid underflow issues (by assumption)
// for t in traces {
// for arr in &mut t.arrivals {
// arr.instant = arr.instant + JITTER_BOUND;
// }
// }
/* Single thread */