1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
use self::Blocker::*;
/// 同步 channels/ports
///
/// 此通道实现与 (oneshot/stream/share) 旁边的异步实现有很大不同。这是同步有界缓冲区通道的实现。
///
/// 每个通道都创建有一定数量的后备缓冲区,并且发送将 *阻塞*,直到缓冲区空间可用为止。
/// 缓冲区大小为 0 是有效的,这意味着每个成功的发送都与一个成功的 recv 配对。
///
/// 这种通道风格为通道定义了一种新的 `send_opt` 方法,该方法用于发送消息,但是如果无法传递该消息,则该线程不会 panic。
///
/// 另一个主要区别是,如果无法发送 send(),它将 *总是* 返回数据。这是因为确定性地知道何时接收数据以及何时不接收数据。
///
/// 在实现方面,可以全部用 "use a mutex plus some logic" 进行总结。此处使用的互斥锁是 OS 固有的互斥锁,这意味着互斥锁内部没有运行任何用户代码 (以防止上下文切换)。此实现几乎共享了同步通道的已缓冲和未缓冲情况的所有代码。
///
/// 对于无缓冲的情况,有一些分支,但它们大多与阻止发送者有关。
///
///
///
///
///
///
///
///
///
///
pub use self::Failure::*;

use core::intrinsics::abort;
use core::mem;
use core::ptr;

use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken};
use crate::sync::{Mutex, MutexGuard};
use crate::time::Instant;

const MAX_REFCOUNT: usize = (isize::MAX) as usize;

pub struct Packet<T> {
    /// 互斥锁之外的唯一字段。
    /// 刚刚完成,但是主要是因为其他共享通道已经实现了代码
    channels: AtomicUsize,

    lock: Mutex<State<T>>,
}

unsafe impl<T: Send> Send for Packet<T> {}

unsafe impl<T: Send> Sync for Packet<T> {}

struct State<T> {
    disconnected: bool, // 通道是否断开连接?
    queue: Queue,       // 等待发送数据的发送者队列
    blocker: Blocker,   // 当前在此通道上被阻塞的线程
    buf: Buffer<T>,     // 缓冲消息的存储
    cap: usize,         // 该通道的容量

    /// 一个奇怪的标志,用于指示发送者是阻止还是成功阻止。
    /// 这用于将信息传输回线程,因为它没有被接收,因此它必须从缓冲区中将其消息出队。
    ///
    /// 这仅在 0 缓冲区的情况下相关。
    /// 这显然不能安全地构造,但是可以保证始终具有有效的指针值。
    ///
    canceled: Option<&'static mut bool>,
}

unsafe impl<T: Send> Send for State<T> {}

/// 可能在此通道上被阻塞的线程的味道。
enum Blocker {
    BlockedSender(SignalToken),
    BlockedReceiver(SignalToken),
    NoneBlocked,
}

/// 将线程连接在一起的简单队列。
/// 节点是栈分配的,因此该结构体根本不安全
struct Queue {
    head: *mut Node,
    tail: *mut Node,
}

struct Node {
    token: Option<SignalToken>,
    next: *mut Node,
}

unsafe impl Send for Node {}

/// 一个简单的环形缓冲区
struct Buffer<T> {
    buf: Vec<Option<T>>,
    start: usize,
    size: usize,
}

#[derive(Debug)]
pub enum Failure {
    Empty,
    Disconnected,
}

/// 以原子方式阻止当前线程,将其放入 `slot`,同时解锁 `lock`。
/// 返回时,重新锁定互斥锁。
fn wait<'a, 'b, T>(
    lock: &'a Mutex<State<T>>,
    mut guard: MutexGuard<'b, State<T>>,
    f: fn(SignalToken) -> Blocker,
) -> MutexGuard<'a, State<T>> {
    let (wait_token, signal_token) = blocking::tokens();
    match mem::replace(&mut guard.blocker, f(signal_token)) {
        NoneBlocked => {}
        _ => unreachable!(),
    }
    drop(guard); // unlock
    wait_token.wait(); // block
    lock.lock().unwrap() // relock
}

/// 与等待相同,但最多等到 `deadline`。
fn wait_timeout_receiver<'a, 'b, T>(
    lock: &'a Mutex<State<T>>,
    deadline: Instant,
    mut guard: MutexGuard<'b, State<T>>,
    success: &mut bool,
) -> MutexGuard<'a, State<T>> {
    let (wait_token, signal_token) = blocking::tokens();
    match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) {
        NoneBlocked => {}
        _ => unreachable!(),
    }
    drop(guard); // unlock
    *success = wait_token.wait_max_until(deadline); // block
    let mut new_guard = lock.lock().unwrap(); // relock
    if !*success {
        abort_selection(&mut new_guard);
    }
    new_guard
}

fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool {
    match mem::replace(&mut guard.blocker, NoneBlocked) {
        NoneBlocked => true,
        BlockedSender(token) => {
            guard.blocker = BlockedSender(token);
            true
        }
        BlockedReceiver(token) => {
            drop(token);
            false
        }
    }
}

/// 唤醒线程,在正确的时间丢弃锁
fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) {
    // 我们需要小心地唤醒互斥锁的等待线程 *外部*,以防其引起上下文切换。
    //
    drop(guard);
    token.signal();
}

impl<T> Packet<T> {
    pub fn new(capacity: usize) -> Packet<T> {
        Packet {
            channels: AtomicUsize::new(1),
            lock: Mutex::new(State {
                disconnected: false,
                blocker: NoneBlocked,
                cap: capacity,
                canceled: None,
                queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() },
                buf: Buffer {
                    buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(),
                    start: 0,
                    size: 0,
                },
            }),
        }
    }

    // 等待发送槽可用,将锁定的访问返回到通道状态。
    //
    fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> {
        let mut node = Node { token: None, next: ptr::null_mut() };
        loop {
            let mut guard = self.lock.lock().unwrap();
            // 我们准备好出发了吗?
            if guard.disconnected || guard.buf.size() < guard.buf.capacity() {
                return guard;
            }
            // 没地儿; 实际阻止
            let wait_token = guard.queue.enqueue(&mut node);
            drop(guard);
            wait_token.wait();
        }
    }

    pub fn send(&self, t: T) -> Result<(), T> {
        let mut guard = self.acquire_send_slot();
        if guard.disconnected {
            return Err(t);
        }
        guard.buf.enqueue(t);

        match mem::replace(&mut guard.blocker, NoneBlocked) {
            // 如果我们的容量为 0,那么我们需要等待接收者可用以获取我们的数据。
            // 等待之后,我们再次检查以确保在此期间端口没有消失。
            // 如果确实如此,我们需要交出我们的数据。
            //
            NoneBlocked if guard.cap == 0 => {
                let mut canceled = false;
                assert!(guard.canceled.is_none());
                guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
                let mut guard = wait(&self.lock, guard, BlockedSender);
                if canceled { Err(guard.buf.dequeue()) } else { Ok(()) }
            }

            // 成功,我们缓冲了一些数据
            NoneBlocked => Ok(()),

            // 成功,有人将要接收我们的缓冲数据。
            BlockedReceiver(token) => {
                wakeup(token, guard);
                Ok(())
            }

            BlockedSender(..) => panic!("lolwut"),
        }
    }

    pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
        let mut guard = self.lock.lock().unwrap();
        if guard.disconnected {
            Err(super::TrySendError::Disconnected(t))
        } else if guard.buf.size() == guard.buf.capacity() {
            Err(super::TrySendError::Full(t))
        } else if guard.cap == 0 {
            // 容量为 0 时,即使我们有缓冲区空间,我们也无法传输数据,除非有接收者在等待。
            //
            match mem::replace(&mut guard.blocker, NoneBlocked) {
                NoneBlocked => Err(super::TrySendError::Full(t)),
                BlockedSender(..) => unreachable!(),
                BlockedReceiver(token) => {
                    guard.buf.enqueue(t);
                    wakeup(token, guard);
                    Ok(())
                }
            }
        } else {
            // 如果缓冲区有一些空间并且容量不为 0,则我们只是将数据放入队列以供以后检索,以确保在有阻塞的接收者的情况下将其唤醒。
            //
            //
            assert!(guard.buf.size() < guard.buf.capacity());
            guard.buf.enqueue(t);
            match mem::replace(&mut guard.blocker, NoneBlocked) {
                BlockedReceiver(token) => wakeup(token, guard),
                NoneBlocked => {}
                BlockedSender(..) => unreachable!(),
            }
            Ok(())
        }
    }

    // 接收来自此通道的消息
    //
    // 阅读本文时,请记住,一次只能有一个接收者。
    //
    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
        let mut guard = self.lock.lock().unwrap();

        let mut woke_up_after_waiting = false;
        // 等待缓冲区中有东西。
        // 不需要 while 循环,因为我们是唯一的接收者。
        if !guard.disconnected && guard.buf.size() == 0 {
            if let Some(deadline) = deadline {
                guard =
                    wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting);
            } else {
                guard = wait(&self.lock, guard, BlockedReceiver);
                woke_up_after_waiting = true;
            }
        }

        // 注意,通道在等待时可能会断开连接,因此这些条件的顺序很重要。
        //
        if guard.disconnected && guard.buf.size() == 0 {
            return Err(Disconnected);
        }

        // 收集数据,唤醒我们的邻居,然后继续
        assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting));

        if guard.buf.size() == 0 {
            return Err(Empty);
        }

        let ret = guard.buf.dequeue();
        self.wakeup_senders(woke_up_after_waiting, guard);
        Ok(ret)
    }

    pub fn try_recv(&self) -> Result<T, Failure> {
        let mut guard = self.lock.lock().unwrap();

        // 简单的案例优先
        if guard.disconnected && guard.buf.size() == 0 {
            return Err(Disconnected);
        }
        if guard.buf.size() == 0 {
            return Err(Empty);
        }

        // 一定要叫醒邻居
        let ret = Ok(guard.buf.dequeue());
        self.wakeup_senders(false, guard);
        ret
    }

    // 收到一些数据后唤醒未决的发送者
    //
    // * `waited` - 如果接收者被阻止接收某些数据,或者在出路时刚刚拾取了一些数据,则进行标记
    //
    // * `guard` - 保留在此通道的锁上的锁卫
    fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) {
        let pending_sender1: Option<SignalToken> = guard.queue.dequeue();

        // 如果这是一个无缓冲区的通道 (cap == 0),则如果我们不等待,则需要对发送方进行 ACK。
        // 如果我们等待,则唤醒我们的发送者已经是 ACK。
        //
        let pending_sender2 = if guard.cap == 0 && !waited {
            match mem::replace(&mut guard.blocker, NoneBlocked) {
                NoneBlocked => None,
                BlockedReceiver(..) => unreachable!(),
                BlockedSender(token) => {
                    guard.canceled.take();
                    Some(token)
                }
            }
        } else {
            None
        };
        mem::drop(guard);

        // 只有在锁之外,我们才能唤醒挂起的线程
        if let Some(token) = pending_sender1 {
            token.signal();
        }
        if let Some(token) = pending_sender2 {
            token.signal();
        }
    }

    // 为通道克隆准备此共享数据包,实际上只是增加引用计数。
    //
    pub fn clone_chan(&self) {
        let old_count = self.channels.fetch_add(1, Ordering::SeqCst);

        // 请参见 Arc::clone () 上的注释,了解为什么要这样做 (对于 `mem::forget`)。
        if old_count > MAX_REFCOUNT {
            abort();
        }
    }

    pub fn drop_chan(&self) {
        // 如果我们是最后一个通道,则仅将通道标记为已断开连接
        match self.channels.fetch_sub(1, Ordering::SeqCst) {
            1 => {}
            _ => return,
        }

        // 除了可以唤醒接收者以外,别无他法
        let mut guard = self.lock.lock().unwrap();
        if guard.disconnected {
            return;
        }
        guard.disconnected = true;
        match mem::replace(&mut guard.blocker, NoneBlocked) {
            NoneBlocked => {}
            BlockedSender(..) => unreachable!(),
            BlockedReceiver(token) => wakeup(token, guard),
        }
    }

    pub fn drop_port(&self) {
        let mut guard = self.lock.lock().unwrap();

        if guard.disconnected {
            return;
        }
        guard.disconnected = true;

        // 如果容量为 0,则在我们断开连接后,发送方可能希望返回其数据。
        // 否则,现在我们有责任销毁缓冲的数据。
        // 与该代码的许多其他部分一样,需要小心地销毁锁外部的数据,以防止死锁。
        //
        //
        let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() };
        let mut queue =
            mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() });

        let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
            NoneBlocked => None,
            BlockedSender(token) => {
                *guard.canceled.take().unwrap() = true;
                Some(token)
            }
            BlockedReceiver(..) => unreachable!(),
        };
        mem::drop(guard);

        while let Some(token) = queue.dequeue() {
            token.signal();
        }
        if let Some(token) = waiter {
            token.signal();
        }
    }
}

impl<T> Drop for Packet<T> {
    fn drop(&mut self) {
        assert_eq!(self.channels.load(Ordering::SeqCst), 0);
        let mut guard = self.lock.lock().unwrap();
        assert!(guard.queue.dequeue().is_none());
        assert!(guard.canceled.is_none());
    }
}

////////////////////////////////////////////////////////////////////////////////
// 缓冲区,由 Vec<T> 支持的简单环形缓冲区
////////////////////////////////////////////////////////////////////////////////

impl<T> Buffer<T> {
    fn enqueue(&mut self, t: T) {
        let pos = (self.start + self.size) % self.buf.len();
        self.size += 1;
        let prev = mem::replace(&mut self.buf[pos], Some(t));
        assert!(prev.is_none());
    }

    fn dequeue(&mut self) -> T {
        let start = self.start;
        self.size -= 1;
        self.start = (self.start + 1) % self.buf.len();
        let result = &mut self.buf[start];
        result.take().unwrap()
    }

    fn size(&self) -> usize {
        self.size
    }
    fn capacity(&self) -> usize {
        self.buf.len()
    }
}

////////////////////////////////////////////////////////////////////////////////
// 队列,一个简单的队列,用于使线程排队 (栈分配的节点)
////////////////////////////////////////////////////////////////////////////////

impl Queue {
    fn enqueue(&mut self, node: &mut Node) -> WaitToken {
        let (wait_token, signal_token) = blocking::tokens();
        node.token = Some(signal_token);
        node.next = ptr::null_mut();

        if self.tail.is_null() {
            self.head = node as *mut Node;
            self.tail = node as *mut Node;
        } else {
            unsafe {
                (*self.tail).next = node as *mut Node;
                self.tail = node as *mut Node;
            }
        }

        wait_token
    }

    fn dequeue(&mut self) -> Option<SignalToken> {
        if self.head.is_null() {
            return None;
        }
        let node = self.head;
        self.head = unsafe { (*node).next };
        if self.head.is_null() {
            self.tail = ptr::null_mut();
        }
        unsafe {
            (*node).next = ptr::null_mut();
            Some((*node).token.take().unwrap())
        }
    }
}