Commit 898f110b authored by Marco Perronet

Merge fast rbf extraction

parent ba293cd8
......@@ -23,6 +23,7 @@ num = "0.1.32"
cpu-time = "1.0.0"
itertools = "0.10.1"
structopt = "0.3.17"
linked-list = "0.0.3"
[features]
profile = []
......
......@@ -44,7 +44,9 @@ pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp:
let rbf_of_pid = rbfs.entry(event.pid).or_insert(RbfCurve::new(event.pid));
let arrivals_of_pid = arrival_seqs_updates.entry(event.pid).or_insert(ArrivalSequence::new(event.pid, true));
arrivals_of_pid.add_arrival(arrival);
rbf_of_pid.add_arrival(arrival);
if crate::params::extract_rbf_curve() {
rbf_of_pid.add_arrival(arrival);
}
new_arrivals_cnt += 1;
if crate::params::extract_arr_curve() {
......
......@@ -114,7 +114,7 @@ impl InvocationCycle {
None
},
_ => { self.reset(); None }
_ => { println!("Invocation cycle warning: Last event type: {:#?} Pid {}", self.last_event_type.unwrap(), self.pid); self.reset(); None }
}
}
......
......@@ -31,6 +31,7 @@ mod real_time {
pub mod rbf_curve;
pub mod rtb_curve;
pub mod arrival_sequence;
pub mod sparse_map;
pub mod analysis {
pub mod models;
pub mod analysis;
......
......@@ -7,14 +7,14 @@ use crate::events_processing::invocation_cycle::IcHeuristic;
/* Debug */
pub static mut DUMP_RAW_TRACE: bool = false;
pub static mut DUMP_RAW_TRACE_W_TYPE: bool = false; // TODO TURN ME OFF
pub static mut DUMP_RAW_TRACE_W_TYPE: bool = false;
pub static mut LOGGER_EVENT_SEQ: bool = false;
// Record the current run
pub static mut LOGGER_REPLAY: bool = false; // Only turn this off when measuring performance!
// Replay a previous run with this filename
pub static mut REPLAY_FILE: Option<String> = None;
pub static mut UNIT_TEST_MATCHER: bool = false; // TODO TURN ME BACK ON ONCE UNIT TESTS ARE FIXED
pub static mut UNIT_TEST_ANALYSIS: bool = false; // TODO TURN ME BACK ON
pub static mut UNIT_TEST_ANALYSIS: bool = false; // TODO TURN ME BACK ON ONCE UNIT TESTS ARE FIXED
/* Prints */
pub static mut PRINT_EXTRACTED_MODELS: bool = true;
......@@ -53,7 +53,7 @@ pub static mut LIFETIME: Time = 100000; //ms - total duration of experiment
/* Event processing */
pub static mut EXTRACT_ARR_CURVE: bool = false;
pub static mut EXTRACT_RBF_CURVE: bool = false; // TODO the rbf curve extraction should be performed by the model matcher instead. It should be performed only if the RBF model is picked.
pub static mut EXTRACT_RBF_CURVE: bool = true; // TODO the rbf curve extraction should be performed by the model matcher instead. It should be performed only if the RBF model is picked.
// Every UPDATE_INTERVAL seconds, an update is sent to the matching thread
pub static mut UPDATE_INTERVAL: f32 = 5.;
// Number of new arrivals after which an update will be sent to the analysis thread, regardless of UPDATE_INTERVAL
......@@ -90,12 +90,12 @@ pub static mut SPARSIFY_FACTOR: u64 = 5;
/* Analysis */
pub static mut ANALYSIS_ON: bool = false;
pub static mut ANALYSIS_ON: bool = true;
pub static mut PJITTER_IGNORE_OFFSET: bool = false;
// Use the RBF model for every task, ignoring other extracted models
pub static mut RBF_MODEL_ONLY: bool = false;
pub static mut RBF_MODEL_ONLY: bool = true;
// Output a curve with the residual supply instead of a response-time bound
pub static mut RESIDUAL_SUPPLY: bool = false;
pub static mut RESIDUAL_SUPPLY: bool = true;
/* Curves */
......
......@@ -74,7 +74,7 @@ fn rta_loop(rx_analyzer: Receiver<ModelUpdate>, sys_conf: &SysConf) {
scalar_rta(analysis, &analyzed_tasks, &cluster.threads, &update.chosen_models, &mut wc_response_times);
} else if analysis.is_rbf() {
rbf_rta(&analyzed_tasks, &cluster.threads, &update.events.rbfs, &mut rtb_curves);
eprintln!("{:#?}", rtb_curves);
eprintln!("{:#?}", rtb_curves); // TODO: output this properly
}
}
......
use crate::real_time::arrival::*;
use crate::util::helpers::*;
use crate::feature_detection::system::*;
use crate::real_time::sparse_map::*;
use std::collections::VecDeque;
use std::collections::BTreeMap;
use itertools::merge;
/* The "curve" map maps distance to total cost.
It answers the question: what is the maximum total cost that can be observed within a given distance?
Distance 0 is considered to be a single arrival. */
The distance is *exclusive*, meaning that:
- Distance 0 is considered to be 0.
- Distance 1 is considered to be a single arrival. */
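// Example (illustrative, hypothetical numbers): with two jobs of cost 2 arriving at
// t = 0 and t = 10, the curve ends up holding the points (0, 0), (1, 2) and (11, 4),
// so get(1) == 2, get(5) == 2 and get(11) == 4.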
#[derive(Debug, Clone)]
pub struct RbfCurve {
pub last_arrivals_window: VecDeque <Arrival>,
pub max_window: usize,
pub curve: BTreeMap<Duration, Cost>,
pub curve: SparseMap,
pub wcet: Cost,
pub pid: Pid,
pub prio: Priority,
......@@ -21,108 +24,85 @@ pub struct RbfCurve {
impl RbfCurve {
pub fn add_arrival(&mut self, arrival : Arrival) {
// TODO the rbf curve extraction should be performed by the model matcher instead. It should be performed only if the RBF model is picked.
if crate::params::extract_rbf_curve() {
let t = arrival.instant;
let mut curr_observed_tot_cost = 0;
// sanity check: the arrival times must be monotonic
assert!(t >= self.last_arrivals_window.back().unwrap_or(&arrival).instant);
// add to treat the observed_gap = 0 case
self.last_arrivals_window.push_back(arrival);
// look at all arrival times in the sliding window, in order
// from most recent to oldest
for arr in self.last_arrivals_window.iter().rev() {
// Compute the separation from the current arrival t to the arrival
// of the (i + 1)-th preceding job.
// So if i=0, we are looking at two adjacent jobs.
let observed_gap = t - arr.instant;
curr_observed_tot_cost += arr.cost;
if !self.curve.contains_key(&observed_gap) {
// we have not yet seen a distance of length "observed_gap" -> first sample
// add new distance only if the observed total cost is bigger than the one observed in the nearest preceding gap
if curr_observed_tot_cost > self.get(observed_gap) {
self.curve.insert(observed_gap, curr_observed_tot_cost);
remove_outdated_entries_after_key(&mut self.curve, observed_gap);
self.is_dirty = true;
}
}
else {
// update belief if we have seen something of more total cost than previously
if curr_observed_tot_cost > self.curve[&observed_gap] {
self.curve.insert(observed_gap, curr_observed_tot_cost);
self.is_dirty = true;
}
}
}
// trim sliding window if necessary
if self.last_arrivals_window.len() > self.max_window {
self.last_arrivals_window.pop_front();
let t = arrival.instant;
let mut curr_observed_tot_cost = 0;
// sanity check: the arrival times must be monotonic
assert!(t >= self.last_arrivals_window.back().unwrap_or(&arrival).instant);
// add to treat the observed_gap = 0 case
self.last_arrivals_window.push_back(arrival);
// look at all arrival times in the sliding window, in order
// from most recent to oldest
for arr in self.last_arrivals_window.iter().rev() {
// Compute the separation from the current arrival t to the arrival
// of the (i + 1)-th preceding job.
// So if i=0, we are looking at two adjacent jobs.
let observed_gap = t - arr.instant + 1;
curr_observed_tot_cost += arr.cost;
// Record a point for "observed_gap" only if the observed total cost is bigger than the
// one observed at the nearest preceding gap
if curr_observed_tot_cost > self.get(observed_gap) {
let p = Point::new(observed_gap, curr_observed_tot_cost);
self.curve.add(p);
self.is_dirty = true;
}
}
// trim sliding window if necessary
if self.last_arrivals_window.len() > self.max_window {
self.last_arrivals_window.pop_front();
}
// update WCET
self.wcet = self.wcet.max(arrival.cost); // TODO could just return the cost for key 0
}
// Returns the cost at the nearest stored delta below the given one (we only store the steps)
pub fn get(&self, delta: Duration) -> Cost {
return self.get_nearest_lt_cost(delta);
}
pub fn new(pid: Pid) -> Self {
let mut curve = BTreeMap::new();
curve.insert(0, 0); // TODO is this necessary?
RbfCurve {
last_arrivals_window: VecDeque::with_capacity(crate::params::max_window()+1),
max_window: crate::params::max_window(),
curve: curve,
wcet: 0,
pid: pid,
prio: get_priority(pid),
is_dirty: false,
}
}
fn get_nearest_lt_cost(&self, key: Duration) -> Cost {
let nearest_lt_cost: Duration = match self.curve.range(..key).next_back() {
None => 0,
Some(kv) => *kv.1,
};
return nearest_lt_cost;
return self.curve.get(delta);
}
pub fn sum(&mut self, other: &RbfCurve) {
// Cloning the first curve because we would need to mutate it while iterating
let curve_iter = self.curve.clone();
let mut iter = merge(curve_iter.into_iter(), other.curve.into_iter()).peekable();
let mut curr_base_cost = 0;
let mut iter = merge(self.curve.clone(), other.curve.clone()).peekable();
while let Some((delta, cost)) = iter.next() {
// Unlike "add", the "insert" method does not delete entries to keep monotonicity
while let Some(point) = iter.next() {
if let Some((next_delta, next_cost)) = iter.peek() {
if let Some(next_point) = iter.peek() {
if delta == *next_delta { // Both curves have this step
curr_base_cost = cost + next_cost;
let to_update = self.curve.get_mut(&delta).unwrap();
*to_update = curr_base_cost;
if point.delta == next_point.delta { // Both curves have this step
curr_base_cost = point.cost + next_point.cost;
let res = self.curve.update_cost(point.delta, curr_base_cost);
assert!(res.is_ok());
iter.next(); // Skip next entry
} else { // Only one curve has this step
curr_base_cost += cost;
self.curve.insert(delta, curr_base_cost);
eprintln!("base {} += {}", curr_base_cost, point.cost);
curr_base_cost += point.cost; // TODO: can overflow ("attempt to add with overflow")
self.curve.insert(Point::new(point.delta, curr_base_cost));
}
} else { // End reached, only one curve has this step
self.curve.insert(delta, curr_base_cost + cost);
self.curve.insert(Point::new(point.delta, curr_base_cost + point.cost));
}
}
}
}
// Remove entries after the inserted key whose cost does not exceed the inserted one (preserves monotonicity)
// Passing the map to make the borrow checker happy
fn remove_outdated_entries_after_key(map: &mut BTreeMap<Duration, Cost>, inserted_key: Duration) {
let inserted_key_cost = *map.get(&inserted_key).unwrap();
map.retain(|k, v| !(k > &inserted_key && *v <= inserted_key_cost));
pub fn new(pid: Pid) -> Self {
let mut curve = SparseMap::new(crate::params::max_window());
curve.add(Point::new(0, 0));
RbfCurve {
last_arrivals_window: VecDeque::with_capacity(crate::params::max_window()+1),
max_window: crate::params::max_window(),
curve: curve,
wcet: 0,
pid: pid,
prio: 0,
is_dirty: false,
}
}
}
......@@ -2,11 +2,10 @@ use crate::real_time::rbf_curve::*;
use crate::util::helpers::*;
use crate::feature_detection::system::*;
/* This curve represents how long it takes in the worst case to receive a certain amount of service.
In the vector, index i represents cost i+1. */
/* This curve represents how long it takes in the worst case to receive a certain amount of service. */
#[derive(Debug, Clone)]
pub struct RtbCurve {
pub max_window: usize,
pub max_window: usize, // TODO use me for assertions
pub curve: Vec<RtbRange>,
pub pid: Pid,
pub prio: Priority,
......@@ -20,18 +19,45 @@ pub struct RtbRange {
impl RtbRange {
pub fn new(l: (Cost, Duration), r: (Cost, Duration)) -> Self {
return RtbRange {
RtbRange {
l: l,
r: r,
};
}
}
}
impl RtbCurve {
// We can extrapolate because the sbf grows linearly in each range, 1 unit at a time
pub fn get(&mut self, demand: Cost) -> Duration {
unimplemented!();
// This is a naive linear search, could be done faster with a hashmap
pub fn get(&self, demand: Cost) -> Duration {
let mut l_delta: Duration;
let mut r_delta: Duration;
let mut l_demand: Cost;
let mut r_demand = 0;
if demand == 0 {
return 0;
}
for range in &self.curve {
l_demand = range.l.0;
l_delta = range.l.1;
r_demand = range.r.0;
r_delta = range.r.1;
assert!(l_demand <= r_demand);
assert!(l_delta <= r_delta);
// Found
if l_demand <= demand && demand <= r_demand {
return l_delta + (demand - l_demand);
}
}
// TODO: should we then return a delta extrapolated from an "extended" curve?
assert!(r_demand < demand);
return 0;
}
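// Example (illustrative, hypothetical numbers): for a range with l = (2, 5) and r = (6, 9),
// a demand of 4 falls inside the range and get returns 5 + (4 - 2) = 7, i.e. the left
// delta plus the extra demand, since service grows one unit per time unit within a range.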
// Subtract from SBF(delta) = delta and simultaneously convert to the "Demand => Delta" representation.
......@@ -42,10 +68,19 @@ impl RtbCurve {
let mut prev_service: i64 = 0;
let mut curr_max_service: i64 = 0;
for (del, dem) in &rbf.curve {
let delta = *del as i64;
let demand = *dem as i64;
for point in &rbf.curve {
// TODO: ugly; why not make the rbf inclusive instead?
// Doing this because the rbf is exclusive, but the sbf must be inclusive
if point.delta == 0 {
continue;
}
// TODO: ugly; subtract 1 because the rbf distance is exclusive
let delta = (point.delta - 1) as i64;
let demand = point.cost as i64;
let service = delta - demand;
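// Example (illustrative, hypothetical numbers): an rbf point with delta = 5 and cost = 3
// becomes delta = 4 after the exclusive-to-inclusive shift, so with the full supply
// sbf(delta) = delta the residual service at that point is 4 - 3 = 1.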
// These two can be inferred since the sbf grows linearly
let service_at_peak = prev_service + (delta - prev_delta);
let delta_curr_max = delta - (service_at_peak - curr_max_service);
......@@ -75,7 +110,7 @@ impl RtbCurve {
max_window: crate::params::max_window(),
curve: curve,
pid: pid,
prio: get_priority(pid),
prio: 0,
}
}
}
use linked_list::*;
use crate::util::helpers::*;
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct Point {
pub delta: Duration,
pub cost: Cost,
}
impl Point {
pub fn new(delta: Duration, cost: Cost) -> Self {
Point {
delta: delta,
cost: cost,
}
}
}
#[derive(Debug, Clone)]
pub struct SparseMap {
pub buckets : Vec<LinkedList<Point>>,
pub count : usize,
pub capacity : usize,
pub bucket_size: usize,
}
impl SparseMap {
pub fn add(&mut self, p : Point) {
self.update_map(p, true);
}
pub fn insert(&mut self, p : Point) {
self.update_map(p, false);
}
fn update_map(&mut self, p : Point, keep_monotonicity: bool) {
while p.delta as usize >= self.capacity*self.bucket_size {
self.double_buckets();
}
let bi = self.bucket_index_of(p.delta);
let mut b = &mut (self.buckets[bi]);
let mut c = b.cursor();
let mut found = false;
c.prev(); // Go to end
let mut cost;
loop {
cost = c.peek_prev();
match cost {
None => break, // reached the front of the list
Some(point) =>
{
if point.delta == p.delta { // a point with the same delta exists: update its cost
point.cost = p.cost;
found = true;
c.prev(); // go back by one so that next() is the new point
break;
}
else if point.delta < p.delta { // position found
c.insert(p);
self.count += 1;
found = true;
break;
}
}
};
c.prev();
}
if !found { // either list is empty or p should be in first position
c.reset();
c.insert(p);
self.count += 1;
}
// Ensuring monotonicity: remove all following elements whose cost does not exceed the new one.
// Monotonicity can only be broken by a contiguous run of elements starting right after the
// newly inserted element, so we start by checking the current bucket.
// If the new element ends up last in its bucket, we move on to the following buckets.
if keep_monotonicity {
match c.peek_next() {
None => { panic! (); },
Some (el) => { assert! (el.delta == p.delta && el.cost == p.cost); }
}
c.next(); // cursor is now at the following element
let mut sequence_interrupted = false;
let mut cbi = bi;
loop {
loop {
match c.peek_next() {
None => { break; },
Some (el) => {
if el.cost <= p.cost {
c.remove();
}
else {
sequence_interrupted = true;
break;
}
}
}
}
// Move to the next bucket if the non-increasing run didn't end
if sequence_interrupted { break; }
cbi += 1;
if cbi >= self.capacity { break; }
b = &mut (self.buckets[cbi]);
c = b.cursor();
}
}
}
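// Example (illustrative, hypothetical points): calling add(Point::new(5, 10)) on a map that
// contains (6, 7) and (9, 12) inserts (5, 10) and removes (6, 7), whose cost does not exceed
// 10, while (9, 12) interrupts the run and is kept.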
pub fn get(&self, delta: Duration) -> Cost {
let max = (self.capacity * self.bucket_size) as u64;
if max <= delta { return 0; }
let mut bi = self.bucket_index_of(delta); // start with biggest bucket index that could contain the cost
loop {
let b = &self.buckets[bi];
for el in b.iter().rev() {
if el.delta <= delta { return el.cost; } // found
}
if bi == 0 { return 0; } // not found
bi -= 1;
}
}
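// Example (illustrative, hypothetical points): with (1, 2) and (11, 4) stored, get(5) scans
// from bucket_index_of(5) downwards and returns 2, the cost of the largest stored delta not
// exceeding 5; get(0) returns 0 because no stored delta is small enough.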
pub fn update_cost(&mut self, delta: Duration, new_cost: Cost) -> Result<(), ()> {
let max = (self.capacity * self.bucket_size) as u64;
if max <= delta { return Err(()); }
let mut bi = self.bucket_index_of(delta); // start with biggest bucket index that could contain the cost
loop {
let b = &mut self.buckets[bi];
for el in b.iter_mut().rev() {
if el.delta == delta { // found
el.cost = new_cost;
return Ok(());
}
}
if bi == 0 { return Err(()); } // not found
bi -= 1;
}
}
pub fn bucket_index_of(&self, cost : Duration) -> usize {
return (cost as usize / self.bucket_size) as usize;
}
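// Example (illustrative): with bucket_size == 4, deltas 0..=3 map to bucket 0 and delta 10
// maps to bucket 2, so a point always lives in bucket delta / bucket_size.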
// Used when a new element cannot fit
fn double_buckets(&mut self) {
self.bucket_size *=2;
for i in (0..self.capacity/2).step_by(1) {
let mut l = LinkedList::new();
l.append(&mut self.buckets[i*2]);
if i*2 < self.capacity { l.append(&mut self.buckets[i*2+1]); }
self.buckets[i].append(&mut l);
}
}
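// Example (illustrative, hypothetical numbers): with capacity == 4 and bucket_size == 1,
// doubling sets bucket_size to 2 and merges buckets {0, 1} into bucket 0 and {2, 3} into
// bucket 1; buckets 2 and 3 are left empty and now cover deltas 4..8.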
pub fn new(capacity: usize) -> Self {
let mut map = SparseMap {
capacity : capacity,
buckets : Vec::<LinkedList<Point>>::with_capacity(capacity),
bucket_size : 1,
count : 0,
};
for i in 0..capacity {
map.buckets.push(<LinkedList<Point>>::new());
}
map
}
}
impl<'a> IntoIterator for &'a SparseMap {
type Item = Point;
type IntoIter = SparseMapIterator<'a>;
fn into_iter(self) -> Self::IntoIter {
SparseMapIterator {
map: self,
bucket_idx: 0,
list_iter: None,
}
}
}
pub struct SparseMapIterator<'a> {
map: &'a SparseMap,
bucket_idx: usize,
list_iter: Option<linked_list::Iter<'a, Point>>,
}
impl<'a> Iterator for SparseMapIterator<'a> {
type Item = Point;
fn next(&mut self) -> Option<Point> {
let mut try_next_bucket = false;
while self.bucket_idx < self.map.capacity {
let curr_list = &self.map.buckets[self.bucket_idx];
if self.list_iter.is_none() || try_next_bucket {
self.list_iter = Some(curr_list.iter());
}
if let Some(point) = self.list_iter.as_mut().unwrap().next() {
return Some(*point);
} else {
// Try in the next bucket
self.bucket_idx += 1;
try_next_bucket = true;
}
}
None
}
}
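// Illustrative usage (assuming a populated map named `map`): iteration walks the buckets in
// order and yields points by increasing delta, e.g.
// for p in &map { println!("{} -> {}", p.delta, p.cost); }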
// https://doc.rust-lang.org/nomicon/send-and-sync.html
// Problem: linked_list::LinkedList<T> contains raw pointers, so Rust does not automatically derive Send for it. The linked list in the standard library does not allow insertion in the middle of the list, which is why we use this alternative implementation.
unsafe impl Send for SparseMap {}