1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
/// 流通道
///
/// 这就是通道的风格,它针对一位发送者和一位接收者进行了优化。
/// 如果通道被克隆,则发送方将升级到共享通道。
///
/// 可以在父模块的注释中找到高级实现细节。
///
///
pub use self::Failure::*;
use self::Message::*;
pub use self::UpgradeResult::*;

use core::cmp;

use crate::cell::UnsafeCell;
use crate::ptr;
use crate::thread;
use crate::time::Instant;

use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::spsc_queue as spsc;
use crate::sync::mpsc::Receiver;

const DISCONNECTED: isize = isize::MIN;
#[cfg(test)]
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;

pub struct Packet<T> {
    // 所有邮件的内部队列
    queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
}

struct ProducerAddition {
    cnt: AtomicIsize,     // 该通道上有多少项
    to_wake: AtomicUsize, // SignalToken 使被阻塞的线程唤醒

    port_dropped: AtomicBool, // 如果通道已被销毁,则进行标记。
}

struct ConsumerAddition {
    steals: UnsafeCell<isize>, // 一个端口收到多少次无阻塞?
}

pub enum Failure<T> {
    Empty,
    Disconnected,
    Upgraded(Receiver<T>),
}

pub enum UpgradeResult {
    UpSuccess,
    UpDisconnected,
    UpWoke(SignalToken),
}

// 任何消息都可以包含到新共享端口的 "upgrade request",因此内部队列是 T 队列,而 Message<T>
//
enum Message<T> {
    Data(T),
    GoUp(Receiver<T>),
}

impl<T> Packet<T> {
    pub fn new() -> Packet<T> {
        Packet {
            queue: unsafe {
                spsc::Queue::with_additions(
                    128,
                    ProducerAddition {
                        cnt: AtomicIsize::new(0),
                        to_wake: AtomicUsize::new(0),

                        port_dropped: AtomicBool::new(false),
                    },
                    ConsumerAddition { steals: UnsafeCell::new(0) },
                )
            },
        }
    }

    pub fn send(&self, t: T) -> Result<(), T> {
        // 如果另一个端口确定性地消失了,那么绝对必须将数据返回到栈中。
        // 否则,数据被视为已发送。
        //
        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
            return Err(t);
        }

        match self.do_send(Data(t)) {
            UpSuccess | UpDisconnected => {}
            UpWoke(token) => {
                token.signal();
            }
        }
        Ok(())
    }

    pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
        // 如果端口已消失,则无需继续进行任何操作。
        //
        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
            return UpDisconnected;
        }

        self.do_send(GoUp(up))
    }

    fn do_send(&self, t: Message<T>) -> UpgradeResult {
        self.queue.push(t);
        match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
            // 如 mod 的文档注释中所述,-1 == 唤醒
            -1 => UpWoke(self.take_to_wake()),
            // 如前所述,SPSC 队列必须 >= -2
            -2 => UpSuccess,

            // 确保保留断开连接状态,在这种情况下,返回值将取决于是否接收到我们的数据。
            //
            // 这表明我们是否有空队列。
            //
            // 首先,需要 drain 此处的队列,因为端口将永远不会删除此数据。
            // drain 最多只能有一个项 (端口将其余部分排出)。
            //
            DISCONNECTED => {
                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                let first = self.queue.pop();
                let second = self.queue.pop();
                assert!(second.is_none());

                match first {
                    Some(..) => UpSuccess,  // 我们无法发送数据
                    None => UpDisconnected, // 我们成功发送了数据
                }
            }

            // 否则,我们只是在非等待队列中发送了一些数据,因此只需确保世界保持理智并继续前进即可!
            //
            n => {
                assert!(n >= 0);
                UpSuccess
            }
        }
    }

    // 占用 'to_wake' 字段的所有权。
    fn take_to_wake(&self) -> SignalToken {
        let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
        assert!(ptr != 0);
        unsafe { SignalToken::cast_from_usize(ptr) }
    }

    // 减少睡眠者的通道上的计数,如果睡眠者不应该睡眠,则将其返回。
    // 请注意,这是我们考虑盗窃的位置。
    //
    fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
        let ptr = unsafe { token.cast_to_usize() };
        self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);

        let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };

        match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
            DISCONNECTED => {
                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
            }
            // 如果我们将盗窃因素考虑在内,并且注意到该通道没有数据,那么我们就可以成功入睡
            //
            n => {
                assert!(n >= 0);
                if n - steals <= 0 {
                    return Ok(());
                }
            }
        }

        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
        Err(unsafe { SignalToken::cast_from_usize(ptr) })
    }

    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
        // 乐观的飞行前检查 (计划很昂贵)。
        match self.try_recv() {
            Err(Empty) => {}
            data => return data,
        }

        // 抱歉,我们的通道没有数据。
        // 取消调度当前线程并启动阻止协议。
        let (wait_token, signal_token) = blocking::tokens();
        if self.decrement(signal_token).is_ok() {
            if let Some(deadline) = deadline {
                let timed_out = !wait_token.wait_max_until(deadline);
                if timed_out {
                    self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded)?;
                }
            } else {
                wait_token.wait();
            }
        }

        match self.try_recv() {
            // 实际上从队列中弹出的消息不应算作窃取,因此请在此处抵消减少的费用 (我们已经将 "steal" 计入了上面的通道计数中)。
            //
            //
            data @ (Ok(..) | Err(Upgraded(..))) => unsafe {
                *self.queue.consumer_addition().steals.get() -= 1;
                data
            },

            data => data,
        }
    }

    pub fn try_recv(&self) -> Result<T, Failure<T>> {
        match self.queue.pop() {
            // 如果我们窃取了一些数据,请记录下来 (稍后将其计入 cnt)。
            //
            // 请注意,我们不允许窃取行为无止境地增长,以防止窃取行为或 cnt 的最终溢出,因为溢出会带来灾难性的结果。
            // 有时,窃取 > cnt,但是其他时候 > 窃取 > cnt,因此我们不知道窃取和 cnt 之间的关系。
            // 此代码路径很少执行,因此我们执行了相当慢的操作,将 0 交换到 cnt,尽可能多地窃取 (不转为负数),然后再加上我们无法算作窃取的内容。
            //
            //
            //
            //
            //
            //
            Some(data) => unsafe {
                if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
                    match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
                        DISCONNECTED => {
                            self.queue
                                .producer_addition()
                                .cnt
                                .store(DISCONNECTED, Ordering::SeqCst);
                        }
                        n => {
                            let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
                            *self.queue.consumer_addition().steals.get() -= m;
                            self.bump(n - m);
                        }
                    }
                    assert!(*self.queue.consumer_addition().steals.get() >= 0);
                }
                *self.queue.consumer_addition().steals.get() += 1;
                match data {
                    Data(t) => Ok(t),
                    GoUp(up) => Err(Upgraded(up)),
                }
            },

            None => {
                match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
                    n if n != DISCONNECTED => Err(Empty),

                    // 这是一个棘手的情况。我们未能弹出上面的数据,然后我们查看了通道已断开连接。
                    // 在此窗口中,可能已经在通道上发送了更多数据。
                    // 当通道上确实有数据时,返回断开的通道并没有多大意义,因此请再弹出一次以确保没有数据。
                    //
                    //
                    // 我们可以忽略窃取,因为另一端断开了连接,我们再也不需要真正考虑我们的窃取了。
                    //
                    //
                    //
                    //
                    //
                    _ => match self.queue.pop() {
                        Some(Data(t)) => Ok(t),
                        Some(GoUp(up)) => Err(Upgraded(up)),
                        None => Err(Disconnected),
                    },
                }
            }
        }
    }

    pub fn drop_chan(&self) {
        // 丢弃通道非常简单,我们只是将其标记为已断开连接,然后在存在阻塞器时唤醒它。
        //
        match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
            -1 => {
                self.take_to_wake().signal();
            }
            DISCONNECTED => {}
            n => {
                assert!(n >= 0);
            }
        }
    }

    pub fn drop_port(&self) {
        // 丢弃端口似乎是一件微不足道的事情。从理论上讲,我们需要做的只是标记我们断开连接,然后其他所有事情都可以接管 (我们没有任何人可以唤醒)。
        //
        // Ports 的要点是我们要丢弃队列的全部内容。
        // 拥有此属性有多个原因,其中最大的原因是,如果另一个通道在此通道中等待 (但尚未收到),则在该端口上等待将导致死锁。
        //
        //
        // 因此,如果我们接受必须立即销毁队列的全部内容的代码,那么这段代码可能会更有意义。
        // 棘手的是,我们不能让任何正在进行的发送都被丢弃,我们必须确保所有内容都已丢弃,并且没有新内容进入通道。
        //
        //
        //
        //
        //
        //

        // 我们要做的第一件事是设置一个标志,说我们已经完成了。
        // 所有发送都在此标志上进行门控,因此我们立即保证有一定数量的活动发送必须处理。
        //
        //
        self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);

        // 现在确保可以处理一定数量的发送者,我们需要将队列 drain 处理。draining 过程相对于通道的 "count" 是原子发生的。
        // 如果计数为非零 (考虑到窃取),则通道上必须有数据。
        // 在这种情况下,我们一切都 drain,然后重试。
        // 当主动发送方在丢弃数据的同时发送数据时,我们将继续失败,但最终我们肯定会打破这个循环 (因为发送方的数量是有限的)。
        //
        //
        //
        //
        let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
        while {
            match self.queue.producer_addition().cnt.compare_exchange(
                steals,
                DISCONNECTED,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => false,
                Err(old) => old != DISCONNECTED,
            }
        } {
            while self.queue.pop().is_some() {
                steals += 1;
            }
        }

        // 此时,我们已禁止所有 future 发送者发送,并且已将通道标记为已断开连接。
        // 但是,发送者仍然要承担一些责任,因为有些发送可能要等到我们标记断开连接后才能完成。
        //
        // 发送方法中有更多详细信息,请参见已断开连接
        //
    }

    ////////////////////////////////////////////////////////////////////////////
    // 选择实现
    ////////////////////////////////////////////////////////////////////////////

    // 增加通道上的计数 (用于选择)
    fn bump(&self, amt: isize) -> isize {
        match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
            DISCONNECTED => {
                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                DISCONNECTED
            }
            n => n,
        }
    }

    // 删除先前的线程,使其不再被阻塞在该端口中
    pub fn abort_selection(&self, was_upgrade: bool) -> Result<bool, Receiver<T>> {
        // 如果我们是从 oneshot 升级后终止选择,那么我们保证没有人在等待。
        // 我们可以看到升级的唯一方法是,是否实际上又在通道上再次发送了数据。
        // 对我们来说,这意味着该通道上肯定有数据。
        // 此外,我们保证以前没有 start_selection,因此根本不需要修改 `self.cnt`。
        //
        //
        // 因此,由于这些不变性,我们立即返回 `Ok(true)`。
        // 请注意,数据可能尚未真正在通道上发送。
        // 另一端可能已经标记了升级,但没有向该端发送数据。
        // 这很好,因为我们知道在实际发送数据之前,它的时间是有限的 windows。
        //
        //
        //
        if was_upgrade {
            assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
            return Ok(true);
        }

        // 我们要确保通道上的计数为非负数,并且在流情况下,我们最多只能进行一次盗用,因此只需假设我们进行了一次盗用即可。
        //
        //
        let steals = 1;
        let prev = self.bump(steals + 1);

        // 如果我们以前已经断开连接,那么我们可以肯定地知道 to_wake 中没有线程,因此只要继续
        //
        let has_data = if prev == DISCONNECTED {
            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
            true // 有数据,就是我们断开了连接
        } else {
            let cur = prev + steals + 1;
            assert!(cur >= 0);

            // 如果先前的计数为负,那么我们只是使结果为正,因此我们通过了 -1 边界,我们有责任删除 to_wake() 字段并将其丢弃。
            //
            // 如果以前的数字是正数,那么我们将处在更加艰难的境地。
            // 一个可能的竞赛是发送方只是通过 -1 递增 (这意味着它将尝试唤醒线程),但尚未读取 to_wake。
            // 为了防止 future recv() 太早唤醒 (此发送者在 to_wake 上拾取了抹灰),我们在此处旋转循环以等待 to_wake 为 0。
            // 请注意,整个 select() 实现都需要大修,这并不是最坏的部分,因此,这并不是最终解决方案,而是出于使当前工作正常的需要。
            //
            //
            //
            //
            //
            //
            //
            //
            if prev < 0 {
                drop(self.take_to_wake());
            } else {
                while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
                    thread::yield_now();
                }
            }
            unsafe {
                assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
                *self.queue.consumer_addition().steals.get() = steals;
            }

            // 如果我们以前是积极的,那么肯定会有数据要接收
            //
            prev >= 0
        };

        // 现在,我们确定该队列为 "has data",我们将窥视该队列以查看数据是否为升级。
        //
        // 如果是升级,则需要销毁该端口并在升级的端口上终止选择。
        //
        if has_data {
            match self.queue.peek() {
                Some(&mut GoUp(..)) => match self.queue.pop() {
                    Some(GoUp(port)) => Err(port),
                    _ => unreachable!(),
                },
                _ => Ok(true),
            }
        } else {
            Ok(false)
        }
    }
}

impl<T> Drop for Packet<T> {
    fn drop(&mut self) {
        // 请注意,此负载不仅是断开连接正确性的断言,而且还是读取 `to_wake` 之前的适当围栏,因此无法在删除 `to_wake` 断言的同时删除此断言。
        //
        //
        //
        assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
    }
}