1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
//! A restricted channel to pass data from signal handler.
//!
//! When trying to communicate data from signal handler to the outside world, one can use an atomic
//! variable (as it doesn't lock, so it can be made async-signal-safe). But this won't work for
//! larger data.
//!
//! This module provides a channel that can be used for that purpose. It is used by certain
//! [exfiltrators][crate::iterator::exfiltrator], but can be used as building block for custom
//! actions. In general, this is not a ready-made end-user API.
//!
//! # How does it work
//!
//! Each channel has a fixed number of slots and two queues (one for empty slots, one for full
//! slots). A signal handler takes a slot out of the empty one, fills it and passes it into the
//! full one. Outside of signal handler, it can take the value out of the full queue and return the
//! slot to the empty queue.
//!
//! The queues are implemented as bit-encoded indexes of the slots in the storage. The bits are
//! stored in an atomic variable.
//!
//! Note that the algorithm allows for a slot to be in neither queue (when it is being emptied or
//! filled).
//!
//! # Fallible allocation of a slot
//!
//! It is apparent that allocation of a new slot can fail (there's nothing in the empty queue). In
//! such case, there's no way to send the new value out of the handler (there's no way to safely
//! wait for a slot to appear, because the handler can be blocking the thread that is responsible
//! for emptying them). But that's considered acceptable ‒ even the kernel collates the same kinds
//! of signals together if they are not consumed by application fast enough and there are no free
//! slots exactly because some are being filled, emptied or are full ‒ in particular, the whole
//! system will yield a signal.
//!
//! This assumes that separate signals don't share the same buffer and that there's only one reader
//! (using multiple readers is still safe, but it is possible that all slots would be inside the
//! readers, but already empty, so the above argument would not hold).

// TODO: Other sizes? Does anyone need more than 5 slots?

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU16, Ordering};

// Number of slots in the channel (and of index fields in each queue word).
const SLOTS: usize = 5;
// Width in bits of one queue field; 3 bits encode indexes 0..=5, where 0 is
// reserved for "no slot here". 5 fields * 3 bits = 15 bits, fitting an u16.
const BITS: u16 = 3;
// Mask selecting a single 3-bit field.
const MASK: u16 = 0b111;

/// Extracts the `idx`-th 3-bit field from the packed queue word `n`.
fn get(n: u16, idx: u16) -> u16 {
    let shift = BITS * idx;
    (n >> shift) & MASK
}

/// Returns `n` with the `idx`-th 3-bit field replaced by `v`.
fn set(n: u16, idx: u16, v: u16) -> u16 {
    let shift = BITS * idx;
    let cleared = n & !(MASK << shift);
    cleared | (v << shift)
}

/// Pushes the slot index `val` onto the back of the bit-packed queue `q`.
///
/// Finds the first zero (unused) field in the word and installs the value
/// there with a CAS, retrying on contention.
///
/// # Panics
///
/// Panics when every field is occupied; that would violate the channel
/// invariant that the two queues together never hold more than `SLOTS`
/// indexes.
fn enqueue(q: &AtomicU16, val: u16) {
    let mut snapshot = q.load(Ordering::Relaxed);
    loop {
        // Locate the first free (zero) field in the packed word.
        let free = (0..SLOTS as u16)
            .find(|&i| get(snapshot, i) == 0)
            .expect("No empty slot available");
        let updated = set(snapshot, free, val);
        // Release pairs with the Acquire in `dequeue`, publishing the slot's
        // contents before its index becomes visible to a consumer.
        match q.compare_exchange_weak(snapshot, updated, Ordering::Release, Ordering::Relaxed) {
            Ok(_) => return,
            // Lost the race ‒ retry against the freshly observed word.
            Err(seen) => snapshot = seen,
        }
    }
}

/// Pops the slot index at the front of the bit-packed queue `q`.
///
/// Returns `None` when the queue holds no indexes. Never blocks.
fn dequeue(q: &AtomicU16) -> Option<u16> {
    let mut snapshot = q.load(Ordering::Relaxed);
    loop {
        let front = snapshot & MASK;
        if front == 0 {
            // Front field is zero ‒ the queue is completely empty.
            return None;
        }
        // Shifting the word drops the front field and moves the rest forward.
        let updated = snapshot >> BITS;
        // Acquire pairs with the Release in `enqueue`, so the slot's contents
        // are visible once we own its index.
        match q.compare_exchange_weak(snapshot, updated, Ordering::Acquire, Ordering::Relaxed) {
            Ok(_) => return Some(front),
            // Lost the race ‒ retry against the freshly observed word.
            Err(seen) => snapshot = seen,
        }
    }
}

/// A restricted async-signal-safe channel
///
/// This is a bit like the usual channel used for inter-thread communication, but with several
/// restrictions:
///
/// * There's a limited number of slots (currently 5).
/// * There's no way to wait for a place in it or for a value. If value is not available, `None` is
///   returned. If there's no space for a value, the value is silently dropped.
///
/// In exchange for that, all the operations on that channel are async-signal-safe. That means it
/// is possible to use it to communicate between a signal handler and the rest of the world with it
/// (specifically, it's designed to send information from the handler to the rest of the
/// application). The throwing out of values when full is in line with collating of the same type
/// in kernel (you should not use the same channel for multiple different signals).
///
/// Technically, this is a MPMC queue which preserves order, but it is expected to be used in MPSC
/// mode mostly (in theory, multiple threads can be executing a signal handler for the same signal
/// at the same time). The channel is not responsible for wakeups.
///
/// While the channel is async-signal-safe, you still need to make sure *creating* of the values is
/// too (it should not contain anything that allocates, for example ‒ so no `String`s inside, etc).
///
/// The code was *not* tuned for performance (signals are not expected to happen often).
pub struct Channel<T> {
    // The slots themselves. A queue index of `i` refers to `storage[i - 1]`
    // (indexes are 1-based because 0 marks an unused queue field).
    storage: [UnsafeCell<Option<T>>; SLOTS],
    // Bit-packed queue of empty slot indexes, ready to be filled by `send`.
    empty: AtomicU16,
    // Bit-packed queue of full slot indexes, ready to be drained by `recv`.
    full: AtomicU16,
}

impl<T> Channel<T> {
    /// Creates a new channel with nothing in it.
    pub fn new() -> Self {
        let me = Self {
            storage: Default::default(),
            empty: AtomicU16::new(0),
            full: AtomicU16::new(0),
        };

        // Queue indexes are 1-based (0 marks an unused field), so the empty
        // queue is seeded with 1..=SLOTS.
        for slot in 1..=SLOTS as u16 {
            enqueue(&me.empty, slot);
        }

        me
    }

    /// Inserts a value into the channel.
    ///
    /// If the value doesn't fit, it is silently dropped. Never blocks.
    pub fn send(&self, val: T) {
        let slot = match dequeue(&self.empty) {
            Some(slot) => slot,
            // No free slot ‒ drop the value, in line with kernel collation.
            None => return,
        };
        // SAFETY: the index was just removed from the `empty` queue, so no
        // other thread can reach this cell until it is re-enqueued below.
        unsafe {
            *self.storage[slot as usize - 1].get() = Some(val);
        }
        enqueue(&self.full, slot);
    }

    /// Takes a value from the channel.
    ///
    /// Or returns `None` if the channel is empty. Never blocks.
    pub fn recv(&self) -> Option<T> {
        let slot = dequeue(&self.full)?;
        // SAFETY: the index was just removed from the `full` queue, giving us
        // exclusive access to the cell until it is re-enqueued as empty.
        let value = unsafe { &mut *self.storage[slot as usize - 1].get() }
            .take()
            .expect("Full slot with nothing in it");
        enqueue(&self.empty, slot);
        Some(value)
    }
}

impl<T> Default for Channel<T> {
    fn default() -> Self {
        Self::new()
    }
}

// SAFETY: the channel only ever moves owned `T`s in (`send`) and out
// (`recv`), so moving the whole channel across threads is fine for `T: Send`.
unsafe impl<T: Send> Send for Channel<T> {}

// Yes, really Send -> Sync. Having a reference to Channel allows Sending Ts, but not having refs
// on them.
// SAFETY: a `&Channel<T>` only permits moving `T`s through the channel, never
// holding references to the stored values, so `T: Send` (not `T: Sync`) is
// the right bound here.
unsafe impl<T: Send> Sync for Channel<T> {}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use super::*;

    /// A freshly created channel yields nothing.
    #[test]
    fn new_empty() {
        let channel = Channel::<usize>::new();
        assert!(channel.recv().is_none());
        assert!(channel.recv().is_none());
    }

    /// A single value travels through the channel intact.
    #[test]
    fn pass_value() {
        let channel = Channel::new();
        channel.send(42);
        assert_eq!(42, channel.recv().unwrap());
        assert!(channel.recv().is_none());
    }

    /// Many send/recv round-trips keep working ‒ slots get recycled.
    #[test]
    fn multiple() {
        let channel = Channel::new();
        for round in 0..1000 {
            channel.send(round);
            assert_eq!(round, channel.recv().unwrap());
            assert!(channel.recv().is_none());
        }
    }

    /// Values past the slot capacity are silently discarded.
    #[test]
    fn overflow() {
        let channel = Channel::new();
        for value in 0..10 {
            channel.send(value);
        }
        for expected in 0..5 {
            assert_eq!(expected, channel.recv().unwrap());
        }
        assert!(channel.recv().is_none());
    }

    /// A sender thread and a receiving main thread agree on contents and order.
    #[test]
    fn multi_thread() {
        let channel = Arc::new(Channel::<usize>::new());

        let sender = thread::spawn({
            let channel = Arc::clone(&channel);
            move || {
                for value in 0..4 {
                    channel.send(value);
                }
            }
        });

        let mut received = Vec::new();
        while received.len() < 4 {
            received.extend(channel.recv());
        }

        assert_eq!(vec![0, 1, 2, 3], received);

        sender.join().unwrap();
    }
}