1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
// Windows 的线程停放器实现。
//
// 如果可用 (Windows 8+),它将使用 WaitOnAddress 和 WakeByAddressSingle。
// 这个现代的 API 与 Linux 线程停放者使用的 futex 系统调用完全相同。当这些 API 可用时,此线程驻留程序的实现与 Linux 线程驻留程序完全匹配。
//
// 但是,当现代 API 不可用时,此实现将回退到 NT 键控事件,它们相似,但有一些重要区别。从 Windows XP 开始可用。
//
// WaitOnAddress 首先检查线程停放状态,以确保在更新停放状态和调用函数之间不会丢失 WakeByAddressSingle 调用。
//
// NtWaitForKeyedEvent 没有此选项,并且无条件阻止而无需先检查 Parker 状态。相反,NtReleaseKeyedEvent (与 WakeByAddressSingle 不同)*阻塞*,直到它唤醒 NtWaitForKeyedEvent 等待它的线程。
//
// 这样,我们可以确定没有丢失任何事件,但是如果 park_timeout() 是由超时而不是 unpark() 引起的,则我们必须注意不要阻塞 unpark()。
//
// 与 WaitOnAddress 不同,NtWaitForKeyedEvent/NtReleaseKeyedEvent 在 HANDLE 上操作 (使用 NtCreateKeyedEvent 创建)。
// 这意味着我们可以确保成功唤醒的 park() 被 unpark() 唤醒,而不是其他代码中的 NtReleaseKeyedEvent 调用,因为这些事件不仅与键 (派克 (state)) 的地址,而且还与该 HANDLE 匹配) 匹配。
// 我们在第一次需要时就懒惰地分配了这个句柄。
//
// 快速路径 (在已经调用 unpark() 之后调用 park()) 和可能的状态对于这两种实现都是相同的。
// 此处用于确保快速路径甚至不检查要使用的 API,而是可以立即返回,而与所使用的 API 无关。
// 只有慢速路径 (实际上将使 block/wake 成为一个线程) 会检查哪个 API 可用并具有不同的实现。
//
// 不幸的是,NT 键控事件是一个未记录的 Windows API。However:
// - 该 API 相对简单,具有明显的行为,并且有几篇 (unofficial) 文章记录了这些细节。[1]
// - `parking_lot` 已经使用此 API 多年 (在 Windows 8 之前的 Windows 版本上)。
// [2] 许多大型项目广泛使用 parking_lot,例如伺服和 Rust 编译器本身。
// - 它是 Windows SRW 锁和 Windows 关键部分使用的基础 API。[3] [4]
// - Wine,ReactOs 和 Windows XP 的实现的源代码可用,并且符合预期的行为。
// - 未记录 API 的主要风险是 future 中可能会更改。但是,由于我们仅将其用于 Windows 的较早版本,所以这不是问题。
// - 即使这些函数没有像我们期望的那样阻塞或唤醒 (这不太可能,请参见前面的所有要点),该实现仍将是内存安全的。NT 键控事件 API 仅在正确的位置用于 sleep/block。
//
// [1]: http://www.locklessinc.com/articles/keyed_events/
// [2]: https://github.com/Amanieu/parking_lot/commit/43abbc964e
// [3]: https://docs.microsoft.com/en-us/archive/msdn-magazine/2012/november/windows-with-c-the-evolution-of-synchronization-in-windows-and-c
// [4]: Windows 内部构件,第 1 部分,ISBN 9780735671300
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//

use crate::convert::TryFrom;
use crate::ptr;
use crate::sync::atomic::{
    AtomicI8, AtomicUsize,
    Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::{c, dur2timeout};
use crate::time::Duration;

pub struct Parker {
    state: AtomicI8,
}

const PARKED: i8 = -1;
const EMPTY: i8 = 0;
const NOTIFIED: i8 = 1;

// 有关内存顺序的注意事项:
//
// 内存排序仅与不同变量之间操作的相对排序有关。
// 仅查看单个原子变量时,甚至 Ordering::Relaxed 都可以保证单调/一致的顺序。
//
// 因此,由于该 Parker 只是单一的原子变量,因此我们只需要查看我们需要提供向外部世界的排序保证。
//
// 保留和取消保留的唯一内存排序保证是,在 unpark() 之后发生的线程上可以看到 unpark() 之前发生的事情。
// 否则,在仍然消耗 'token' 的同时,在调用 unpark() 之前将其有效地停了下来。
//
// 换句话说,unpark() 需要与 park() 消耗 token 并返回的部分进行同步。
//
// 通过在 unpark() 中写入 NOTIFIED ('token') 时使用 Ordering::Release,并在唤醒后在 park() 中读取此状态时使用 Ordering::Acquire,可以使用发布 - 获取同步来完成此操作。
//
//
//
//
//
//
//
impl Parker {
    pub fn new() -> Self {
        Self { state: AtomicI8::new(EMPTY) }
    }

    // 假定仅由拥有 Parker 的线程 (称为 `self.state != PARKED`) 调用此方法。
    //
    pub unsafe fn park(&self) {
        // 更改 NOTIFIED => EMPTY 或 EMPTY => PARKED,并在第一种情况下直接返回。
        //
        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
            return;
        }

        if let Some(wait_on_address) = c::WaitOnAddress::option() {
            loop {
                // 假设它仍然设置为 PARKED,请等待发生的事情。
                wait_on_address(self.ptr(), &PARKED as *const _ as c::LPVOID, 1, c::INFINITE);
                // 更改 NOTIFIED => EMPTY,但不要将 PARKED 留在原处。
                if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Acquire).is_ok() {
                    // 实际上是被 unpark() 唤醒的。
                    return;
                } else {
                    // 虚假的醒来。我们循环播放以重试。
                }
            }
        } else {
            // 等待 unpark() 产生此事件。
            c::NtWaitForKeyedEvent(keyed_event_handle(), self.ptr(), 0, ptr::null_mut());
            // 将状态设置回 EMPTY (从 PARKED 或 NOTIFIED)。
            // 请注意,我们不仅编写 EMPTY,而且还使用 swap() 包括获得顺序的读取,以与 unpark () 的发布顺序的写入同步。
            //
            //
            self.state.swap(EMPTY, Acquire);
        }
    }

    // 假定仅由拥有 Parker 的线程 (称为 `self.state != PARKED`) 调用此方法。
    //
    pub unsafe fn park_timeout(&self, timeout: Duration) {
        // 更改 NOTIFIED => EMPTY 或 EMPTY => PARKED,并在第一种情况下直接返回。
        //
        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
            return;
        }

        if let Some(wait_on_address) = c::WaitOnAddress::option() {
            // 假设它仍然设置为 PARKED,请等待发生的事情。
            wait_on_address(self.ptr(), &PARKED as *const _ as c::LPVOID, 1, dur2timeout(timeout));
            // 将状态设置回 EMPTY (从 PARKED 或 NOTIFIED)。
            // 请注意,我们不仅编写 EMPTY,而且还使用 swap() 包括获得顺序的读取,以与 unpark () 的发布顺序的写入同步。
            //
            //
            if self.state.swap(EMPTY, Acquire) == NOTIFIED {
                // 实际上是被 unpark() 唤醒的。
            } else {
                // 超时或虚假唤醒。
                // 我们以任何一种方式返回,因为我们无法轻易分辨出是否超时。
                //
            }
        } else {
            // 需要使用 NtWaitForKeyedEvent 等待 unpark()。
            let handle = keyed_event_handle();

            // NtWaitForKeyedEvent 使用 100ns 的单元,并使用负值指示单调时钟上的相对时间。
            //
            // 此处记录了基本的 KeWaitForSingleObject 函数:
            // https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/wdm/nf-wdm-kewaitforsingleobject
            let mut timeout = match i64::try_from((timeout.as_nanos() + 99) / 100) {
                Ok(t) => -t,
                Err(_) => i64::MIN,
            };

            // 等待 unpark() 产生此事件。
            let unparked =
                c::NtWaitForKeyedEvent(handle, self.ptr(), 0, &mut timeout) == c::STATUS_SUCCESS;

            // 将状态设置回 EMPTY (从 PARKED 或 NOTIFIED)。
            let prev_state = self.state.swap(EMPTY, Acquire);

            if !unparked && prev_state == NOTIFIED {
                // 我们被超时而不是 unpark() 唤醒了,但是状态被设置为 NOTIFIED,这意味着我们 *只是* 错过了 unpark(),现在它被阻止等待它。
                //
                // 等待它消耗事件并取消阻塞该线程。
                //
                c::NtWaitForKeyedEvent(handle, self.ptr(), 0, ptr::null_mut());
            }
        }
    }

    pub fn unpark(&self) {
        // 更改 PARKED => NOTIFIED,EMPTY => NOTIFIED 或 NOTIFIED => NOTIFIED,并在第一种情况下唤醒线程。
        //
        //
        // 请注意,即使 NOTIFIED => NOTIFIED 也会导致写入。
        // 这是有目的的,以确保每个 unpark() 都具有对 park() 的发布 - 获取命令。
        //
        if self.state.swap(NOTIFIED, Release) == PARKED {
            if let Some(wake_by_address_single) = c::WakeByAddressSingle::option() {
                unsafe {
                    wake_by_address_single(self.ptr());
                }
            } else {
                // 如果我们在等待线程运行 NtWaitForKeyedEvent 之前运行 NtReleaseKeyedEvent,则此 (shortly) 会阻塞,直到我们将其唤醒。
                // 如果等待的线程在我们运行 NtReleaseKeyedEvent 之前被唤醒 (例如,由于超时),它将阻塞直到我们唤醒一个线程。
                // 为了防止该线程在这种情况下无限期阻塞,park_impl() 在唤醒后看到状态设置为 NOTIFIED 后,将再次调用 NtWaitForKeyedEvent 解除阻塞。
                //
                //
                //
                //
                unsafe {
                    c::NtReleaseKeyedEvent(keyed_event_handle(), self.ptr(), 0, ptr::null_mut());
                }
            }
        }
    }

    fn ptr(&self) -> c::LPVOID {
        &self.state as *const _ as c::LPVOID
    }
}

fn keyed_event_handle() -> c::HANDLE {
    const INVALID: usize = !0;
    static HANDLE: AtomicUsize = AtomicUsize::new(INVALID);
    match HANDLE.load(Relaxed) {
        INVALID => {
            let mut handle = c::INVALID_HANDLE_VALUE;
            unsafe {
                match c::NtCreateKeyedEvent(
                    &mut handle,
                    c::GENERIC_READ | c::GENERIC_WRITE,
                    ptr::null_mut(),
                    0,
                ) {
                    c::STATUS_SUCCESS => {}
                    r => panic!("Unable to create keyed event handle: error {}", r),
                }
            }
            match HANDLE.compare_exchange(INVALID, handle as usize, Relaxed, Relaxed) {
                Ok(_) => handle,
                Err(h) => {
                    // 在我们开始之前,这场比赛输给了另一个初始化 HANDLE 的线程。
                    // 关闭我们的句柄并使用它们的句柄。
                    unsafe {
                        c::CloseHandle(handle);
                    }
                    h as c::HANDLE
                }
            }
        }
        handle => handle as c::HANDLE,
    }
}