// kernel/ipc/pipe.rs

1//! Pipe implementation for inter-process communication
2//!
3//! This module provides unidirectional pipe implementations for data streaming between processes:
4//! - PipeEndpoint: Basic pipe endpoint with read/write capabilities
5//! - UnidirectionalPipe: Traditional unidirectional pipe (read-only or write-only)
6
7#[cfg(test)]
8use alloc::vec::Vec;
9use alloc::{collections::VecDeque, format, string::String, sync::Arc};
10use spin::Mutex;
11
12use super::{IpcError, StreamIpcOps};
13use crate::object::KernelObject;
14use crate::object::capability::selectable::{
15    ReadyInterest, ReadySet, SelectWaitOutcome, Selectable,
16};
17use crate::object::capability::{CloneOps, StreamError, StreamOps};
18use crate::sync::waker::Waker;
19
/// Pipe-specific operations
///
/// This trait extends StreamIpcOps with pipe-specific functionality
/// (endpoint liveness, buffer introspection, and access direction).
/// Implementors must also support kernel-object cloning via `CloneOps`.
pub trait PipeObject: StreamIpcOps + CloneOps {
    /// Check if there are readers on the other end
    fn has_readers(&self) -> bool;

    /// Check if there are writers on the other end
    fn has_writers(&self) -> bool;

    /// Get the buffer size of the pipe (maximum capacity in bytes)
    fn buffer_size(&self) -> usize;

    /// Get the number of bytes currently in the pipe buffer
    fn available_bytes(&self) -> usize;

    /// Check if this end of the pipe is readable
    fn is_readable(&self) -> bool;

    /// Check if this end of the pipe is writable
    fn is_writable(&self) -> bool;

    /// Optional capability: expose select/pselect readiness/wait interface.
    ///
    /// By default, pipes are not exposed via this hook unless the implementation
    /// also implements `Selectable` and overrides this to return `Some(self)`.
    fn as_selectable(&self) -> Option<&dyn Selectable> {
        None
    }
}
50
/// Represents errors specific to pipe operations
///
/// NOTE(review): not constructed anywhere in this file's visible code;
/// presumably used by callers elsewhere in the crate — verify before removal.
#[derive(Debug, Clone)]
pub enum PipeError {
    /// The pipe is broken (no readers or writers)
    BrokenPipe,
    /// The pipe buffer is full
    BufferFull,
    /// The pipe buffer is empty
    BufferEmpty,
    /// Invalid pipe state
    InvalidState,
    /// General IPC error (wraps the lower-level IPC error type)
    IpcError(IpcError),
}
65
66impl From<IpcError> for PipeError {
67    fn from(ipc_err: IpcError) -> Self {
68        PipeError::IpcError(ipc_err)
69    }
70}
71
72impl From<StreamError> for PipeError {
73    fn from(stream_err: StreamError) -> Self {
74        PipeError::IpcError(IpcError::StreamError(stream_err))
75    }
76}
77
/// Internal shared state of a pipe (data only, no wakers)
///
/// Protected by the `Mutex` inside `SharedPipeData`; wakers are kept
/// outside this struct so they can be signalled without holding the lock.
struct PipeState {
    /// Ring buffer for pipe data
    buffer: VecDeque<u8>,
    /// Maximum buffer size (capacity limit enforced by write())
    max_size: usize,
    /// Number of active readers (incremented on endpoint creation/clone)
    reader_count: usize,
    /// Number of active writers (incremented on endpoint creation/clone)
    writer_count: usize,
    /// Whether the pipe has been closed (set when the last endpoint drops)
    closed: bool,
}
91
/// Shared pipe data including both state and wakers
/// Wakers are kept outside the Mutex to avoid deadlock when calling wait()
/// while another task holds the state lock.
struct SharedPipeData {
    /// Main pipe state (protected by mutex)
    state: Mutex<PipeState>,
    /// Waker for tasks waiting to read (outside mutex to avoid deadlock)
    read_waker: Waker,
    /// Waker for tasks waiting to write (outside mutex to avoid deadlock)
    write_waker: Waker,
}
102
103impl SharedPipeData {
104    fn new(buffer_size: usize) -> Arc<Self> {
105        Arc::new(Self {
106            state: Mutex::new(PipeState {
107                buffer: VecDeque::with_capacity(buffer_size),
108                max_size: buffer_size,
109                reader_count: 0,
110                writer_count: 0,
111                closed: false,
112            }),
113            read_waker: Waker::new_interruptible("pipe_read"),
114            write_waker: Waker::new_interruptible("pipe_write"),
115        })
116    }
117}
118
/// A generic pipe endpoint
///
/// This represents the basic building block for all pipe types.
/// It can be configured for read-only, write-only, or bidirectional access.
/// Creation and cloning register the endpoint in the shared counters;
/// `Drop` deregisters it.
pub struct PipeEndpoint {
    /// Shared pipe data (state + wakers)
    data: Arc<SharedPipeData>,
    /// Whether this endpoint can read
    can_read: bool,
    /// Whether this endpoint can write
    can_write: bool,
    /// Unique identifier for debugging
    id: String,
    /// Per-endpoint non-blocking flag (O_NONBLOCK semantics)
    nonblocking: core::sync::atomic::AtomicBool,
}
135
136impl PipeEndpoint {
137    /// Create a new pipe endpoint with specified capabilities
138    fn new(data: Arc<SharedPipeData>, can_read: bool, can_write: bool, id: String) -> Self {
139        // Register this endpoint in the state
140        {
141            let mut pipe_state = data.state.lock();
142            if can_read {
143                pipe_state.reader_count += 1;
144            }
145            if can_write {
146                pipe_state.writer_count += 1;
147            }
148        }
149
150        Self {
151            data,
152            can_read,
153            can_write,
154            id,
155            nonblocking: core::sync::atomic::AtomicBool::new(false),
156        }
157    }
158}
159
160impl StreamOps for PipeEndpoint {
161    fn read(&self, buffer: &mut [u8]) -> Result<usize, StreamError> {
162        if !self.can_read {
163            return Err(StreamError::NotSupported);
164        }
165
166        loop {
167            let mut state = self.data.state.lock();
168
169            if state.closed {
170                return Err(StreamError::Closed);
171            }
172
173            if state.buffer.is_empty() {
174                if state.writer_count == 0 {
175                    // No writers left, return EOF
176                    return Ok(0);
177                } else {
178                    // Writers exist but no data available - block until data becomes available
179                    // Block the current task using the pipe read waker
180                    use crate::task::mytask;
181                    if let Some(task) = mytask() {
182                        if self.nonblocking.load(core::sync::atomic::Ordering::Relaxed) {
183                            return Err(StreamError::WouldBlock);
184                        }
185                        // CRITICAL: Drop lock before wait() to avoid deadlock
186                        let task_id = task.get_id();
187                        let trapframe = task.get_trapframe();
188
189                        // Memory barrier BEFORE lock release to ensure all writes are visible
190                        core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
191
192                        drop(state);
193
194                        // Memory barrier AFTER lock release to ensure lock release is visible
195                        core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
196
197                        // Memory barrier BEFORE wait() to ensure wait() sees correct state
198                        core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
199
200                        // Call wait() without holding any locks
201                        self.data.read_waker.wait(task_id, trapframe);
202
203                        // Memory barrier AFTER wait() to ensure subsequent operations are visible
204                        core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
205
206                        // Loop back to retry read after waking up
207                        continue;
208                    } else {
209                        // No current task context, return WouldBlock for non-blocking fallback
210                        return Err(StreamError::WouldBlock);
211                    }
212                }
213            }
214
215            let bytes_to_read = buffer.len().min(state.buffer.len());
216            for i in 0..bytes_to_read {
217                buffer[i] = state.buffer.pop_front().unwrap();
218            }
219
220            // Release lock before waking writers
221            drop(state);
222
223            // Data was consumed, wake up any waiting writers
224            if bytes_to_read > 0 {
225                self.data.write_waker.wake_all();
226            }
227
228            return Ok(bytes_to_read);
229        }
230    }
231
232    fn write(&self, buffer: &[u8]) -> Result<usize, StreamError> {
233        if !self.can_write {
234            return Err(StreamError::NotSupported);
235        }
236
237        loop {
238            let mut state = self.data.state.lock();
239
240            if state.closed {
241                return Err(StreamError::Closed);
242            }
243
244            if state.reader_count == 0 {
245                return Err(StreamError::BrokenPipe);
246            }
247
248            let available_space = state.max_size - state.buffer.len();
249            if available_space == 0 {
250                // No space available - block until space becomes available
251                // Block the current task using the pipe write waker
252                use crate::task::mytask;
253                if let Some(task) = mytask() {
254                    if self.nonblocking.load(core::sync::atomic::Ordering::Relaxed) {
255                        return Err(StreamError::WouldBlock);
256                    }
257                    // CRITICAL: Drop lock before wait() to avoid deadlock
258                    let task_id = task.get_id();
259                    let trapframe = task.get_trapframe();
260
261                    // Memory barrier BEFORE lock release to ensure all writes are visible
262                    core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
263
264                    drop(state);
265
266                    // Memory barrier AFTER lock release to ensure lock release is visible
267                    core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
268
269                    // Memory barrier BEFORE wait() to ensure wait() sees correct state
270                    core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
271
272                    // Call wait() without holding any locks
273                    self.data.write_waker.wait(task_id, trapframe);
274
275                    // Memory barrier AFTER wait() to ensure subsequent operations are visible
276                    core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
277
278                    // Loop back to retry write after waking up
279                    continue;
280                } else {
281                    // No current task context, return WouldBlock for non-blocking fallback
282                    return Err(StreamError::WouldBlock);
283                }
284            }
285
286            let bytes_to_write = buffer.len().min(available_space);
287            for &byte in &buffer[..bytes_to_write] {
288                state.buffer.push_back(byte);
289            }
290
291            // Release lock before waking readers
292            drop(state);
293
294            // Data was written, wake up any waiting readers
295            if bytes_to_write > 0 {
296                self.data.read_waker.wake_all();
297            }
298
299            return Ok(bytes_to_write);
300        }
301    }
302}
303
304impl StreamIpcOps for PipeEndpoint {
305    fn is_connected(&self) -> bool {
306        let state = self.data.state.lock();
307        !state.closed && (state.reader_count > 0 || state.writer_count > 0)
308    }
309
310    fn peer_count(&self) -> usize {
311        // This is a generic implementation - specific pipe types may override this
312        let state = self.data.state.lock();
313
314        match (self.can_read, self.can_write) {
315            (true, false) => state.writer_count, // Reader: count writers
316            (false, true) => state.reader_count, // Writer: count readers
317            (false, false) => 0,                 // Invalid endpoint
318            (true, true) => {
319                // This should not happen for unidirectional pipes
320                // Return total peers minus self
321                (state.reader_count + state.writer_count).saturating_sub(2)
322            }
323        }
324    }
325
326    fn description(&self) -> String {
327        let access = match (self.can_read, self.can_write) {
328            (true, true) => "read/write",
329            (true, false) => "read-only",
330            (false, true) => "write-only",
331            (false, false) => "no-access",
332        };
333
334        format!("{}({})", self.id, access)
335    }
336}
337
338impl CloneOps for PipeEndpoint {
339    fn custom_clone(&self) -> KernelObject {
340        // Clone this endpoint directly (which properly increments counters)
341        // and wrap the result in the SAME Arc structure to maintain proper Drop behavior
342        KernelObject::from_pipe_object(Arc::new(self.clone()))
343    }
344}
345
346impl PipeObject for PipeEndpoint {
347    fn has_readers(&self) -> bool {
348        let state = self.data.state.lock();
349        state.reader_count > 0
350    }
351
352    fn has_writers(&self) -> bool {
353        let state = self.data.state.lock();
354        state.writer_count > 0
355    }
356
357    fn buffer_size(&self) -> usize {
358        let state = self.data.state.lock();
359        state.max_size
360    }
361
362    fn available_bytes(&self) -> usize {
363        let state = self.data.state.lock();
364        state.buffer.len()
365    }
366
367    fn is_readable(&self) -> bool {
368        self.can_read
369    }
370
371    fn is_writable(&self) -> bool {
372        self.can_write
373    }
374}
375
376impl Drop for PipeEndpoint {
377    fn drop(&mut self) {
378        let mut state = self.data.state.lock();
379
380        if self.can_read {
381            state.reader_count = state.reader_count.saturating_sub(1);
382        }
383        if self.can_write {
384            state.writer_count = state.writer_count.saturating_sub(1);
385        }
386
387        if state.reader_count == 0 && state.writer_count == 0 {
388            state.closed = true;
389            state.buffer.clear();
390        }
391    }
392}
393
394impl Clone for PipeEndpoint {
395    fn clone(&self) -> Self {
396        let new_pipe = Self {
397            data: self.data.clone(),
398            can_read: self.can_read,
399            can_write: self.can_write,
400            id: format!("{}_clone", self.id),
401            nonblocking: core::sync::atomic::AtomicBool::new(
402                self.nonblocking.load(core::sync::atomic::Ordering::Relaxed),
403            ),
404        };
405
406        // Increment reference counts
407        {
408            let mut state = self.data.state.lock();
409            if self.can_read {
410                state.reader_count += 1;
411            }
412            if self.can_write {
413                state.writer_count += 1;
414            }
415        }
416
417        new_pipe
418    }
419}
420
/// A unidirectional pipe (read-only or write-only endpoint)
///
/// Thin newtype over `PipeEndpoint`; trait impls below delegate to it.
pub struct UnidirectionalPipe {
    // Always constructed with exactly one of read/write capability.
    endpoint: PipeEndpoint,
}
425
426impl UnidirectionalPipe {
427    /// Create a new pipe pair (read_end, write_end) as KernelObjects
428    pub fn create_pair(buffer_size: usize) -> (KernelObject, KernelObject) {
429        let data = SharedPipeData::new(buffer_size);
430
431        let read_end = Self {
432            endpoint: PipeEndpoint::new(data.clone(), true, false, "unidirectional_read".into()),
433        };
434
435        let write_end = Self {
436            endpoint: PipeEndpoint::new(data.clone(), false, true, "unidirectional_write".into()),
437        };
438
439        // Wrap in KernelObjects
440        let read_obj = KernelObject::from_pipe_object(Arc::new(read_end));
441        let write_obj = KernelObject::from_pipe_object(Arc::new(write_end));
442
443        (read_obj, write_obj)
444    }
445
446    /// Create a new pipe pair for internal testing (returns raw pipes)
447    #[cfg(test)]
448    pub fn create_pair_raw(buffer_size: usize) -> (Self, Self) {
449        let data = SharedPipeData::new(buffer_size);
450
451        let read_end = Self {
452            endpoint: PipeEndpoint::new(data.clone(), true, false, "unidirectional_read".into()),
453        };
454
455        let write_end = Self {
456            endpoint: PipeEndpoint::new(data.clone(), false, true, "unidirectional_write".into()),
457        };
458
459        (read_end, write_end)
460    }
461}
462
463// Delegate all traits to the underlying endpoint
464impl StreamOps for UnidirectionalPipe {
465    fn read(&self, buffer: &mut [u8]) -> Result<usize, StreamError> {
466        self.endpoint.read(buffer)
467    }
468
469    fn write(&self, buffer: &[u8]) -> Result<usize, StreamError> {
470        self.endpoint.write(buffer)
471    }
472}
473
474impl StreamIpcOps for UnidirectionalPipe {
475    fn is_connected(&self) -> bool {
476        self.endpoint.is_connected()
477    }
478
479    fn peer_count(&self) -> usize {
480        // Unidirectional pipe specific peer_count implementation
481        let state = self.endpoint.data.state.lock();
482
483        match (self.endpoint.can_read, self.endpoint.can_write) {
484            (true, false) => state.writer_count, // Reader: count writers
485            (false, true) => state.reader_count, // Writer: count readers
486            _ => 0, // Unidirectional pipes should not have both capabilities
487        }
488    }
489
490    fn description(&self) -> String {
491        self.endpoint.description()
492    }
493}
494
495impl CloneOps for UnidirectionalPipe {
496    fn custom_clone(&self) -> KernelObject {
497        // Clone this pipe directly (which properly increments counters)
498        // and wrap the result in a new Arc
499        KernelObject::from_pipe_object(Arc::new(self.clone()))
500    }
501}
502
503impl PipeObject for UnidirectionalPipe {
504    fn has_readers(&self) -> bool {
505        self.endpoint.has_readers()
506    }
507
508    fn has_writers(&self) -> bool {
509        self.endpoint.has_writers()
510    }
511
512    fn buffer_size(&self) -> usize {
513        self.endpoint.buffer_size()
514    }
515
516    fn available_bytes(&self) -> usize {
517        self.endpoint.available_bytes()
518    }
519
520    fn is_readable(&self) -> bool {
521        self.endpoint.is_readable()
522    }
523
524    fn is_writable(&self) -> bool {
525        self.endpoint.is_writable()
526    }
527
528    fn as_selectable(&self) -> Option<&dyn Selectable> {
529        Some(self)
530    }
531}
532
533impl Clone for UnidirectionalPipe {
534    fn clone(&self) -> Self {
535        Self {
536            endpoint: self.endpoint.clone(),
537        }
538    }
539}
540
impl Selectable for UnidirectionalPipe {
    /// Snapshot the pipe's readiness for the requested interests.
    ///
    /// Read-ready: data is buffered, or all writers are gone (EOF is
    /// immediately readable). Write-ready: buffer space exists, or all
    /// readers are gone (write will fail fast with BrokenPipe, not block).
    fn current_ready(&self, interest: ReadyInterest) -> ReadySet {
        let mut set = ReadySet::none();
        // Inspect internal state once
        let st = self.endpoint.data.state.lock();
        if interest.read && self.endpoint.can_read {
            // Readable if buffer has data or writers are gone (EOF)
            set.read = !st.buffer.is_empty() || st.writer_count == 0;
        }
        if interest.write && self.endpoint.can_write {
            // Writable if space available or no readers (will error but not block)
            let available_space = st.max_size.saturating_sub(st.buffer.len());
            set.write = available_space > 0 || st.reader_count == 0;
        }
        if interest.except {
            // Pipes never report exceptional conditions.
            set.except = false;
        }
        set
    }

    /// Block the current task until the pipe may be ready.
    ///
    /// NOTE(review): the state check and the waker wait are not atomic —
    /// if a peer writes/reads and calls wake_all() between the lock release
    /// and wait(), this wakeup is lost; presumably the select loop re-polls
    /// current_ready() after waking, or Waker::wait tolerates this — verify.
    /// NOTE(review): `_timeout_ticks` is ignored and the outcome is always
    /// `Ready`; callers must re-check readiness and handle timeouts
    /// themselves — confirm this matches the select/pselect contract.
    fn wait_until_ready(
        &self,
        interest: ReadyInterest,
        trapframe: &mut crate::arch::Trapframe,
        _timeout_ticks: Option<u64>,
    ) -> SelectWaitOutcome {
        use crate::task::mytask;
        // Prefer read wait if requested; otherwise write wait; except is ignored.
        if interest.read && self.endpoint.can_read {
            // Only block while the buffer is empty AND writers remain
            // (otherwise data or EOF is already observable).
            let should_block = {
                let st = self.endpoint.data.state.lock();
                st.buffer.is_empty() && st.writer_count > 0
            }; // Lock released here

            if should_block {
                if let Some(task) = mytask() {
                    // Memory barrier BEFORE wait() to ensure wait() sees correct state
                    core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);

                    // CRITICAL: Call wait() without holding any locks
                    self.endpoint.data.read_waker.wait(task.get_id(), trapframe);

                    // Memory barrier AFTER wait() to ensure subsequent operations are visible
                    core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
                }
            }
        } else if interest.write && self.endpoint.can_write {
            // Only block while the buffer is full AND readers remain
            // (otherwise space or BrokenPipe is already observable).
            let should_block = {
                let st = self.endpoint.data.state.lock();
                let space = st.max_size.saturating_sub(st.buffer.len());
                space == 0 && st.reader_count > 0
            }; // Lock released here

            if should_block {
                if let Some(task) = mytask() {
                    // Memory barrier BEFORE wait() to ensure wait() sees correct state
                    core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);

                    // CRITICAL: Call wait() without holding any locks
                    self.endpoint
                        .data
                        .write_waker
                        .wait(task.get_id(), trapframe);

                    // Memory barrier AFTER wait() to ensure subsequent operations are visible
                    core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
                }
            }
        }
        SelectWaitOutcome::Ready
    }

    /// Set the per-endpoint O_NONBLOCK flag (affects read()/write()).
    fn set_nonblocking(&self, enabled: bool) {
        self.endpoint
            .nonblocking
            .store(enabled, core::sync::atomic::Ordering::Relaxed);
    }

    /// Query the per-endpoint O_NONBLOCK flag.
    fn is_nonblocking(&self) -> bool {
        self.endpoint
            .nonblocking
            .load(core::sync::atomic::Ordering::Relaxed)
    }
}
625
626#[cfg(test)]
627mod tests {
628    use super::*;
629
    /// Fresh pipe ends report the correct access direction and see each other.
    #[test_case]
    fn test_pipe_creation() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        assert!(read_end.is_readable());
        assert!(!read_end.is_writable());
        assert!(!write_end.is_readable());
        assert!(write_end.is_writable());

        assert!(read_end.has_writers());
        assert!(write_end.has_readers());
    }
642
    /// A write followed by a read round-trips the bytes unchanged.
    #[test_case]
    fn test_pipe_basic_io() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        let data = b"Hello, Pipe!";
        let written = write_end.write(data).unwrap();
        assert_eq!(written, data.len());

        let mut buffer = [0u8; 1024];
        let read = read_end.read(&mut buffer).unwrap();
        assert_eq!(read, data.len());
        assert_eq!(&buffer[..read], data);
    }
656
    /// With O_NONBLOCK set, empty reads and full-buffer writes return
    /// WouldBlock instead of blocking.
    #[test_case]
    fn test_pipe_nonblocking_behaviour() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(4);

        crate::object::capability::selectable::Selectable::set_nonblocking(&read_end, true);
        crate::object::capability::selectable::Selectable::set_nonblocking(&write_end, true);

        // Empty pipe: non-blocking read must not sleep.
        let mut buffer = [0u8; 1];
        let read_result = read_end.read(&mut buffer);
        assert!(matches!(read_result, Err(StreamError::WouldBlock)));

        // Exactly fill the 4-byte buffer.
        let data = [1u8, 2, 3, 4];
        assert_eq!(write_end.write(&data).unwrap(), data.len());

        // Full pipe: non-blocking write must not sleep.
        let would_block = write_end.write(&[5]);
        assert!(matches!(would_block, Err(StreamError::WouldBlock)));
    }
674
    /// Cloning and dropping endpoints keeps the shared reader/writer
    /// counters (and thus peer_count) in sync. Assertion values depend on
    /// the exact clone/drop order below.
    #[test_case]
    fn test_pipe_reference_counting() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        // Initially: 1 reader, 1 writer
        assert_eq!(read_end.peer_count(), 1); // 1 writer peer
        assert_eq!(write_end.peer_count(), 1); // 1 reader peer
        assert!(read_end.has_writers());
        assert!(write_end.has_readers());

        // Debug: Check internal state
        {
            let state = read_end.endpoint.data.state.lock();
            assert_eq!(state.reader_count, 1);
            assert_eq!(state.writer_count, 1);
        }

        // Clone the read end (should increment reader count)
        let read_end_clone = read_end.clone();

        // Debug: Check internal state after clone
        {
            let state = read_end.endpoint.data.state.lock();
            assert_eq!(state.reader_count, 2); // Should be 2 after clone
            assert_eq!(state.writer_count, 1); // Should remain 1
        }

        assert_eq!(read_end.peer_count(), 1); // Reader: 1 writer peer
        assert_eq!(write_end.peer_count(), 2); // Writer: 2 reader peers (read_end + read_end_clone)
        assert_eq!(read_end_clone.peer_count(), 1); // Reader: 1 writer peer

        // Clone the write end (should increment writer count)
        let write_end_clone = write_end.clone();

        // Debug: Check internal state after write clone
        {
            let state = read_end.endpoint.data.state.lock();
            assert_eq!(state.reader_count, 2); // Still 2 readers
            assert_eq!(state.writer_count, 2); // Now 2 writers
        }

        assert_eq!(read_end.peer_count(), 2); // Reader: 2 writer peers (write_end + write_end_clone)
        assert_eq!(write_end.peer_count(), 2); // Writer: 2 reader peers (read_end + read_end_clone)
        assert_eq!(write_end_clone.peer_count(), 2); // Writer: 2 reader peers (read_end + read_end_clone)

        // Drop one reader (should decrement reader count)
        drop(read_end_clone);
        assert_eq!(read_end.peer_count(), 2); // Reader: 2 writer peers (write_end + write_end_clone)
        assert_eq!(write_end.peer_count(), 1); // Writer: 1 reader peer (read_end only)

        // Drop one writer (should decrement writer count)
        drop(write_end_clone);
        assert_eq!(read_end.peer_count(), 1); // Reader: 1 writer peer (write_end only)
        assert_eq!(write_end.peer_count(), 1); // Writer: 1 reader peer (read_end only)
    }
730
    /// Dropping the last writer makes the read end observe EOF (Ok(0)).
    #[test_case]
    fn test_pipe_broken_pipe_detection() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        // Initially both ends are connected
        assert!(read_end.is_connected());
        assert!(write_end.is_connected());
        assert!(read_end.has_writers());
        assert!(write_end.has_readers());

        // Drop the write end (should break the pipe for readers)
        drop(write_end);

        // Read end should detect that writers are gone
        assert!(!read_end.has_writers());

        // Reading should return EOF (0 bytes) when no writers remain
        let mut buffer = [0u8; 10];
        let bytes_read = read_end.read(&mut buffer).unwrap();
        assert_eq!(bytes_read, 0); // EOF
    }
752
    /// Dropping the last reader makes subsequent writes fail with BrokenPipe.
    #[test_case]
    fn test_pipe_write_to_closed_pipe() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        // Drop the read end (no more readers)
        drop(read_end);

        // Write end should detect that readers are gone
        assert!(!write_end.has_readers());

        // Writing should fail with BrokenPipe error
        let data = b"Should fail";
        let result = write_end.write(data);
        assert!(result.is_err());
        if let Err(StreamError::BrokenPipe) = result {
            // Expected error
        } else {
            panic!("Expected BrokenPipe error, got: {:?}", result);
        }
    }
773
    /// Cloned ends share one buffer: writes from original and clone are
    /// interleaved in write order and readable from any read end.
    #[test_case]
    fn test_pipe_clone_independent_operations() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(1024);

        // Clone both ends
        let read_clone = read_end.clone();
        let write_clone = write_end.clone();

        // Write from original write end
        let data1 = b"From original";
        write_end.write(data1).unwrap();

        // Write from cloned write end
        let data2 = b" and clone";
        write_clone.write(data2).unwrap();

        // Read all data from original read end
        let mut buffer1 = [0u8; 50];
        let bytes1 = read_end.read(&mut buffer1).unwrap();
        let total_expected = data1.len() + data2.len();
        assert_eq!(bytes1, total_expected);

        // The data should be concatenated in the order of writes
        let mut expected_data = Vec::new();
        expected_data.extend_from_slice(data1);
        expected_data.extend_from_slice(data2);
        assert_eq!(&buffer1[..bytes1], &expected_data);

        // Buffer should now be empty - trying to read should block or return EOF
        let mut buffer2 = [0u8; 10];
        let bytes2 = read_clone.read(&mut buffer2);
        assert!(bytes2.is_err() || bytes2.unwrap() == 0);
    }
807
    /// Capacity reporting, fill-to-full, overflow rejection, and space
    /// reclamation after a partial read, on a 10-byte buffer.
    #[test_case]
    fn test_pipe_buffer_management() {
        let (read_end, write_end) = UnidirectionalPipe::create_pair_raw(10); // Small buffer

        // Test buffer size reporting
        assert_eq!(read_end.buffer_size(), 10);
        assert_eq!(write_end.buffer_size(), 10);
        assert_eq!(read_end.available_bytes(), 0);

        // Fill buffer partially
        let data = b"12345";
        write_end.write(data).unwrap();
        assert_eq!(read_end.available_bytes(), 5);
        assert_eq!(write_end.available_bytes(), 5);

        // Fill buffer completely
        let more_data = b"67890";
        write_end.write(more_data).unwrap();
        assert_eq!(read_end.available_bytes(), 10);

        // Buffer should be full, next write should fail or partial
        let overflow_data = b"X";
        let result = write_end.write(overflow_data);
        assert!(result.is_err() || result.unwrap() == 0);

        // Read some data to make space
        let mut buffer = [0u8; 3];
        let bytes_read = read_end.read(&mut buffer).unwrap();
        assert_eq!(bytes_read, 3);
        assert_eq!(&buffer, b"123");
        assert_eq!(read_end.available_bytes(), 7);

        // Now writing should work again
        let new_data = b"XYZ";
        let bytes_written = write_end.write(new_data).unwrap();
        assert_eq!(bytes_written, 3);
        assert_eq!(read_end.available_bytes(), 10);
    }
846
847    // === DUP SEMANTICS TESTS ===
848    // These tests verify correct dup() behavior for pipes at the KernelObject level
849
    /// KernelObject::clone on the read end (dup syscall path) increments
    /// the shared reader count so the write end sees two reader peers.
    #[test_case]
    fn test_kernel_object_pipe_dup_semantics() {
        // Create pipe through KernelObject interface
        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);

        // Verify initial state through KernelObject interface
        if let Some(read_pipe) = read_obj.as_pipe() {
            if let Some(write_pipe) = write_obj.as_pipe() {
                // Initially: 1 reader, 1 writer
                assert_eq!(read_pipe.peer_count(), 1); // 1 writer
                assert_eq!(write_pipe.peer_count(), 1); // 1 reader
                assert!(read_pipe.has_writers());
                assert!(write_pipe.has_readers());
            } else {
                panic!("write_obj should be a pipe");
            }
        } else {
            panic!("read_obj should be a pipe");
        }

        // Clone the read end using KernelObject::clone (simulates dup syscall)
        let read_obj_cloned = read_obj.clone();

        // Verify that the clone operation correctly updated peer counts
        if let Some(read_pipe) = read_obj.as_pipe() {
            if let Some(write_pipe) = write_obj.as_pipe() {
                if let Some(read_pipe_cloned) = read_obj_cloned.as_pipe() {
                    // After dup: 2 readers, 1 writer
                    assert_eq!(write_pipe.peer_count(), 2); // 2 readers now!
                    assert_eq!(read_pipe.peer_count(), 1); // 1 writer
                    assert_eq!(read_pipe_cloned.peer_count(), 1); // 1 writer

                    // All endpoints should still be connected
                    assert!(read_pipe.has_writers());
                    assert!(write_pipe.has_readers());
                    assert!(read_pipe_cloned.has_writers());
                } else {
                    panic!("read_obj_cloned should be a pipe");
                }
            }
        }
    }
892
893    #[test_case]
894    fn test_kernel_object_pipe_write_dup_semantics() {
895        // Create pipe through KernelObject interface
896        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);
897
898        // Clone the write end using KernelObject::clone (simulates dup syscall)
899        let write_obj_cloned = write_obj.clone();
900
901        // Verify that the clone operation correctly updated peer counts
902        if let Some(read_pipe) = read_obj.as_pipe() {
903            if let Some(write_pipe) = write_obj.as_pipe() {
904                if let Some(write_pipe_cloned) = write_obj_cloned.as_pipe() {
905                    // After dup: 1 reader, 2 writers
906                    assert_eq!(read_pipe.peer_count(), 2); // 2 writers now!
907                    assert_eq!(write_pipe.peer_count(), 1); // 1 reader
908                    assert_eq!(write_pipe_cloned.peer_count(), 1); // 1 reader
909
910                    // All endpoints should still be connected
911                    assert!(read_pipe.has_writers());
912                    assert!(write_pipe.has_readers());
913                    assert!(write_pipe_cloned.has_readers());
914                } else {
915                    panic!("write_obj_cloned should be a pipe");
916                }
917            }
918        }
919    }
920
921    #[test_case]
922    fn test_kernel_object_pipe_dup_io_operations() {
923        // Create pipe through KernelObject interface
924        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);
925
926        // Clone both ends
927        let read_obj_cloned = read_obj.clone();
928        let write_obj_cloned = write_obj.clone();
929
930        // Write from original write end
931        if let Some(write_stream) = write_obj.as_stream() {
932            let data1 = b"Hello from original writer";
933            let written = write_stream.write(data1).unwrap();
934            assert_eq!(written, data1.len());
935        }
936
937        // Write from cloned write end
938        if let Some(write_stream_cloned) = write_obj_cloned.as_stream() {
939            let data2 = b" and cloned writer";
940            let written = write_stream_cloned.write(data2).unwrap();
941            assert_eq!(written, data2.len());
942        }
943
944        // Read from original read end
945        if let Some(read_stream) = read_obj.as_stream() {
946            let mut buffer = [0u8; 100];
947            let bytes_read = read_stream.read(&mut buffer).unwrap();
948            let total_expected = b"Hello from original writer and cloned writer".len();
949            assert_eq!(bytes_read, total_expected);
950            assert_eq!(
951                &buffer[..bytes_read],
952                b"Hello from original writer and cloned writer"
953            );
954        }
955
956        // Buffer should now be empty
957        if let Some(read_stream_cloned) = read_obj_cloned.as_stream() {
958            let mut buffer = [0u8; 10];
959            let result = read_stream_cloned.read(&mut buffer);
960            // Should either return 0 (EOF) or WouldBlock since buffer is empty
961            assert!(result.is_err() || result.unwrap() == 0);
962        }
963    }
964
965    #[test_case]
966    fn test_kernel_object_pipe_dup_broken_pipe_detection() {
967        // Create pipe through KernelObject interface
968        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);
969
970        // Clone the read end
971        let read_obj_cloned = read_obj.clone();
972
973        // Initially, write end should see 2 readers
974        if let Some(write_pipe) = write_obj.as_pipe() {
975            assert_eq!(write_pipe.peer_count(), 2);
976        }
977
978        // Drop one read end
979        drop(read_obj);
980
981        // Write end should still see 1 reader
982        if let Some(write_pipe) = write_obj.as_pipe() {
983            assert_eq!(write_pipe.peer_count(), 1);
984            assert!(write_pipe.has_readers());
985        }
986
987        // Writing should still work
988        if let Some(write_stream) = write_obj.as_stream() {
989            let data = b"Still works";
990            let written = write_stream.write(data).unwrap();
991            assert_eq!(written, data.len());
992        }
993
994        // Drop the last read end
995        drop(read_obj_cloned);
996
997        // Now write end should see no readers
998        if let Some(write_pipe) = write_obj.as_pipe() {
999            assert_eq!(write_pipe.peer_count(), 0);
1000            assert!(!write_pipe.has_readers());
1001        }
1002
1003        // Writing should now fail with BrokenPipe
1004        if let Some(write_stream) = write_obj.as_stream() {
1005            let data = b"Should fail";
1006            let result = write_stream.write(data);
1007            assert!(result.is_err());
1008            if let Err(StreamError::BrokenPipe) = result {
1009                // Expected error
1010            } else {
1011                panic!("Expected BrokenPipe error, got: {:?}", result);
1012            }
1013        }
1014    }
1015
1016    #[test_case]
1017    fn test_kernel_object_pipe_dup_vs_arc_clone_comparison() {
1018        // This test demonstrates the difference between KernelObject::clone (correct dup)
1019        // and Arc::clone (incorrect for pipes)
1020
1021        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);
1022
1023        // === Correct way: KernelObject::clone (uses CloneOps) ===
1024        let _read_obj_dup = read_obj.clone();
1025
1026        // This should correctly increment reader count
1027        if let Some(write_pipe) = write_obj.as_pipe() {
1028            assert_eq!(write_pipe.peer_count(), 2); // 2 readers after dup
1029        }
1030
1031        // === Demonstrate what would happen with Arc::clone (incorrect) ===
1032        // We can't directly test Arc::clone without exposing the internal Arc,
1033        // but we can verify that our CloneOps implementation is being used
1034
1035        if let Some(cloneable) = read_obj.as_cloneable() {
1036            // This should be Some for pipes (they implement CloneOps)
1037            let _custom_cloned = cloneable.custom_clone();
1038
1039            // Verify the custom clone also works correctly
1040            if let Some(write_pipe) = write_obj.as_pipe() {
1041                assert_eq!(write_pipe.peer_count(), 3); // 3 readers now
1042            }
1043        } else {
1044            panic!("Pipe should implement CloneOps capability");
1045        }
1046    }
1047}