1#[cfg(test)]
8use alloc::vec::Vec;
9use alloc::{collections::VecDeque, format, string::String, sync::Arc};
10use spin::Mutex;
11
12use super::{IpcError, StreamIpcOps};
13use crate::object::KernelObject;
14use crate::object::capability::selectable::{
15 ReadyInterest, ReadySet, SelectWaitOutcome, Selectable,
16};
17use crate::object::capability::{CloneOps, StreamError, StreamOps};
18use crate::sync::waker::Waker;
19
20pub trait PipeObject: StreamIpcOps + CloneOps {
24 fn has_readers(&self) -> bool;
26
27 fn has_writers(&self) -> bool;
29
30 fn buffer_size(&self) -> usize;
32
33 fn available_bytes(&self) -> usize;
35
36 fn is_readable(&self) -> bool;
38
39 fn is_writable(&self) -> bool;
41
42 fn as_selectable(&self) -> Option<&dyn Selectable> {
47 None
48 }
49}
50
51#[derive(Debug, Clone)]
53pub enum PipeError {
54 BrokenPipe,
56 BufferFull,
58 BufferEmpty,
60 InvalidState,
62 IpcError(IpcError),
64}
65
66impl From<IpcError> for PipeError {
67 fn from(ipc_err: IpcError) -> Self {
68 PipeError::IpcError(ipc_err)
69 }
70}
71
72impl From<StreamError> for PipeError {
73 fn from(stream_err: StreamError) -> Self {
74 PipeError::IpcError(IpcError::StreamError(stream_err))
75 }
76}
77
78struct PipeState {
80 buffer: VecDeque<u8>,
82 max_size: usize,
84 reader_count: usize,
86 writer_count: usize,
88 closed: bool,
90}
91
92struct SharedPipeData {
95 state: Mutex<PipeState>,
97 read_waker: Waker,
99 write_waker: Waker,
101}
102
103impl SharedPipeData {
104 fn new(buffer_size: usize) -> Arc<Self> {
105 Arc::new(Self {
106 state: Mutex::new(PipeState {
107 buffer: VecDeque::with_capacity(buffer_size),
108 max_size: buffer_size,
109 reader_count: 0,
110 writer_count: 0,
111 closed: false,
112 }),
113 read_waker: Waker::new_interruptible("pipe_read"),
114 write_waker: Waker::new_interruptible("pipe_write"),
115 })
116 }
117}
118
119pub struct PipeEndpoint {
124 data: Arc<SharedPipeData>,
126 can_read: bool,
128 can_write: bool,
130 id: String,
132 nonblocking: core::sync::atomic::AtomicBool,
134}
135
136impl PipeEndpoint {
137 fn new(data: Arc<SharedPipeData>, can_read: bool, can_write: bool, id: String) -> Self {
139 {
141 let mut pipe_state = data.state.lock();
142 if can_read {
143 pipe_state.reader_count += 1;
144 }
145 if can_write {
146 pipe_state.writer_count += 1;
147 }
148 }
149
150 Self {
151 data,
152 can_read,
153 can_write,
154 id,
155 nonblocking: core::sync::atomic::AtomicBool::new(false),
156 }
157 }
158}
159
160impl StreamOps for PipeEndpoint {
161 fn read(&self, buffer: &mut [u8]) -> Result<usize, StreamError> {
162 if !self.can_read {
163 return Err(StreamError::NotSupported);
164 }
165
166 loop {
167 let mut state = self.data.state.lock();
168
169 if state.closed {
170 return Err(StreamError::Closed);
171 }
172
173 if state.buffer.is_empty() {
174 if state.writer_count == 0 {
175 return Ok(0);
177 } else {
178 use crate::task::mytask;
181 if let Some(task) = mytask() {
182 if self.nonblocking.load(core::sync::atomic::Ordering::Relaxed) {
183 return Err(StreamError::WouldBlock);
184 }
185 let task_id = task.get_id();
187 let trapframe = task.get_trapframe();
188
189 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
191
192 drop(state);
193
194 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
196
197 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
199
200 self.data.read_waker.wait(task_id, trapframe);
202
203 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
205
206 continue;
208 } else {
209 return Err(StreamError::WouldBlock);
211 }
212 }
213 }
214
215 let bytes_to_read = buffer.len().min(state.buffer.len());
216 for i in 0..bytes_to_read {
217 buffer[i] = state.buffer.pop_front().unwrap();
218 }
219
220 drop(state);
222
223 if bytes_to_read > 0 {
225 self.data.write_waker.wake_all();
226 }
227
228 return Ok(bytes_to_read);
229 }
230 }
231
232 fn write(&self, buffer: &[u8]) -> Result<usize, StreamError> {
233 if !self.can_write {
234 return Err(StreamError::NotSupported);
235 }
236
237 loop {
238 let mut state = self.data.state.lock();
239
240 if state.closed {
241 return Err(StreamError::Closed);
242 }
243
244 if state.reader_count == 0 {
245 return Err(StreamError::BrokenPipe);
246 }
247
248 let available_space = state.max_size - state.buffer.len();
249 if available_space == 0 {
250 use crate::task::mytask;
253 if let Some(task) = mytask() {
254 if self.nonblocking.load(core::sync::atomic::Ordering::Relaxed) {
255 return Err(StreamError::WouldBlock);
256 }
257 let task_id = task.get_id();
259 let trapframe = task.get_trapframe();
260
261 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
263
264 drop(state);
265
266 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
268
269 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
271
272 self.data.write_waker.wait(task_id, trapframe);
274
275 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
277
278 continue;
280 } else {
281 return Err(StreamError::WouldBlock);
283 }
284 }
285
286 let bytes_to_write = buffer.len().min(available_space);
287 for &byte in &buffer[..bytes_to_write] {
288 state.buffer.push_back(byte);
289 }
290
291 drop(state);
293
294 if bytes_to_write > 0 {
296 self.data.read_waker.wake_all();
297 }
298
299 return Ok(bytes_to_write);
300 }
301 }
302}
303
304impl StreamIpcOps for PipeEndpoint {
305 fn is_connected(&self) -> bool {
306 let state = self.data.state.lock();
307 !state.closed && (state.reader_count > 0 || state.writer_count > 0)
308 }
309
310 fn peer_count(&self) -> usize {
311 let state = self.data.state.lock();
313
314 match (self.can_read, self.can_write) {
315 (true, false) => state.writer_count, (false, true) => state.reader_count, (false, false) => 0, (true, true) => {
319 (state.reader_count + state.writer_count).saturating_sub(2)
322 }
323 }
324 }
325
326 fn description(&self) -> String {
327 let access = match (self.can_read, self.can_write) {
328 (true, true) => "read/write",
329 (true, false) => "read-only",
330 (false, true) => "write-only",
331 (false, false) => "no-access",
332 };
333
334 format!("{}({})", self.id, access)
335 }
336}
337
338impl CloneOps for PipeEndpoint {
339 fn custom_clone(&self) -> KernelObject {
340 KernelObject::from_pipe_object(Arc::new(self.clone()))
343 }
344}
345
346impl PipeObject for PipeEndpoint {
347 fn has_readers(&self) -> bool {
348 let state = self.data.state.lock();
349 state.reader_count > 0
350 }
351
352 fn has_writers(&self) -> bool {
353 let state = self.data.state.lock();
354 state.writer_count > 0
355 }
356
357 fn buffer_size(&self) -> usize {
358 let state = self.data.state.lock();
359 state.max_size
360 }
361
362 fn available_bytes(&self) -> usize {
363 let state = self.data.state.lock();
364 state.buffer.len()
365 }
366
367 fn is_readable(&self) -> bool {
368 self.can_read
369 }
370
371 fn is_writable(&self) -> bool {
372 self.can_write
373 }
374}
375
376impl Drop for PipeEndpoint {
377 fn drop(&mut self) {
378 let mut state = self.data.state.lock();
379
380 if self.can_read {
381 state.reader_count = state.reader_count.saturating_sub(1);
382 }
383 if self.can_write {
384 state.writer_count = state.writer_count.saturating_sub(1);
385 }
386
387 if state.reader_count == 0 && state.writer_count == 0 {
388 state.closed = true;
389 state.buffer.clear();
390 }
391 }
392}
393
394impl Clone for PipeEndpoint {
395 fn clone(&self) -> Self {
396 let new_pipe = Self {
397 data: self.data.clone(),
398 can_read: self.can_read,
399 can_write: self.can_write,
400 id: format!("{}_clone", self.id),
401 nonblocking: core::sync::atomic::AtomicBool::new(
402 self.nonblocking.load(core::sync::atomic::Ordering::Relaxed),
403 ),
404 };
405
406 {
408 let mut state = self.data.state.lock();
409 if self.can_read {
410 state.reader_count += 1;
411 }
412 if self.can_write {
413 state.writer_count += 1;
414 }
415 }
416
417 new_pipe
418 }
419}
420
421pub struct UnidirectionalPipe {
423 endpoint: PipeEndpoint,
424}
425
426impl UnidirectionalPipe {
427 pub fn create_pair(buffer_size: usize) -> (KernelObject, KernelObject) {
429 let data = SharedPipeData::new(buffer_size);
430
431 let read_end = Self {
432 endpoint: PipeEndpoint::new(data.clone(), true, false, "unidirectional_read".into()),
433 };
434
435 let write_end = Self {
436 endpoint: PipeEndpoint::new(data.clone(), false, true, "unidirectional_write".into()),
437 };
438
439 let read_obj = KernelObject::from_pipe_object(Arc::new(read_end));
441 let write_obj = KernelObject::from_pipe_object(Arc::new(write_end));
442
443 (read_obj, write_obj)
444 }
445
446 #[cfg(test)]
448 pub fn create_pair_raw(buffer_size: usize) -> (Self, Self) {
449 let data = SharedPipeData::new(buffer_size);
450
451 let read_end = Self {
452 endpoint: PipeEndpoint::new(data.clone(), true, false, "unidirectional_read".into()),
453 };
454
455 let write_end = Self {
456 endpoint: PipeEndpoint::new(data.clone(), false, true, "unidirectional_write".into()),
457 };
458
459 (read_end, write_end)
460 }
461}
462
463impl StreamOps for UnidirectionalPipe {
465 fn read(&self, buffer: &mut [u8]) -> Result<usize, StreamError> {
466 self.endpoint.read(buffer)
467 }
468
469 fn write(&self, buffer: &[u8]) -> Result<usize, StreamError> {
470 self.endpoint.write(buffer)
471 }
472}
473
474impl StreamIpcOps for UnidirectionalPipe {
475 fn is_connected(&self) -> bool {
476 self.endpoint.is_connected()
477 }
478
479 fn peer_count(&self) -> usize {
480 let state = self.endpoint.data.state.lock();
482
483 match (self.endpoint.can_read, self.endpoint.can_write) {
484 (true, false) => state.writer_count, (false, true) => state.reader_count, _ => 0, }
488 }
489
490 fn description(&self) -> String {
491 self.endpoint.description()
492 }
493}
494
495impl CloneOps for UnidirectionalPipe {
496 fn custom_clone(&self) -> KernelObject {
497 KernelObject::from_pipe_object(Arc::new(self.clone()))
500 }
501}
502
503impl PipeObject for UnidirectionalPipe {
504 fn has_readers(&self) -> bool {
505 self.endpoint.has_readers()
506 }
507
508 fn has_writers(&self) -> bool {
509 self.endpoint.has_writers()
510 }
511
512 fn buffer_size(&self) -> usize {
513 self.endpoint.buffer_size()
514 }
515
516 fn available_bytes(&self) -> usize {
517 self.endpoint.available_bytes()
518 }
519
520 fn is_readable(&self) -> bool {
521 self.endpoint.is_readable()
522 }
523
524 fn is_writable(&self) -> bool {
525 self.endpoint.is_writable()
526 }
527
528 fn as_selectable(&self) -> Option<&dyn Selectable> {
529 Some(self)
530 }
531}
532
533impl Clone for UnidirectionalPipe {
534 fn clone(&self) -> Self {
535 Self {
536 endpoint: self.endpoint.clone(),
537 }
538 }
539}
540
541impl Selectable for UnidirectionalPipe {
542 fn current_ready(&self, interest: ReadyInterest) -> ReadySet {
543 let mut set = ReadySet::none();
544 let st = self.endpoint.data.state.lock();
546 if interest.read && self.endpoint.can_read {
547 set.read = !st.buffer.is_empty() || st.writer_count == 0;
549 }
550 if interest.write && self.endpoint.can_write {
551 let available_space = st.max_size.saturating_sub(st.buffer.len());
553 set.write = available_space > 0 || st.reader_count == 0;
554 }
555 if interest.except {
556 set.except = false;
557 }
558 set
559 }
560
561 fn wait_until_ready(
562 &self,
563 interest: ReadyInterest,
564 trapframe: &mut crate::arch::Trapframe,
565 _timeout_ticks: Option<u64>,
566 ) -> SelectWaitOutcome {
567 use crate::task::mytask;
568 if interest.read && self.endpoint.can_read {
570 let should_block = {
571 let st = self.endpoint.data.state.lock();
572 st.buffer.is_empty() && st.writer_count > 0
573 }; if should_block {
576 if let Some(task) = mytask() {
577 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
579
580 self.endpoint.data.read_waker.wait(task.get_id(), trapframe);
582
583 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
585 }
586 }
587 } else if interest.write && self.endpoint.can_write {
588 let should_block = {
589 let st = self.endpoint.data.state.lock();
590 let space = st.max_size.saturating_sub(st.buffer.len());
591 space == 0 && st.reader_count > 0
592 }; if should_block {
595 if let Some(task) = mytask() {
596 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
598
599 self.endpoint
601 .data
602 .write_waker
603 .wait(task.get_id(), trapframe);
604
605 core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
607 }
608 }
609 }
610 SelectWaitOutcome::Ready
611 }
612
613 fn set_nonblocking(&self, enabled: bool) {
614 self.endpoint
615 .nonblocking
616 .store(enabled, core::sync::atomic::Ordering::Relaxed);
617 }
618
619 fn is_nonblocking(&self) -> bool {
620 self.endpoint
621 .nonblocking
622 .load(core::sync::atomic::Ordering::Relaxed)
623 }
624}
625
#[cfg(test)]
mod tests {
    use super::*;

    /// A new pair consists of a read-only end and a write-only end that see
    /// each other.
    #[test_case]
    fn test_pipe_creation() {
        let (reader, writer) = UnidirectionalPipe::create_pair_raw(1024);

        assert!(reader.is_readable());
        assert!(!reader.is_writable());
        assert!(!writer.is_readable());
        assert!(writer.is_writable());

        assert!(reader.has_writers());
        assert!(writer.has_readers());
    }

    /// Bytes written to the write end come out of the read end unchanged.
    #[test_case]
    fn test_pipe_basic_io() {
        let (reader, writer) = UnidirectionalPipe::create_pair_raw(1024);

        let payload = b"Hello, Pipe!";
        let written = writer.write(payload).unwrap();
        assert_eq!(written, payload.len());

        let mut scratch = [0u8; 1024];
        let count = reader.read(&mut scratch).unwrap();
        assert_eq!(count, payload.len());
        assert_eq!(&scratch[..count], payload);
    }

    /// Non-blocking ends report `WouldBlock` on an empty read and a full write.
    #[test_case]
    fn test_pipe_nonblocking_behaviour() {
        let (reader, writer) = UnidirectionalPipe::create_pair_raw(4);

        Selectable::set_nonblocking(&reader, true);
        Selectable::set_nonblocking(&writer, true);

        let mut one_byte = [0u8; 1];
        let empty_read = reader.read(&mut one_byte);
        assert!(matches!(empty_read, Err(StreamError::WouldBlock)));

        let fill = [1u8, 2, 3, 4];
        assert_eq!(writer.write(&fill).unwrap(), fill.len());

        let full_write = writer.write(&[5]);
        assert!(matches!(full_write, Err(StreamError::WouldBlock)));
    }

    /// Cloning and dropping ends keeps the shared counters and `peer_count`
    /// in step.
    #[test_case]
    fn test_pipe_reference_counting() {
        let (reader, writer) = UnidirectionalPipe::create_pair_raw(1024);

        // Snapshot of the raw (reader_count, writer_count) pair.
        let raw_counts = || {
            let state = reader.endpoint.data.state.lock();
            (state.reader_count, state.writer_count)
        };

        assert_eq!(reader.peer_count(), 1);
        assert_eq!(writer.peer_count(), 1);
        assert!(reader.has_writers());
        assert!(writer.has_readers());
        assert_eq!(raw_counts(), (1, 1));

        let reader_dup = reader.clone();
        assert_eq!(raw_counts(), (2, 1));

        assert_eq!(reader.peer_count(), 1);
        assert_eq!(writer.peer_count(), 2);
        assert_eq!(reader_dup.peer_count(), 1);

        let writer_dup = writer.clone();
        assert_eq!(raw_counts(), (2, 2));

        assert_eq!(reader.peer_count(), 2);
        assert_eq!(writer.peer_count(), 2);
        assert_eq!(writer_dup.peer_count(), 2);

        drop(reader_dup);
        assert_eq!(reader.peer_count(), 2);
        assert_eq!(writer.peer_count(), 1);

        drop(writer_dup);
        assert_eq!(reader.peer_count(), 1);
        assert_eq!(writer.peer_count(), 1);
    }

    /// Reading after the only writer is dropped yields end-of-stream (`Ok(0)`).
    #[test_case]
    fn test_pipe_broken_pipe_detection() {
        let (reader, writer) = UnidirectionalPipe::create_pair_raw(1024);

        assert!(reader.is_connected());
        assert!(writer.is_connected());
        assert!(reader.has_writers());
        assert!(writer.has_readers());

        drop(writer);

        assert!(!reader.has_writers());

        let mut scratch = [0u8; 10];
        let count = reader.read(&mut scratch).unwrap();
        assert_eq!(count, 0);
    }

    /// Writing after the only reader is dropped fails with `BrokenPipe`.
    #[test_case]
    fn test_pipe_write_to_closed_pipe() {
        let (reader, writer) = UnidirectionalPipe::create_pair_raw(1024);

        drop(reader);

        assert!(!writer.has_readers());

        let payload = b"Should fail";
        let outcome = writer.write(payload);
        assert!(outcome.is_err());
        match outcome {
            Err(StreamError::BrokenPipe) => {}
            _ => panic!("Expected BrokenPipe error, got: {:?}", outcome),
        }
    }

    /// Cloned ends share one buffer: data from both writers arrives in order
    /// and is consumed only once.
    #[test_case]
    fn test_pipe_clone_independent_operations() {
        let (reader, writer) = UnidirectionalPipe::create_pair_raw(1024);

        let reader_dup = reader.clone();
        let writer_dup = writer.clone();

        let first = b"From original";
        writer.write(first).unwrap();

        let second = b" and clone";
        writer_dup.write(second).unwrap();

        let mut scratch = [0u8; 50];
        let count = reader.read(&mut scratch).unwrap();
        assert_eq!(count, first.len() + second.len());

        let mut expected = Vec::new();
        expected.extend_from_slice(first);
        expected.extend_from_slice(second);
        assert_eq!(&scratch[..count], &expected);

        // Everything was consumed through the original reader already.
        let mut leftover_buf = [0u8; 10];
        let leftover = reader_dup.read(&mut leftover_buf);
        assert!(matches!(leftover, Err(_) | Ok(0)));
    }

    /// The buffer limit is enforced and space is reclaimed by reads.
    #[test_case]
    fn test_pipe_buffer_management() {
        let (reader, writer) = UnidirectionalPipe::create_pair_raw(10);

        assert_eq!(reader.buffer_size(), 10);
        assert_eq!(writer.buffer_size(), 10);
        assert_eq!(reader.available_bytes(), 0);

        writer.write(b"12345").unwrap();
        assert_eq!(reader.available_bytes(), 5);
        assert_eq!(writer.available_bytes(), 5);

        writer.write(b"67890").unwrap();
        assert_eq!(reader.available_bytes(), 10);

        // The buffer is full now: one more byte must not be accepted.
        let overflow = writer.write(b"X");
        assert!(matches!(overflow, Err(_) | Ok(0)));

        let mut scratch = [0u8; 3];
        let count = reader.read(&mut scratch).unwrap();
        assert_eq!(count, 3);
        assert_eq!(&scratch, b"123");
        assert_eq!(reader.available_bytes(), 7);

        let refill = writer.write(b"XYZ").unwrap();
        assert_eq!(refill, 3);
        assert_eq!(reader.available_bytes(), 10);
    }

    /// Cloning the read-side `KernelObject` registers a second reader.
    #[test_case]
    fn test_kernel_object_pipe_dup_semantics() {
        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);

        match (read_obj.as_pipe(), write_obj.as_pipe()) {
            (Some(read_pipe), Some(write_pipe)) => {
                assert_eq!(read_pipe.peer_count(), 1);
                assert_eq!(write_pipe.peer_count(), 1);
                assert!(read_pipe.has_writers());
                assert!(write_pipe.has_readers());
            }
            (None, _) => panic!("read_obj should be a pipe"),
            (_, None) => panic!("write_obj should be a pipe"),
        }

        let read_obj_cloned = read_obj.clone();

        if let (Some(read_pipe), Some(write_pipe)) = (read_obj.as_pipe(), write_obj.as_pipe()) {
            match read_obj_cloned.as_pipe() {
                Some(read_pipe_cloned) => {
                    assert_eq!(write_pipe.peer_count(), 2);
                    assert_eq!(read_pipe.peer_count(), 1);
                    assert_eq!(read_pipe_cloned.peer_count(), 1);
                    assert!(read_pipe.has_writers());
                    assert!(write_pipe.has_readers());
                    assert!(read_pipe_cloned.has_writers());
                }
                None => panic!("read_obj_cloned should be a pipe"),
            }
        }
    }

    /// Cloning the write-side `KernelObject` registers a second writer.
    #[test_case]
    fn test_kernel_object_pipe_write_dup_semantics() {
        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);

        let write_obj_cloned = write_obj.clone();

        if let (Some(read_pipe), Some(write_pipe)) = (read_obj.as_pipe(), write_obj.as_pipe()) {
            match write_obj_cloned.as_pipe() {
                Some(write_pipe_cloned) => {
                    assert_eq!(read_pipe.peer_count(), 2);
                    assert_eq!(write_pipe.peer_count(), 1);
                    assert_eq!(write_pipe_cloned.peer_count(), 1);
                    assert!(read_pipe.has_writers());
                    assert!(write_pipe.has_readers());
                    assert!(write_pipe_cloned.has_readers());
                }
                None => panic!("write_obj_cloned should be a pipe"),
            }
        }
    }

    /// Original and cloned `KernelObject`s operate on the same byte stream.
    #[test_case]
    fn test_kernel_object_pipe_dup_io_operations() {
        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);

        let read_obj_cloned = read_obj.clone();
        let write_obj_cloned = write_obj.clone();

        if let Some(stream) = write_obj.as_stream() {
            let first = b"Hello from original writer";
            let written = stream.write(first).unwrap();
            assert_eq!(written, first.len());
        }

        if let Some(stream) = write_obj_cloned.as_stream() {
            let second = b" and cloned writer";
            let written = stream.write(second).unwrap();
            assert_eq!(written, second.len());
        }

        if let Some(stream) = read_obj.as_stream() {
            let expected = b"Hello from original writer and cloned writer";
            let mut scratch = [0u8; 100];
            let count = stream.read(&mut scratch).unwrap();
            assert_eq!(count, expected.len());
            assert_eq!(&scratch[..count], expected);
        }

        if let Some(stream) = read_obj_cloned.as_stream() {
            // The original reader already drained the buffer.
            let mut scratch = [0u8; 10];
            let leftover = stream.read(&mut scratch);
            assert!(matches!(leftover, Err(_) | Ok(0)));
        }
    }

    /// The pipe only breaks once every duplicate of the read end is gone.
    #[test_case]
    fn test_kernel_object_pipe_dup_broken_pipe_detection() {
        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);

        let read_obj_cloned = read_obj.clone();

        if let Some(write_pipe) = write_obj.as_pipe() {
            assert_eq!(write_pipe.peer_count(), 2);
        }

        drop(read_obj);

        if let Some(write_pipe) = write_obj.as_pipe() {
            assert_eq!(write_pipe.peer_count(), 1);
            assert!(write_pipe.has_readers());
        }

        if let Some(stream) = write_obj.as_stream() {
            let payload = b"Still works";
            let written = stream.write(payload).unwrap();
            assert_eq!(written, payload.len());
        }

        drop(read_obj_cloned);

        if let Some(write_pipe) = write_obj.as_pipe() {
            assert_eq!(write_pipe.peer_count(), 0);
            assert!(!write_pipe.has_readers());
        }

        if let Some(stream) = write_obj.as_stream() {
            let payload = b"Should fail";
            let outcome = stream.write(payload);
            assert!(outcome.is_err());
            match outcome {
                Err(StreamError::BrokenPipe) => {}
                _ => panic!("Expected BrokenPipe error, got: {:?}", outcome),
            }
        }
    }

    /// Both `KernelObject::clone` and `CloneOps::custom_clone` register an
    /// additional reader.
    #[test_case]
    fn test_kernel_object_pipe_dup_vs_arc_clone_comparison() {
        let (read_obj, write_obj) = UnidirectionalPipe::create_pair(1024);

        let _read_obj_dup = read_obj.clone();

        if let Some(write_pipe) = write_obj.as_pipe() {
            assert_eq!(write_pipe.peer_count(), 2);
        }

        match read_obj.as_cloneable() {
            Some(cloneable) => {
                let _custom_cloned = cloneable.custom_clone();

                if let Some(write_pipe) = write_obj.as_pipe() {
                    assert_eq!(write_pipe.peer_count(), 3);
                }
            }
            None => panic!("Pipe should implement CloneOps capability"),
        }
    }
}