event_loop.rs 6.59 KB
Newer Older
Marco Maida's avatar
Marco Maida committed
1
use std::collections::HashMap;
2
use std::sync::mpsc::*;
Marco Maida's avatar
Marco Maida committed
3

4
use crate::sync::*;
Marco Maida's avatar
Marco Maida committed
5
use crate::util::helpers::*;
Marco Maida's avatar
Marco Maida committed
6
use crate::util::trace_logger;
7
8
use crate::event_generation::evg;
use crate::event_processing::invocation_cycle::*;
9
use crate::util::{export};
10
11
use crate::real_time::{
    arrival_curve::ArrivalCurve,
12
13
    arrival_sequence::ArrivalSequence,
    arrival::* };
14

15
pub fn trace(evg: &mut dyn evg::EventsGenerator, target_pids: &Vec<Pid>, tx_evp: Sender<EventUpdate>) {
16
    let mut trace_acycles: HashMap<Pid, InvocationCycle> = HashMap::new();
17

18
    // Setting up the curves and data the EVP will build
Marco Perronet's avatar
Marco Perronet committed
19
    let mut trace_logger_run = trace_logger::TraceLogger::new(export::generate_run_name(target_pids));
Marco Maida's avatar
Marco Maida committed
20
    let mut trace_loggers: HashMap<Pid, trace_logger::TraceLogger> = HashMap::new();
21
    let mut arrival_curves: HashMap<Pid, ArrivalCurve> = HashMap::new();
22
    let mut arrival_seqs_updates: HashMap<Pid, ArrivalSequence> = HashMap::new();
Marco Perronet's avatar
Marco Perronet committed
23
    let mut arrival_seqs_whole: HashMap<Pid, ArrivalSequence> = HashMap::new(); // Never trim, used for the replay
24
25
    let mut new_arrivals_cnt = 0;
    let mut curr_update_id = 0;
26
    let mut last_update_time = 0;
27
    let mut first_event_time = 0;
Marco Maida's avatar
Marco Maida committed
28

29
    // Hot loop in which events from EVG are consumed
30
    while let Some(event) = evg.next_event() {
31
        // Update the trace loggers
Marco Perronet's avatar
Marco Perronet committed
32
        if crate::params::logger_replay() || crate::params::logger_event_seq() {
Marco Maida's avatar
Marco Maida committed
33
34
            let logger = trace_loggers.entry(event.pid).or_insert(trace_logger::TraceLogger::new(event.pid.to_string()));
            logger.add_event(event);
Marco Perronet's avatar
Marco Perronet committed
35
            trace_logger_run.add_event(event);
Marco Maida's avatar
Marco Maida committed
36
37
        }

38
        // Check consistency of automaton, update the invocation cycle of the found PID, update curves
Marco Perronet's avatar
Marco Perronet committed
39
        let acycle = trace_acycles.entry(event.pid).or_insert(InvocationCycle::new(event.pid, crate::params::ic_heuristic(), crate::params::ic_timeout()));
40
        if let Some(arrival) = acycle.update_activation_cycle(event) {
41

42
            let arrivals_of_pid = arrival_seqs_updates.entry(event.pid).or_insert(ArrivalSequence::new(event.pid, true, crate::params::window_size()));
43
44
            arrivals_of_pid.add_arrival(arrival);
            new_arrivals_cnt += 1;
45

Marco Perronet's avatar
Marco Perronet committed
46
            if crate::params::extract_arr_curve() {
47
                let arrival_curve_of_pid = arrival_curves.entry(event.pid).or_insert(ArrivalCurve::new(event.pid, crate::params::window_size()));
48
49
                arrival_curve_of_pid.add_arrival(arrival);
            }
Marco Perronet's avatar
Marco Perronet committed
50
            if crate::params::logger_replay() {
51
                let arrivals_of_pid_whole = arrival_seqs_whole.entry(event.pid).or_insert(ArrivalSequence::new(event.pid, true, crate::params::window_size()));
52
53
                arrivals_of_pid_whole.add_arrival(arrival);
            }
Marco Maida's avatar
Marco Maida committed
54
        }
55

Marco Perronet's avatar
Marco Perronet committed
56
57
        // Send update
        // TODO should we check new_arrivals_cnt for each pid?
58
        if last_update_time == 0 { 
59
60
            last_update_time = event.instant;
            first_event_time = event.instant;
61
62
63
64
65
        }
        let last_update_elapsed = ns_to_s(event.instant - last_update_time);

        if (last_update_elapsed > crate::params::update_interval() || 
        new_arrivals_cnt >= crate::params::update_threshold()) && 
66
        new_arrivals_cnt > 0 && !arrival_seqs_updates.is_empty() {
67

68
            let mut update = EventUpdate::new(curr_update_id);
69
            update.arrival_seqs = arrival_seqs_updates.clone();
Marco Perronet's avatar
Marco Perronet committed
70
            
Marco Perronet's avatar
Marco Perronet committed
71
            if crate::params::print_update_size() {
72
                eprintln!("{} seconds elapsed since last update, sending update:", last_update_elapsed);
Marco Perronet's avatar
Marco Perronet committed
73
74
75
76
                for (k, v) in update.arrival_seqs.iter() {
                    eprintln!("{} new arrivals for pid {}", v.arrivals.len(), k);
                }
            }
77

78
79
80
            match tx_evp.send(update) {
                // Matching thread has stopped, this thread should stop as well
                Err(_) => { eprintln!("TRACE: matching thread stopped"); break; },
81
82
                Ok(_) => {},
            }
83
84

            // Clear all arrivals after sending the update
85
            clear_all_arrivals(&mut arrival_seqs_updates);
86
            new_arrivals_cnt = 0;
87
            curr_update_id += 1;
88
            last_update_time = event.instant;
89
        }
Marco Maida's avatar
Marco Maida committed
90
91
    }

92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
    /* Trace might have been shorter than update_interval */
    if first_event_time == last_update_time && 
        new_arrivals_cnt > 0 && !arrival_seqs_updates.is_empty() {
        
        let mut update = EventUpdate::new(curr_update_id);
        update.arrival_seqs = arrival_seqs_updates.clone();
        if crate::params::print_update_size() {
            eprintln!("Sending update:");
            for (k, v) in update.arrival_seqs.iter() {
                eprintln!("{} new arrivals for pid {}", v.arrivals.len(), k);
            }
        }
        match tx_evp.send(update) {
            // Matching thread has stopped, this thread should stop as well
            Err(_) => { eprintln!("TRACE: matching thread stopped"); },
            Ok(_) => {},
        }
    }

111
112
113
114
115
116
117
118
    let mut stop_update = EventUpdate::default();
    stop_update.tracing_stopped = true;
    match tx_evp.send(stop_update) {
        // Matching thread has stopped, this thread should stop as well
        Err(_) => { eprintln!("TRACE: matching thread stopped"); },
        Ok(_) => {},
    }

119
    // Output the results
Marco Perronet's avatar
Marco Perronet committed
120
    let dir_name: String = export::generate_run_name(&target_pids);
Marco Maida's avatar
Marco Maida committed
121
    for pid in target_pids {
122
123
        let arrivals_of_pid_whole = arrival_seqs_whole.entry(*pid).or_insert(ArrivalSequence::new(*pid, true, crate::params::window_size()));
        let arrival_curve_of_pid = arrival_curves.entry(*pid).or_insert(ArrivalCurve::new(*pid, crate::params::window_size()));
Marco Maida's avatar
Marco Maida committed
124
        let trace_logger_of_pid = trace_loggers.entry(*pid).or_insert(trace_logger::TraceLogger::new(pid.to_string()));
125

126
        // Export curves for plotting
127
        if crate::params::replay_file().is_none() {
Marco Perronet's avatar
Marco Perronet committed
128
            export::export_arrival_curve(&dir_name, &arrival_curve_of_pid, *pid);
Marco Perronet's avatar
Marco Perronet committed
129
130
            export::export_arrivals(&arrivals_of_pid_whole, *pid);
            export::export_interarrivals(&arrivals_of_pid_whole, *pid);
Marco Perronet's avatar
Marco Perronet committed
131
        }
Marco Maida's avatar
Marco Maida committed
132

133
        // Save trace for replay
Marco Perronet's avatar
Marco Perronet committed
134
135
136
        if crate::params::logger_replay() {
            trace_logger_of_pid.save_replay("replay".to_string());
            trace_logger_run.save_replay("replay_run".to_string());
Marco Perronet's avatar
Marco Perronet committed
137
        }
138
139

        // Print events
Marco Perronet's avatar
Marco Perronet committed
140
        if crate::params::logger_event_seq() {
Marco Perronet's avatar
Marco Perronet committed
141
142
143
            trace_logger_of_pid.print_event_sequence();
            trace_logger_of_pid.print_event_type_sequence();
        }
Marco Maida's avatar
Marco Maida committed
144
    }
Marco Maida's avatar
Marco Maida committed
145
}
146

147
148
/// Calls `clear_arrivals()` on every arrival sequence stored in `arrival_seqs_updates`.
///
/// The map entries themselves are kept; only the sequences are asked to clear their arrivals.
fn clear_all_arrivals(arrival_seqs_updates: &mut HashMap<Pid, ArrivalSequence>) {
    arrival_seqs_updates.values_mut().for_each(|sequence| {
        sequence.clear_arrivals();
    });
}