1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use crate::Task;
use async_channel::{Receiver, Sender};
use async_mutex::Mutex;
use futures_lite::future;
use once_cell::sync::OnceCell;
use std::{io, thread};

// The current number of threads (some might be shutting down and not in the pool anymore)
static GLOBAL_EXECUTOR_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
// The expected number of threads (excluding the one that are shutting down)
static GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);

thread_local! {
    // Used to shutdown a thread when we receive a message from the Sender.
    // We send an ack using to the Receiver once we're finished shutting down.
    static THREAD_SHUTDOWN: OnceCell<(Sender<()>, Receiver<()>)> = OnceCell::new();
}

/// Spawn more executor threads, up to configured max value.
///
/// Returns how many threads we spawned.
///
/// # Examples
///
/// ```
/// async_global_executor::spawn_more_threads(2);
/// ```
pub async fn spawn_more_threads(count: usize) -> io::Result<usize> {
    // Get the current configuration, or initialize the thread pool.
    let config = crate::config::GLOBAL_EXECUTOR_CONFIG
        .get()
        .unwrap_or_else(|| {
            crate::init();
            crate::config::GLOBAL_EXECUTOR_CONFIG.get().unwrap()
        });
    // How many threads do we have (including shutting down)
    let mut threads_number = GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await;
    // How many threads are we supposed to have (when all shutdowns are complete)
    let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
    // Ensure we don't exceed configured max threads (including shutting down)
    let count = count.min(config.max_threads - *threads_number);
    for _ in 0..count {
        thread::Builder::new()
            .name((config.thread_name_fn)())
            .spawn(thread_main_loop)?;
        *threads_number += 1;
        *expected_threads_number += 1;
    }
    Ok(count)
}

/// Stop one of the executor threads, down to configured min value
///
/// Returns whether a thread has been stopped.
///
/// # Examples
///
/// ```
/// async_global_executor::stop_thread();
/// ```
pub fn stop_thread() -> Task<bool> {
    crate::spawn(stop_current_executor_thread())
}

/// Stop the current executor thread, if we exceed the configured min value
///
/// Returns whether the thread has been stopped.
///
/// # Examples
///
/// ```
/// async_global_executor::stop_current_thread();
/// ```
pub fn stop_current_thread() -> Task<bool> {
    crate::spawn_local(stop_current_executor_thread())
}

fn thread_main_loop() {
    // This will be used to ask for shutdown.
    let (s, r) = async_channel::bounded(1);
    // This wil be used to ack once shutdown is complete.
    let (s_ack, r_ack) = async_channel::bounded(1);
    THREAD_SHUTDOWN.with(|thread_shutdown| drop(thread_shutdown.set((s, r_ack))));

    // Main loop
    loop {
        #[allow(clippy::blocks_in_if_conditions)]
        if std::panic::catch_unwind(|| {
            crate::executor::LOCAL_EXECUTOR.with(|executor| {
                let local = executor.run(async {
                    // Wait until we're asked to shutdown.
                    let _ = r.recv().await;
                });
                let global = crate::executor::GLOBAL_EXECUTOR.run(future::pending::<()>());
                crate::reactor::block_on(future::or(local, global));
            });
        })
        .is_ok()
        {
            break;
        }
    }

    wait_for_local_executor_completion();

    // Ack that we're done shutting down.
    crate::reactor::block_on(async {
        let _ = s_ack.send(()).await;
    });
}

fn wait_for_local_executor_completion() {
    loop {
        #[allow(clippy::blocks_in_if_conditions)]
        if std::panic::catch_unwind(|| {
            crate::executor::LOCAL_EXECUTOR.with(|executor| {
                crate::reactor::block_on(async {
                    // Wait for spawned tasks completion
                    while !executor.is_empty() {
                        executor.tick().await;
                    }
                });
            });
        })
        .is_ok()
        {
            break;
        }
    }
}

async fn stop_current_executor_thread() -> bool {
    // How many threads are we supposed to have (when all shutdowns are complete)
    let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
    // Ensure we don't go below the configured min_threads (ignoring shutting down)
    if *expected_threads_number
        > crate::config::GLOBAL_EXECUTOR_CONFIG
            .get()
            .unwrap()
            .min_threads
    {
        let (s, r_ack) =
            THREAD_SHUTDOWN.with(|thread_shutdown| thread_shutdown.get().unwrap().clone());
        let _ = s.send(()).await;
        // We now expect to have one less thread (this one is shutting down)
        *expected_threads_number -= 1;
        // Unlock the Mutex
        drop(expected_threads_number);
        let _ = r_ack.recv().await;
        // This thread is done shutting down
        *GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await -= 1;
        true
    } else {
        false
    }
}