1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
/// Oneshot channels/ports
///
/// 这是用于通讯模块的 channels/ports 的初始风格。这是针对通道的一次使用情况的优化。这种类型的主要优化是在创建 chan/port 对时仅分配一个且仅分配一个。
///
/// 另一个可能的优化是不使用 Arc box,因为从理论上讲我们知道何时可以重新分配共享数据包 (不需要真正的原子引用计数),但我遇到了如何在一个端口丢弃的早期销毁数据的麻烦。
///
///
/// # Implementation
///
/// Oneshots 是围绕一个原子的 usize 变量实现的。该变量不仅指示 port/chan 的状态,还包含端口上阻塞的所有线程。所有的原子运算都发生在这个词上。
///
/// 为了升级 oneshot 通道,升级被视为代表事物在通道方面的断开连接 (在心理上可以认为是在占用端口)。然后,此升级也存储在共享数据包中。
/// 需要考虑的一个警告是,当端口看到断开的通道时,它必须检查数据,因为没有 "data plus upgrade" 状态。
///
///
///
///
///
///
///
///
///
///
pub use self::Failure::*;
use self::MyUpgrade::*;
pub use self::UpgradeResult::*;

use crate::cell::UnsafeCell;
use crate::ptr;
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::Receiver;
use crate::time::Instant;

// 您可以在各种状态下找到端口。
const EMPTY: usize = 0; // 初始状态: 无数据,无阻塞接收者
const DATA: usize = 1; // 准备好供接收者接收的数据
const DISCONNECTED: usize = 2; // 通道已断开连接或已升级
// 任何其他值都表示一个指向 SignalToken 值的指针。
// 该协议确保当状态从 *移到指针时,将 token 的所有权提供给数据包,并且在状态从* 指针移向 * 时,token 的所有权将转移给更改状态的任何人。
//
//
//

pub struct Packet<T> {
    // chan/port 对的内部状态 (也存储阻塞的线程)
    state: AtomicUsize,
    // 一键式数据插槽位置
    data: UnsafeCell<Option<T>>,
    // 第二次使用时,必须升级 oneshot 通道,其中包含用于升级的插槽
    //
    upgrade: UnsafeCell<MyUpgrade<T>>,
}

pub enum Failure<T> {
    Empty,
    Disconnected,
    Upgraded(Receiver<T>),
}

pub enum UpgradeResult {
    UpSuccess,
    UpDisconnected,
    UpWoke(SignalToken),
}

enum MyUpgrade<T> {
    NothingSent,
    SendUsed,
    GoUp(Receiver<T>),
}

impl<T> Packet<T> {
    pub fn new() -> Packet<T> {
        Packet {
            data: UnsafeCell::new(None),
            upgrade: UnsafeCell::new(NothingSent),
            state: AtomicUsize::new(EMPTY),
        }
    }

    pub fn send(&self, t: T) -> Result<(), T> {
        unsafe {
            // 完整性检查
            match *self.upgrade.get() {
                NothingSent => {}
                _ => panic!("sending on a oneshot that's already sent on "),
            }
            assert!((*self.data.get()).is_none());
            ptr::write(self.data.get(), Some(t));
            ptr::write(self.upgrade.get(), SendUsed);

            match self.state.swap(DATA, Ordering::SeqCst) {
                // 发送数据,没有人在等待
                EMPTY => Ok(()),

                // 无法发送数据,端口首先挂断。
                // 将数据返回栈。
                DISCONNECTED => {
                    self.state.swap(DISCONNECTED, Ordering::SeqCst);
                    ptr::write(self.upgrade.get(), NothingSent);
                    Err((&mut *self.data.get()).take().unwrap())
                }

                // 不可能,这些是一次性通道
                DATA => unreachable!(),

                // 另一端有一个线程在等待。
                // 我们将 'DATA' 状态留在内部,以便在另一端将其拾取。
                ptr => {
                    SignalToken::cast_from_usize(ptr).signal();
                    Ok(())
                }
            }
        }
    }

    // 只需测试此通道是否已发送,就可以从发送方安全地使用它。
    //
    pub fn sent(&self) -> bool {
        unsafe { !matches!(*self.upgrade.get(), NothingSent) }
    }

    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
        // 尝试不阻塞线程 (这有点贵)。
        // 如果看起来我们不是空的,那么请立即进行 `try_recv` 的操作。
        if self.state.load(Ordering::SeqCst) == EMPTY {
            let (wait_token, signal_token) = blocking::tokens();
            let ptr = unsafe { signal_token.cast_to_usize() };

            // 与发送者竞争进入阻止状态
            if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
                if let Some(deadline) = deadline {
                    let timed_out = !wait_token.wait_max_until(deadline);
                    // 尝试重置状态
                    if timed_out {
                        self.abort_selection().map_err(Upgraded)?;
                    }
                } else {
                    wait_token.wait();
                    debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
                }
            } else {
                // 丢弃信号 token,因为我们从未阻止
                drop(unsafe { SignalToken::cast_from_usize(ptr) });
            }
        }

        self.try_recv()
    }

    pub fn try_recv(&self) -> Result<T, Failure<T>> {
        unsafe {
            match self.state.load(Ordering::SeqCst) {
                EMPTY => Err(Empty),

                // 我们在通道上看到了一些数据,但是可以再次使用通道来向我们发送升级。
                // 结果,我们需要重新插入没有可用数据的通道 (否则,下一次我们将只看到 DATA)。
                //
                // 这是作为 cmpxchg 完成的,因为如果状态在我们的脚下改变,我们宁愿只看到状态改变。
                //
                //
                DATA => {
                    let _ = self.state.compare_exchange(
                        DATA,
                        EMPTY,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    );
                    match (&mut *self.data.get()).take() {
                        Some(data) => Ok(data),
                        None => unreachable!(),
                    }
                }

                // 无法保证在升级发生之前我们会收到,并且升级会将通道标记为断开连接,因此,当我们看到此消息时,我们首先需要检查是否有可用数据,然后 *然后* 进行升级。
                //
                //
                //
                DISCONNECTED => match (&mut *self.data.get()).take() {
                    Some(data) => Ok(data),
                    None => match ptr::replace(self.upgrade.get(), SendUsed) {
                        SendUsed | NothingSent => Err(Disconnected),
                        GoUp(upgrade) => Err(Upgraded(upgrade)),
                    },
                },

                // 我们是唯一的接收者; 已经没有阻塞的接收者了。
                //
                _ => unreachable!(),
            }
        }
    }

    // 返回升级是否完成。
    // 如果升级未完成,则无法将端口发送到另一半 (它将永远不会收到)。
    //
    pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
        unsafe {
            let prev = match *self.upgrade.get() {
                NothingSent => NothingSent,
                SendUsed => SendUsed,
                _ => panic!("upgrading again"),
            };
            ptr::write(self.upgrade.get(), GoUp(up));

            match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
                // 如果通道为空或上面有数据,那么我们很好。
                // 发送者将在升级之前检查数据 (以防我们粘贴 DATA 状态)。
                //
                DATA | EMPTY => UpSuccess,

                // 如果另一端已经断开连接,则升级失败。
                // 请确保将我们给的端口丢掉。
                DISCONNECTED => {
                    ptr::replace(self.upgrade.get(), prev);
                    UpDisconnected
                }

                // 如果有人在等,我们得叫醒他们
                ptr => UpWoke(SignalToken::cast_from_usize(ptr)),
            }
        }
    }

    pub fn drop_chan(&self) {
        match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
            DATA | DISCONNECTED | EMPTY => {}

            // 如果有人在等,我们得叫醒他们
            ptr => unsafe {
                SignalToken::cast_from_usize(ptr).signal();
            },
        }
    }

    pub fn drop_port(&self) {
        match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
            // 空通道无关紧要,而远程断开连接的通道也无关紧要 b/c 我们将要运行 drop glue
            //
            //
            DISCONNECTED | EMPTY => {}

            // 通道上有数据,因此请确保我们立即销毁它。
            // 这就是为什么不使用圆弧会有些困难的原因 (需要 box 才能在获取数据时保持有效)。
            //
            DATA => unsafe {
                (&mut *self.data.get()).take().unwrap();
            },

            // 我们是唯一可以阻止此端口的人
            _ => unreachable!(),
        }
    }

    ////////////////////////////////////////////////////////////////////////////
    // 选择实现
    ////////////////////////////////////////////////////////////////////////////

    // 从该端口中删除先前的选择线程。
    // 这确保了被阻塞的线程将不再对任何其他线程可见。
    //
    // 返回值指示此端口上是否有数据。
    pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
        let state = match self.state.load(Ordering::SeqCst) {
            // 这些状态中的每一个都意味着在堕胎选择方面不会发生进一步的活动
            //
            s @ (EMPTY | DATA | DISCONNECTED) => s,

            // 如果我们有一个阻塞的线程,则使用原子来获得它的所有权 (可能会失败)
            //
            ptr => self
                .state
                .compare_exchange(ptr, EMPTY, Ordering::SeqCst, Ordering::SeqCst)
                .unwrap_or_else(|x| x),
        };

        // 既然我们已经拥有了国家的所有权,那么想出办法去做。
        //
        match state {
            EMPTY => unreachable!(),
            // 用于选择的线程被盗
            DATA => Ok(true),

            // 如果另一端挂断了,那么我们就拥有该端口的完全所有权。首先,检查是否有数据在等待我们。
            // 如果另一端发送了一些内容然后挂断,则可能发生这种情况。
            //
            // 然后,我们需要检查是否请求了升级,如果是,则升级后的端口需要中止选择。
            //
            //
            DISCONNECTED => unsafe {
                if (*self.data.get()).is_some() {
                    Ok(true)
                } else {
                    match ptr::replace(self.upgrade.get(), SendUsed) {
                        GoUp(port) => Err(port),
                        _ => Ok(true),
                    }
                }
            },

            // 我们从精选中唤醒了自己。
            ptr => unsafe {
                drop(SignalToken::cast_from_usize(ptr));
                Ok(false)
            },
        }
    }
}

impl<T> Drop for Packet<T> {
    fn drop(&mut self) {
        assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
    }
}