1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//! 几乎无锁的多生产者,单个消费者队列。
//!
//! 该模块包含一个并发 MPSC 队列的实现。
//! 此队列可用于在线程之间共享数据,也可用作 rust 中通道的构造块。
//!
//! 请注意,此队列的当前实现具有 `pop` 方法的警告,有关该信息的更多信息,请参见该方法。
//!
//! 由于此警告,此队列可能不适用于所有用例。
//!

// https://www.1024cores.net/home/lock-free-algorithms
//                          /queues/non-intrusive-mpsc-node-based-queue

#[cfg(all(test, not(target_os = "emscripten")))]
mod tests;

pub use self::PopResult::*;

use core::cell::UnsafeCell;
use core::ptr;

use crate::boxed::Box;
use crate::sync::atomic::{AtomicPtr, Ordering};

/// `pop` 函数的结果。
pub enum PopResult<T> {
    /// 一些数据已经弹出
    Data(T),
    /// 队列为空
    Empty,
    /// 队列处于不一致状态。
    /// 弹出数据应该会成功,但是为了使弹出操作成功,某些推动程序尚未取得足够的进展。
    /// 建议查看 pop(),以查看发送者是否已取得进展
    ///
    Inconsistent,
}

struct Node<T> {
    next: AtomicPtr<Node<T>>,
    value: Option<T>,
}

/// 多生产者单一消费者的结构体。
/// 这是不可克隆的,但是可以确保安全地共享它,只要保证一次仅存在一个弹出器即可 (允许很多推动器)。
///
pub struct Queue<T> {
    head: AtomicPtr<Node<T>>,
    tail: UnsafeCell<*mut Node<T>>,
}

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Node<T> {
    unsafe fn new(v: Option<T>) -> *mut Node<T> {
        Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
    }
}

impl<T> Queue<T> {
    /// 创建一个新的队列,可以在多个生产者和一个消费者之间共享。
    ///
    pub fn new() -> Queue<T> {
        let stub = unsafe { Node::new(None) };
        Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
    }

    /// 将新值推送到此队列。
    pub fn push(&self, t: T) {
        unsafe {
            let n = Node::new(Some(t));
            let prev = self.head.swap(n, Ordering::AcqRel);
            (*prev).next.store(n, Ordering::Release);
        }
    }

    /// 从此队列中弹出一些数据。
    ///
    /// 请注意,当前实现意味着此函数无法返回 `Option<T>`。
    /// 此队列可能处于不一致状态,在该状态下,许多推送已成功完成并完全完成,但是 pops 无法返回 `Some(t)`。
    ///
    /// 当推送器在不合时宜的时候被抢占时,就会发生这种不一致的状态。
    ///
    /// 这种不一致的状态意味着该队列确实有数据,但是目前无法访问该队列。
    ///
    ///
    pub fn pop(&self) -> PopResult<T> {
        unsafe {
            let tail = *self.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);

            if !next.is_null() {
                *self.tail.get() = next;
                assert!((*tail).value.is_none());
                assert!((*next).value.is_some());
                let ret = (*next).value.take().unwrap();
                let _: Box<Node<T>> = Box::from_raw(tail);
                return Data(ret);
            }

            if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
        }
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        unsafe {
            let mut cur = *self.tail.get();
            while !cur.is_null() {
                let next = (*cur).next.load(Ordering::Relaxed);
                let _: Box<Node<T>> = Box::from_raw(cur);
                cur = next;
            }
        }
    }
}