1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
use crate::os::windows::prelude::*;

use crate::ffi::OsStr;
use crate::io::{self, IoSlice, IoSliceMut};
use crate::mem;
use crate::path::Path;
use crate::ptr;
use crate::slice;
use crate::sync::atomic::AtomicUsize;
use crate::sync::atomic::Ordering::SeqCst;
use crate::sys::c;
use crate::sys::fs::{File, OpenOptions};
use crate::sys::handle::Handle;
use crate::sys::hashmap_random_keys;

////////////////////////////////////////////////////////////////////////////////
// 匿名管道
////////////////////////////////////////////////////////////////////////////////

pub struct AnonPipe {
    inner: Handle,
}

pub struct Pipes {
    pub ours: AnonPipe,
    pub theirs: AnonPipe,
}

/// 尽管这看起来与 Unix 模块中的 `anon_pipe` 相似,但实际上有细微的差别。
/// 在这里,我们将在 `Pipes` 返回值中返回两个管道,但是一个管道用于 "us",而另一个管道用于 "someone else"。
///
/// 当前,此函数的唯一用例是标准库中进程的标准输入输出管道,因此 "ours" 将保留在我们的进程中,而 "theirs" 将被继承给子进程。
///
///
/// ours/theirs 管道 *不是* 专门可读或可写的。每个仅支持读或写,但这取决于给定的布尔标志。
/// 如果 `ours_readable` 是 `true`,那么 `ours` 是可读的,`theirs` 是可写的。
/// 相反,如果 `ours_readable` 是 `false`,则 `ours` 是可写的,并且 `theirs` 是可读的。
///
/// 另请注意,`ours` 管道始终是在重叠模式下打开的句柄。
/// 这意味着从技术上讲,它只能用于 `OVERLAPPED` 实例,但是一次只能使用一次也可以 (我们确实保证)。
///
///
///
///
///
///
///
pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> {
    // 请注意,我们这里特别 *不* 使用 `CreatePipe`,因为不幸的是,返回的匿名管道不支持重叠的操作。
    //
    // 而是,我们创建一个 "hopefully unique" 名称并创建一个已启用重叠操作的命名管道。
    //
    // 完成此操作后,我们将照常通过 `CreateFileW` 进行连接,然后返回这些 reader/writer 一半。
    // 请注意,`ours` 管道返回值始终是命名管道,而 `theirs` 只是普通文件。
    // 希望这可以使我们免受子进程的影响,这些子进程假设其 stdout 是命名管道,这确实是奇怪的!
    //
    //
    //
    unsafe {
        let ours;
        let mut name;
        let mut tries = 0;
        let mut reject_remote_clients_flag = c::PIPE_REJECT_REMOTE_CLIENTS;
        loop {
            tries += 1;
            name = format!(
                r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}",
                c::GetCurrentProcessId(),
                random_number()
            );
            let wide_name = OsStr::new(&name).encode_wide().chain(Some(0)).collect::<Vec<_>>();
            let mut flags = c::FILE_FLAG_FIRST_PIPE_INSTANCE | c::FILE_FLAG_OVERLAPPED;
            if ours_readable {
                flags |= c::PIPE_ACCESS_INBOUND;
            } else {
                flags |= c::PIPE_ACCESS_OUTBOUND;
            }

            let handle = c::CreateNamedPipeW(
                wide_name.as_ptr(),
                flags,
                c::PIPE_TYPE_BYTE
                    | c::PIPE_READMODE_BYTE
                    | c::PIPE_WAIT
                    | reject_remote_clients_flag,
                1,
                4096,
                4096,
                0,
                ptr::null_mut(),
            );

            // 我们通过了上面的 `FILE_FLAG_FIRST_PIPE_INSTANCE` 标志,并且我们也在尽力选择唯一的名称。
            // 如果返回 `ERROR_ACCESS_DENIED`,则可能意味着我们意外地与已经存在的管道发生冲突,因此请重试。
            //
            // 尽管不要再尝试太多,因为这也可能是合法错误。
            // 如果返回 `ERROR_INVALID_PARAMETER`,则可能意味着我们正在不支持 `PIPE_REJECT_REMOTE_CLIENTS` 的 Vista 之前版本上运行,因此我们将继续尝试不使用 `PIPE_REJECT_REMOTE_CLIENTS`。
            // 这意味着允许通过远程计算机连接到此管道,从而降低了 Vista 之前的 Windows 版本的安全性。
            // 正确的修复会增加 FFI 导入的数量,并引入大量 Windows XP 专用代码,而没有干净的测试策略。有关更多信息, see https://github.com/rust-lang/rust/pull/37677.
            //
            //
            //
            //
            //
            //
            //
            //
            //
            //
            if handle == c::INVALID_HANDLE_VALUE {
                let err = io::Error::last_os_error();
                let raw_os_err = err.raw_os_error();
                if tries < 10 {
                    if raw_os_err == Some(c::ERROR_ACCESS_DENIED as i32) {
                        continue;
                    } else if reject_remote_clients_flag != 0
                        && raw_os_err == Some(c::ERROR_INVALID_PARAMETER as i32)
                    {
                        reject_remote_clients_flag = 0;
                        tries -= 1;
                        continue;
                    }
                }
                return Err(err);
            }
            ours = Handle::new(handle);
            break;
        }

        // 连接到我们刚刚创建的命名管道。
        // 该句柄将在 `theirs` 中返回,因此,如果 `ours` 是可读的,我们希望它是可写的,否则,如果 `ours` 是可写的,我们希望它是可读的。
        //
        //
        // 另外,我们不对此启用重叠模式,因为大多数客户端进程均未启用重叠模式。
        //
        //
        let mut opts = OpenOptions::new();
        opts.write(ours_readable);
        opts.read(!ours_readable);
        opts.share_mode(0);
        let size = mem::size_of::<c::SECURITY_ATTRIBUTES>();
        let mut sa = c::SECURITY_ATTRIBUTES {
            nLength: size as c::DWORD,
            lpSecurityDescriptor: ptr::null_mut(),
            bInheritHandle: their_handle_inheritable as i32,
        };
        opts.security_attributes(&mut sa);
        let theirs = File::open(Path::new(&name), &opts)?;
        let theirs = AnonPipe { inner: theirs.into_handle() };

        Ok(Pipes {
            ours: AnonPipe { inner: ours },
            theirs: AnonPipe { inner: theirs.into_handle() },
        })
    }
}

fn random_number() -> usize {
    static N: AtomicUsize = AtomicUsize::new(0);
    loop {
        if N.load(SeqCst) != 0 {
            return N.fetch_add(1, SeqCst);
        }

        N.store(hashmap_random_keys().0 as usize, SeqCst);
    }
}

impl AnonPipe {
    pub fn handle(&self) -> &Handle {
        &self.inner
    }
    pub fn into_handle(self) -> Handle {
        self.inner
    }

    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf)
    }

    pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
        self.inner.read_vectored(bufs)
    }

    #[inline]
    pub fn is_read_vectored(&self) -> bool {
        self.inner.is_read_vectored()
    }

    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
        self.inner.write(buf)
    }

    pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
        self.inner.write_vectored(bufs)
    }

    #[inline]
    pub fn is_write_vectored(&self) -> bool {
        self.inner.is_write_vectored()
    }
}

pub fn read2(p1: AnonPipe, v1: &mut Vec<u8>, p2: AnonPipe, v2: &mut Vec<u8>) -> io::Result<()> {
    let p1 = p1.into_handle();
    let p2 = p2.into_handle();

    let mut p1 = AsyncPipe::new(p1, v1)?;
    let mut p2 = AsyncPipe::new(p2, v2)?;
    let objs = [p1.event.raw(), p2.event.raw()];

    // 在一个循环中,我们等待任何一个管道的预定读取操作完成。
    // 如果操作以 0 个字节完成,则表示已达到 EOF,在这种情况下,我们将完全结束另一个管道。
    //
    // 请注意,重叠的 I/O 通常是超级不安全的,因为我们必须注意确保所有正在播放的指针在 I/O 操作的整个持续时间内都是有效的 (其中大量操作也可能会失败)。
    //
    // `AsyncPipe` 的析构函数最终将完成大部分工作。
    //
    //
    loop {
        let res = unsafe { c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) };
        if res == c::WAIT_OBJECT_0 {
            if !p1.result()? || !p1.schedule_read()? {
                return p2.finish();
            }
        } else if res == c::WAIT_OBJECT_0 + 1 {
            if !p2.result()? || !p2.schedule_read()? {
                return p1.finish();
            }
        } else {
            return Err(io::Error::last_os_error());
        }
    }
}

struct AsyncPipe<'a> {
    pipe: Handle,
    event: Handle,
    overlapped: Box<c::OVERLAPPED>, // 需要一个稳定的地址
    dst: &'a mut Vec<u8>,
    state: State,
}

#[derive(PartialEq, Debug)]
enum State {
    NotReading,
    Reading,
    Read(usize),
}

impl<'a> AsyncPipe<'a> {
    fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> {
        // 创建一个用于协调重叠操作的事件,该事件将在 WaitForMultipleObjects 中使用,并作为 OVERLAPPED 句柄的一部分传递。
        //
        // 请注意,我们在这里通过将事件标记为手动重置并将其初始设置为信号状态来做一些聪明的事情。
        // 这意味着对于最初创建的管道,我们自然会经历上面的 WaitForMultipleObjects 调用,并且一旦成功调度了 I/O 操作 (我们想要的),偶数返回 "unset" 的唯一时间。
        //
        //
        //
        //
        //
        //
        let event = Handle::new_event(true, true)?;
        let mut overlapped: Box<c::OVERLAPPED> = unsafe { Box::new(mem::zeroed()) };
        overlapped.hEvent = event.raw();
        Ok(AsyncPipe { pipe, overlapped, event, dst, state: State::NotReading })
    }

    /// 执行重叠的读取操作。
    ///
    /// 当前不能正在读取,并返回管道当前是否处于 EOF。
    /// 如果管道不在 EOF 处,则必须调用 `result()` 以完成以后的读取 (可能会阻塞),但是,如果管道在 EOF 处,则不应调用 `result()`,因为它将永远阻塞。
    ///
    ///
    fn schedule_read(&mut self) -> io::Result<bool> {
        assert_eq!(self.state, State::NotReading);
        let amt = unsafe {
            let slice = slice_to_end(self.dst);
            self.pipe.read_overlapped(slice, &mut *self.overlapped)?
        };

        // 如果此读取立即完成,则我们的重叠事件将继续发出信号 (信号已传入此处),然后我们将继续进行下面的方法。
        //
        //
        // 否则,将调度 I/O 操作,并且系统会将事件设置为不发出信号,因此我们将自己标记为读取状态并继续前进。
        //
        //
        //
        self.state = match amt {
            Some(0) => return Ok(false),
            Some(amt) => State::Read(amt),
            None => State::Reading,
        };
        Ok(true)
    }

    /// 等待先前执行的重叠操作的结果。
    ///
    /// 接受参数 `wait`,该参数指示当前是否正在读取此管道,函数是否应阻塞以等待读取完成。
    ///
    ///
    /// 返回值:
    ///
    /// * `true` - 完成所有未完成的读取,并且管道未处于 EOF 位置 (继续运行)
    /// * `false` - 完成所有未完成的读取,并且管道位于 EOF (停止发出读取)
    ///
    ///
    fn result(&mut self) -> io::Result<bool> {
        let amt = match self.state {
            State::NotReading => return Ok(true),
            State::Reading => self.pipe.overlapped_result(&mut *self.overlapped, true)?,
            State::Read(amt) => amt,
        };
        self.state = State::NotReading;
        unsafe {
            let len = self.dst.len();
            self.dst.set_len(len + amt);
        }
        Ok(amt != 0)
    }

    /// 完成全部读取此管道。
    ///
    /// 等待任何未决和计划读取,然后在必要时调用 `read_to_end` 以读取所有剩余信息。
    ///
    fn finish(&mut self) -> io::Result<()> {
        while self.result()? && self.schedule_read()? {
            // ...
        }
        Ok(())
    }
}

impl<'a> Drop for AsyncPipe<'a> {
    fn drop(&mut self) {
        match self.state {
            State::Reading => {}
            _ => return,
        }

        // 如果我们有一个待处理的读取操作,那么在实际丢弃此类型之前,必须确保它已完成。
        // 内核要求 `OVERLAPPED` 和缓冲区指针对于整个 I/O 操作均有效。
        //
        // 为此,我们调用 `CancelIo` 取消任何挂起的操作,如果操作成功,我们将等待重叠的结果。
        //
        // 如果这里的任何事情失败了,我们实际上无能为力,因此我们泄漏了 buffer/OVERLAPPED 指针以确保我们至少是内存安全的。
        //
        //
        //
        //
        if self.pipe.cancel_io().is_err() || self.result().is_err() {
            let buf = mem::take(self.dst);
            let overlapped = Box::new(unsafe { mem::zeroed() });
            let overlapped = mem::replace(&mut self.overlapped, overlapped);
            mem::forget((buf, overlapped));
        }
    }
}

unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
    if v.capacity() == 0 {
        v.reserve(16);
    }
    if v.capacity() == v.len() {
        v.reserve(1);
    }
    slice::from_raw_parts_mut(v.as_mut_ptr().add(v.len()), v.capacity() - v.len())
}