kernel/ipc/
event.rs

1//! Event-based Inter-Process Communication
2//!
3//! This module provides a unified event system for Scarlet OS that handles
4//! different types of event delivery mechanisms:
5//! - Immediate: Force delivery regardless of receiver state
6//! - Notification: One-way, best-effort delivery
7//! - Subscription: Channel-based pub/sub delivery
8//! - Group: Broadcast delivery to multiple targets
9
10use alloc::collections::BTreeMap;
11use alloc::{collections::VecDeque, format, string::String, sync::Arc, vec::Vec};
12use hashbrown::HashMap;
13use spin::Mutex;
14
15/// Type alias for task identifiers
16pub type TaskId = u32;
17/// Type alias for group identifiers
18pub type GroupId = u32;
19/// Type alias for session identifiers
20pub type SessionId = u32;
21
22/// Event structure containing all event information
23///
24/// # Design Philosophy
25///
26/// This design separates **delivery mechanism** from **event content**:
27/// - `delivery`: HOW the event is delivered (direct, channel, group, broadcast)
28/// - `content`: WHAT the event represents (signal, message, notification)
29/// - `payload`: Additional data carried with the event
30/// - `metadata`: System-level tracking information
31#[derive(Debug, Clone)]
32pub struct Event {
33    /// Event delivery mechanism (routing and targeting)
34    pub delivery: EventDelivery,
35
36    /// Event content (what this event represents)
37    pub content: EventContent,
38
39    /// Event payload data (additional data)
40    pub payload: EventPayload,
41
42    /// Event metadata (system tracking)
43    pub metadata: EventMetadata,
44}
45
46/// Event delivery mechanisms
47///
48/// Defines HOW an event is delivered, independent of WHAT the event represents
49#[derive(Debug, Clone)]
50pub enum EventDelivery {
51    /// Direct task communication (1:1)
52    Direct {
53        target: TaskId,
54        priority: EventPriority,
55        reliable: bool,
56    },
57
58    /// Channel-based communication (1:many, pub/sub)
59    Channel {
60        channel_id: String,
61        create_if_missing: bool,
62        priority: EventPriority,
63    },
64
65    /// Group broadcast (1:many, membership-based)
66    Group {
67        group_target: GroupTarget,
68        priority: EventPriority,
69        reliable: bool,
70    },
71
72    /// System-wide broadcast (1:all)
73    Broadcast {
74        priority: EventPriority,
75        reliable: bool,
76    },
77}
78
79/// Event content types
80///
81/// Defines WHAT the event represents, independent of HOW it's delivered
82#[derive(Debug, Clone)]
83pub enum EventContent {
84    /// Process control events (equivalent to signals, but OS-agnostic)
85    ProcessControl(ProcessControlType),
86
87    /// Application-level message with type
88    Message {
89        message_type: u32,
90        category: MessageCategory,
91    },
92
93    /// System notification
94    Notification(NotificationType),
95
96    /// Custom event defined by ABI or application
97    Custom {
98        namespace: String, // e.g., "linux", "xv6", "user_app_123"
99        event_id: u32,
100    },
101}
102
103/// Process control event types
104///
105/// These represent universal process control operations that exist across
106/// different operating systems (Linux signals, Windows events, etc.)
107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
108pub enum ProcessControlType {
109    Terminate,  // Graceful termination
110    Kill,       // Force termination
111    Stop,       // Suspend execution
112    Continue,   // Resume execution
113    Interrupt,  // User interrupt (Ctrl+C)
114    Quit,       // Quit with core dump
115    Hangup,     // Terminal hangup
116    ChildExit,  // Child process exited
117    PipeBroken, // Broken pipe
118    Alarm,      // Timer alarm
119    IoReady,    // I/O ready
120    User(u32),  // User-defined control signal (0-65535)
121                // Add more as needed
122}
123
124/// Message categories (for structured communication)
125#[derive(Debug, Clone, Copy, PartialEq, Eq)]
126pub enum MessageCategory {
127    Control,    // Control messages
128    Data,       // Data messages
129    Status,     // Status updates
130    Error,      // Error notifications
131    Custom(u8), // Custom category (0-255)
132}
133
134/// System notification types
135#[derive(Debug, Clone, Copy, PartialEq, Eq)]
136pub enum NotificationType {
137    TaskCompleted,
138    MemoryLow,
139    DeviceConnected,
140    DeviceDisconnected,
141    FilesystemFull,
142    NetworkChange,
143    SystemShutdown,
144    // Add more as needed
145}
146
147/// Group targeting options
148#[derive(Debug, Clone, PartialEq)]
149pub enum GroupTarget {
150    /// Specific task group
151    TaskGroup(GroupId),
152
153    /// All tasks in the system
154    AllTasks,
155
156    /// Session-based group
157    Session(SessionId),
158
159    /// Custom named group
160    Custom(String),
161}
162/// Event payload data
163#[derive(Debug, Clone)]
164pub enum EventPayload {
165    /// No data
166    Empty,
167
168    /// Integer value
169    Integer(i64),
170
171    /// Byte array
172    Bytes(Vec<u8>),
173
174    /// String data
175    String(String),
176
177    /// Custom binary data
178    Custom(Vec<u8>),
179}
180
181/// Event metadata
182#[derive(Debug, Clone)]
183pub struct EventMetadata {
184    /// Sender task ID
185    pub sender: Option<u32>,
186
187    /// Event priority
188    pub priority: EventPriority,
189
190    /// Timestamp
191    pub timestamp: u64,
192
193    /// Unique event ID
194    pub event_id: u64,
195}
196
197impl EventMetadata {
198    /// Create new metadata with current timestamp
199    pub fn new() -> Self {
200        Self {
201            sender: None, // Will be filled by EventManager
202            priority: EventPriority::Normal,
203            timestamp: 0, // TODO: integrate with timer system
204            event_id: generate_event_id(),
205        }
206    }
207
208    /// Create new metadata with specified priority
209    pub fn with_priority(priority: EventPriority) -> Self {
210        Self {
211            sender: None, // Will be filled by EventManager
212            priority,
213            timestamp: 0, // TODO: integrate with timer system
214            event_id: generate_event_id(),
215        }
216    }
217}
218
219/// Event priority levels
220#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
221pub enum EventPriority {
222    Low = 1,
223    Normal = 2,
224    High = 3,
225    Critical = 4,
226}
227
228/// Event filter for handler registration
229#[derive(Debug, Clone)]
230pub enum EventFilter {
231    /// All events
232    All,
233
234    /// Specific event type
235    EventType(EventTypeFilter),
236
237    /// Specific event ID
238    EventId(u32),
239
240    /// Specific channel
241    Channel(String),
242
243    /// Specific sender
244    Sender(u32),
245
246    /// Custom filter function
247    Custom(fn(&Event) -> bool),
248}
249
250impl EventFilter {
251    /// Check if this filter matches the given event
252    pub fn matches(&self, event: &Event) -> bool {
253        match self {
254            EventFilter::All => true,
255
256            EventFilter::EventType(type_filter) => {
257                match type_filter {
258                    EventTypeFilter::AnyDirect => {
259                        matches!(event.delivery, EventDelivery::Direct { .. })
260                    }
261                    EventTypeFilter::AnyChannel => {
262                        matches!(event.delivery, EventDelivery::Channel { .. })
263                    }
264                    EventTypeFilter::AnyGroup => {
265                        matches!(event.delivery, EventDelivery::Group { .. })
266                    }
267                    EventTypeFilter::AnyBroadcast => {
268                        matches!(event.delivery, EventDelivery::Broadcast { .. })
269                    }
270
271                    EventTypeFilter::Direct(content_id) => {
272                        if let EventDelivery::Direct { .. } = &event.delivery {
273                            // Check if content matches the expected ID
274                            match &event.content {
275                                EventContent::ProcessControl(ptype) => {
276                                    // Map ProcessControlType to ID for filtering
277                                    let type_id = match ptype {
278                                        ProcessControlType::Terminate => 1,
279                                        ProcessControlType::Kill => 2,
280                                        ProcessControlType::Stop => 3,
281                                        ProcessControlType::Continue => 4,
282                                        ProcessControlType::Interrupt => 7,
283                                        _ => 0,
284                                    };
285                                    type_id == *content_id
286                                }
287                                EventContent::Custom { event_id, .. } => *event_id == *content_id,
288                                _ => false,
289                            }
290                        } else {
291                            false
292                        }
293                    }
294
295                    EventTypeFilter::Channel(channel_name) => {
296                        if let EventDelivery::Channel { channel_id, .. } = &event.delivery {
297                            channel_id == channel_name
298                        } else {
299                            false
300                        }
301                    }
302
303                    EventTypeFilter::Group(group_id) => {
304                        if let EventDelivery::Group {
305                            group_target: GroupTarget::TaskGroup(id),
306                            ..
307                        } = &event.delivery
308                        {
309                            id == group_id
310                        } else {
311                            false
312                        }
313                    }
314
315                    EventTypeFilter::Broadcast(event_id) => {
316                        if let EventDelivery::Broadcast { .. } = &event.delivery {
317                            // Check if event content matches the broadcast ID
318                            match &event.content {
319                                EventContent::Custom { event_id: id, .. } => id == event_id,
320                                _ => event.metadata.event_id == *event_id as u64,
321                            }
322                        } else {
323                            false
324                        }
325                    }
326                }
327            }
328
329            EventFilter::EventId(event_id) => {
330                // Check event_id in metadata
331                event.metadata.event_id == *event_id as u64
332            }
333
334            EventFilter::Channel(channel_name) => {
335                if let EventDelivery::Channel { channel_id, .. } = &event.delivery {
336                    channel_id == channel_name
337                } else {
338                    false
339                }
340            }
341
342            EventFilter::Sender(sender_id) => event.metadata.sender == Some(*sender_id),
343
344            EventFilter::Custom(filter_fn) => filter_fn(event),
345        }
346    }
347}
348
349/// Event type filter
350#[derive(Debug, Clone)]
351pub enum EventTypeFilter {
352    /// Any direct event
353    AnyDirect,
354
355    /// Any channel event
356    AnyChannel,
357
358    /// Any group event
359    AnyGroup,
360
361    /// Any broadcast event
362    AnyBroadcast,
363
364    /// Specific direct event
365    Direct(u32),
366
367    /// Specific channel
368    Channel(String),
369
370    /// Specific group
371    Group(GroupId),
372
373    /// Specific broadcast
374    Broadcast(u32),
375}
376
377/// Delivery configuration
378#[derive(Debug, Clone)]
379pub struct DeliveryConfig {
380    /// Buffer size for queued events
381    pub buffer_size: usize,
382
383    /// Delivery timeout in milliseconds
384    pub timeout_ms: Option<u64>,
385
386    /// Retry count on failure
387    pub retry_count: u32,
388
389    /// Failure policy
390    pub failure_policy: FailurePolicy,
391}
392
393impl Default for DeliveryConfig {
394    fn default() -> Self {
395        Self {
396            buffer_size: 1024,
397            timeout_ms: Some(5000),
398            retry_count: 3,
399            failure_policy: FailurePolicy::Log,
400        }
401    }
402}
403
404/// Failure handling policy
405#[derive(Debug, Clone)]
406pub enum FailurePolicy {
407    /// Ignore failures
408    Ignore,
409
410    /// Log failures
411    Log,
412
413    /// Notify sender of failure
414    NotifySender,
415
416    /// Generate system event
417    SystemEvent,
418}
419
420/// Event system errors
421#[derive(Debug, Clone)]
422pub enum EventError {
423    /// Target not found
424    TargetNotFound,
425
426    /// Permission denied
427    PermissionDenied,
428
429    /// Delivery failed
430    DeliveryFailed,
431
432    /// Buffer full
433    BufferFull,
434
435    /// Operation timeout
436    Timeout,
437
438    /// Invalid configuration
439    InvalidConfiguration,
440
441    /// Channel not found
442    ChannelNotFound,
443
444    /// Group not found
445    GroupNotFound,
446
447    /// Other error
448    Other(String),
449}
450
451/// Event configuration for delivery settings
452#[derive(Debug, Clone)]
453pub struct EventConfig {
454    /// Default buffer size
455    pub default_buffer_size: usize,
456
457    /// Timeout settings
458    pub default_timeout_ms: u64,
459
460    /// Maximum number of channels
461    pub max_channels: usize,
462
463    /// Maximum number of groups
464    pub max_groups: usize,
465}
466
467impl Default for EventConfig {
468    fn default() -> Self {
469        Self {
470            default_buffer_size: 64,
471            default_timeout_ms: 1000,
472            max_channels: 1024,
473            max_groups: 256,
474        }
475    }
476}
477
478/// Task-specific event queue entry
479#[derive(Debug, Clone)]
480pub struct TaskEventQueue {
481    /// Events sorted by priority (higher priority first)
482    events: BTreeMap<EventPriority, VecDeque<Event>>,
483    /// Total count of queued events
484    total_count: usize,
485}
486
487impl TaskEventQueue {
488    pub fn new() -> Self {
489        Self {
490            events: BTreeMap::new(),
491            total_count: 0,
492        }
493    }
494
495    /// Add event to queue, returns true if this was the first event (0->1 transition)
496    fn enqueue(&mut self, event: Event) -> bool {
497        let was_empty = self.total_count == 0;
498        let priority = event.metadata.priority;
499
500        self.events
501            .entry(priority)
502            .or_insert_with(VecDeque::new)
503            .push_back(event);
504        self.total_count += 1;
505
506        was_empty
507    }
508
509    /// Dequeue highest priority event
510    pub fn dequeue(&mut self) -> Option<Event> {
511        // Find the highest priority (largest value) that has events
512        // BTreeMap iterates in ascending order by default, so we need to find the max
513        let priority_to_dequeue = {
514            self.events
515                .iter()
516                .filter(|(_, queue)| !queue.is_empty())
517                .map(|(&priority, _)| priority)
518                .max()
519        }?;
520
521        // Dequeue from the highest priority queue
522        if let Some(queue) = self.events.get_mut(&priority_to_dequeue) {
523            if let Some(event) = queue.pop_front() {
524                self.total_count -= 1;
525                if queue.is_empty() {
526                    self.events.remove(&priority_to_dequeue);
527                }
528                return Some(event);
529            }
530        }
531
532        None
533    }
534
535    /// Check if queue is empty
536    pub fn is_empty(&self) -> bool {
537        self.total_count == 0
538    }
539
540    /// Get total number of queued events
541    pub fn len(&self) -> usize {
542        self.total_count
543    }
544}
545
546/// Event Manager - Main implementation of the event system
547pub struct EventManager {
548    /// Task group memberships
549    groups: Mutex<HashMap<GroupId, Vec<u32>>>,
550    /// Session memberships
551    sessions: Mutex<HashMap<SessionId, Vec<u32>>>,
552    /// Named/custom group memberships
553    named_groups: Mutex<HashMap<String, Vec<u32>>>,
554
555    /// Delivery configurations per task
556    configs: Mutex<HashMap<u32, DeliveryConfig>>,
557
558    /// Task-specific event filters (handler_id, filter)
559    task_filters: Mutex<HashMap<u32, Vec<(usize, EventFilter)>>>,
560
561    /// Next event ID
562    #[allow(dead_code)]
563    next_event_id: Mutex<u64>,
564
565    /// Channel registry - EventManager only manages channels, channels manage their own subscriptions
566    channels: Mutex<HashMap<String, Arc<EventChannelObject>>>,
567}
568
569impl EventManager {
570    /// Create a new EventManager
571    pub fn new() -> Self {
572        Self {
573            groups: Mutex::new(HashMap::new()),
574            sessions: Mutex::new(HashMap::new()),
575            named_groups: Mutex::new(HashMap::new()),
576            configs: Mutex::new(HashMap::new()),
577            task_filters: Mutex::new(HashMap::new()),
578            next_event_id: Mutex::new(1),
579            channels: Mutex::new(HashMap::new()),
580        }
581    }
582
583    /// Get the global EventManager instance
584    pub fn get_manager() -> &'static EventManager {
585        static INSTANCE: spin::once::Once<EventManager> = spin::once::Once::new();
586        INSTANCE.call_once(|| EventManager::new())
587    }
588
589    /// Helper: get the currently running task id, if available
590    fn get_current_task_id(&self) -> Option<u32> {
591        #[cfg(test)]
592        {
593            // In unit tests, there is no real scheduler context
594            return Some(1);
595        }
596        #[cfg(not(test))]
597        {
598            let cpu = crate::arch::get_cpu();
599            let cpu_id = cpu.get_cpuid();
600            let sched = crate::sched::scheduler::get_scheduler();
601            if let Some(task) = sched.get_current_task(cpu_id) {
602                return Some(task.get_id() as u32);
603            }
604            None
605        }
606    }
607
608    /// Create or get an event channel as a KernelObject handle
609    ///
610    /// This method creates an EventChannel that can be inserted into a HandleTable,
611    /// providing consistent resource management with other kernel objects.
612    pub fn create_channel(&self, name: String) -> crate::object::KernelObject {
613        let mut channels = self.channels.lock();
614
615        let channel = channels
616            .entry(name.clone())
617            .or_insert_with(|| Arc::new(EventChannelObject::new(name.clone())))
618            .clone();
619
620        crate::object::KernelObject::EventChannel(channel)
621    }
622
623    /// Create a subscription to a channel as a KernelObject handle
624    ///
625    /// This method creates an EventSubscription that can be inserted into a HandleTable,
626    /// allowing tasks to receive events through the standard handle interface.
627    pub fn create_subscription(
628        &self,
629        channel_name: String,
630        task_id: u32,
631    ) -> Result<crate::object::KernelObject, EventError> {
632        // Get or create the channel first
633        let mut channels = self.channels.lock();
634        let channel = channels
635            .entry(channel_name.clone())
636            .or_insert_with(|| Arc::new(EventChannelObject::new(channel_name.clone())))
637            .clone();
638        drop(channels);
639
640        // Create subscription through the channel
641        let subscription = channel.create_subscription(task_id)?;
642        Ok(crate::object::KernelObject::EventSubscription(subscription))
643    }
644
645    /// Send an event
646    pub fn send_event(&self, mut event: Event) -> Result<(), EventError> {
647        // Auto-fill metadata: sender and timestamp
648        if event.metadata.sender.is_none() {
649            event.metadata.sender = self.get_current_task_id();
650        }
651        // Use kernel timer tick as timestamp source
652        event.metadata.timestamp = crate::timer::get_tick();
653
654        match event.delivery.clone() {
655            EventDelivery::Direct {
656                target,
657                priority,
658                reliable,
659            } => self.deliver_direct(event, target, priority, reliable),
660            EventDelivery::Channel {
661                channel_id,
662                create_if_missing,
663                priority,
664            } => self.deliver_to_channel(event, &channel_id, create_if_missing, priority),
665            EventDelivery::Group {
666                group_target,
667                priority,
668                reliable,
669            } => self.deliver_to_group(event, &group_target, priority, reliable),
670            EventDelivery::Broadcast { priority, reliable } => {
671                self.deliver_broadcast(event, priority, reliable)
672            }
673        }
674    }
675
676    /// Register an event filter for a task (without handler id)
677    pub fn register_filter(&self, task_id: u32, filter: EventFilter) -> Result<(), EventError> {
678        let mut task_filters = self.task_filters.lock();
679        let filters = task_filters.entry(task_id).or_insert_with(Vec::new);
680        // Assign a simple handler id: next index
681        let handler_id = filters.len();
682        filters.push((handler_id, filter));
683        Ok(())
684    }
685
686    /// Register an event filter for a task with explicit handler id
687    pub fn register_filter_with_id(
688        &self,
689        task_id: u32,
690        handler_id: usize,
691        filter: EventFilter,
692    ) -> Result<(), EventError> {
693        let mut task_filters = self.task_filters.lock();
694        let filters = task_filters.entry(task_id).or_insert_with(Vec::new);
695        // If an entry with same handlerId exists, replace it
696        if let Some(slot) = filters.iter_mut().find(|(hid, _)| *hid == handler_id) {
697            *slot = (handler_id, filter);
698        } else {
699            filters.push((handler_id, filter));
700        }
701        Ok(())
702    }
703
704    /// Unregister a filter by handler id
705    pub fn unregister_filter_by_id(
706        &self,
707        task_id: u32,
708        handler_id: usize,
709    ) -> Result<(), EventError> {
710        let mut task_filters = self.task_filters.lock();
711        if let Some(filters) = task_filters.get_mut(&task_id) {
712            filters.retain(|(hid, _)| *hid != handler_id);
713        }
714        Ok(())
715    }
716
717    /// Get a snapshot of filters for a task
718    pub fn get_filters_for_task(&self, task_id: u32) -> Vec<(usize, EventFilter)> {
719        let task_filters = self.task_filters.lock();
720        task_filters.get(&task_id).cloned().unwrap_or_default()
721    }
722
723    /// Remove all filters for a task
724    pub fn clear_filters(&self, task_id: u32) -> Result<(), EventError> {
725        let mut task_filters = self.task_filters.lock();
726        task_filters.remove(&task_id);
727        Ok(())
728    }
729
730    /// Subscribe to a channel
731    pub fn subscribe_channel(&self, channel: &str) -> Result<(), EventError> {
732        // Resolve current task ID from scheduler
733        let current_task_id = self
734            .get_current_task_id()
735            .ok_or_else(|| EventError::Other("No current task".into()))?;
736
737        // Get or create the channel
738        let mut channels = self.channels.lock();
739        let channel_obj = channels
740            .entry(channel.into())
741            .or_insert_with(|| Arc::new(EventChannelObject::new(channel.into())))
742            .clone();
743        drop(channels);
744
745        // Subscribe through the channel (ignore the returned subscription object for backward compatibility)
746        let _ = channel_obj.subscribe(current_task_id)?;
747        Ok(())
748    }
749
750    /// Unsubscribe from a channel
751    pub fn unsubscribe_channel(&self, channel: &str) -> Result<(), EventError> {
752        let current_task_id = self
753            .get_current_task_id()
754            .ok_or_else(|| EventError::Other("No current task".into()))?;
755
756        let channels = self.channels.lock();
757        if let Some(channel_obj) = channels.get(channel) {
758            channel_obj.unsubscribe(current_task_id)
759        } else {
760            Err(EventError::ChannelNotFound)
761        }
762    }
763
764    /// Join a task group
765    pub fn join_group(&self, group_id: GroupId) -> Result<(), EventError> {
766        let current_task_id = self
767            .get_current_task_id()
768            .ok_or_else(|| EventError::Other("No current task".into()))?;
769
770        let mut groups = self.groups.lock();
771        let group_members = groups.entry(group_id).or_insert_with(Vec::new);
772
773        if !group_members.contains(&current_task_id) {
774            group_members.push(current_task_id);
775        }
776
777        Ok(())
778    }
779
780    /// Leave a task group
781    pub fn leave_group(&self, group_id: GroupId) -> Result<(), EventError> {
782        let current_task_id = self
783            .get_current_task_id()
784            .ok_or_else(|| EventError::Other("No current task".into()))?;
785
786        let mut groups = self.groups.lock();
787        if let Some(group_members) = groups.get_mut(&group_id) {
788            group_members.retain(|&task_id| task_id != current_task_id);
789        }
790
791        Ok(())
792    }
793
794    /// Join a session group
795    pub fn join_session(&self, session_id: SessionId) -> Result<(), EventError> {
796        let current_task_id = self
797            .get_current_task_id()
798            .ok_or_else(|| EventError::Other("No current task".into()))?;
799        let mut sessions = self.sessions.lock();
800        let members = sessions.entry(session_id).or_insert_with(Vec::new);
801        if !members.contains(&current_task_id) {
802            members.push(current_task_id);
803        }
804        Ok(())
805    }
806
807    /// Leave a session group
808    pub fn leave_session(&self, session_id: SessionId) -> Result<(), EventError> {
809        let current_task_id = self
810            .get_current_task_id()
811            .ok_or_else(|| EventError::Other("No current task".into()))?;
812        let mut sessions = self.sessions.lock();
813        if let Some(members) = sessions.get_mut(&session_id) {
814            members.retain(|&tid| tid != current_task_id);
815        }
816        Ok(())
817    }
818
819    /// Join a named/custom group
820    pub fn join_named_group(&self, name: String) -> Result<(), EventError> {
821        let current_task_id = self
822            .get_current_task_id()
823            .ok_or_else(|| EventError::Other("No current task".into()))?;
824        let mut named = self.named_groups.lock();
825        let members = named.entry(name).or_insert_with(Vec::new);
826        if !members.contains(&current_task_id) {
827            members.push(current_task_id);
828        }
829        Ok(())
830    }
831
832    /// Leave a named/custom group
833    pub fn leave_named_group(&self, name: &str) -> Result<(), EventError> {
834        let current_task_id = self
835            .get_current_task_id()
836            .ok_or_else(|| EventError::Other("No current task".into()))?;
837        let mut named = self.named_groups.lock();
838        if let Some(members) = named.get_mut(name) {
839            members.retain(|&tid| tid != current_task_id);
840        }
841        Ok(())
842    }
843
844    /// Configure delivery settings
845    pub fn configure_delivery(&self, config: DeliveryConfig) -> Result<(), EventError> {
846        let current_task_id = self
847            .get_current_task_id()
848            .ok_or_else(|| EventError::Other("No current task".into()))?;
849
850        let mut configs = self.configs.lock();
851        configs.insert(current_task_id, config);
852
853        Ok(())
854    }
855
856    /// Get a task's delivery configuration or the default if none is set
857    fn get_task_config_or_default(&self, task_id: u32) -> DeliveryConfig {
858        self.configs
859            .lock()
860            .get(&task_id)
861            .cloned()
862            .unwrap_or_else(DeliveryConfig::default)
863    }
864
865    /// Handle delivery failures according to the sender's configured policy
866    fn handle_delivery_failure(&self, sender: Option<u32>, err: &EventError, event: &Event) {
867        // Determine policy from sender's config if available, else default to Log
868        let policy = match sender {
869            Some(sid) => self.get_task_config_or_default(sid).failure_policy.clone(),
870            None => FailurePolicy::Log,
871        };
872
873        match policy {
874            FailurePolicy::Ignore => { /* do nothing */ }
875            FailurePolicy::Log => {
876                crate::early_println!(
877                    "[EventManager] Delivery failure: {:?}, sender={:?}, delivery={:?}",
878                    err,
879                    sender,
880                    event.delivery
881                );
882            }
883            FailurePolicy::NotifySender => {
884                // Best-effort notify the sender without causing recursive failure handling
885                if let Some(sid) = sender {
886                    let notice = Event::direct_custom(
887                        sid,
888                        "system".into(),
889                        0x1001,
890                        EventPriority::Low,
891                        false,
892                        EventPayload::String(format!("Delivery failed: {:?}", err)),
893                    );
894                    let _ = self.deliver_to_task(sid, notice);
895                } else {
896                    // Fall back to logging when there is no sender
897                    crate::early_println!(
898                        "[EventManager] Delivery failure without sender: {:?}",
899                        err
900                    );
901                }
902            }
903            FailurePolicy::SystemEvent => {
904                // For now, log the failure to avoid recursive broadcasts. Can be expanded later.
905                crate::early_println!(
906                    "[EventManager] SystemEvent policy: delivery failure: {:?}, sender={:?}",
907                    err,
908                    sender
909                );
910            }
911        }
912    }
913
914    // === Internal Event Delivery Methods ===
915
916    /// Deliver direct event to specific task
917    fn deliver_direct(
918        &self,
919        event: Event,
920        target: TaskId,
921        _priority: EventPriority,
922        _reliable: bool,
923    ) -> Result<(), EventError> {
924        // Attempt delivery; if reliable, retry according to sender's config
925        let mut result = self.deliver_to_task(target, event.clone());
926        if result.is_err() && _reliable {
927            // Determine retry count from sender's config
928            let retries = match event.metadata.sender {
929                Some(sender) => self.get_task_config_or_default(sender).retry_count,
930                None => DeliveryConfig::default().retry_count,
931            };
932            let mut attempts = 0u32;
933            while attempts < retries {
934                // Simple immediate retry (no sleep to keep no_std constraints)
935                result = self.deliver_to_task(target, event.clone());
936                if result.is_ok() {
937                    break;
938                }
939                attempts += 1;
940            }
941            if let Err(ref e) = result {
942                self.handle_delivery_failure(event.metadata.sender, e, &event);
943            }
944        } else if let Err(ref e) = result {
945            // Not reliable, still honor failure policy for observability
946            self.handle_delivery_failure(event.metadata.sender, e, &event);
947        }
948        result
949    }
950
951    /// Deliver to channel subscribers
952    fn deliver_to_channel(
953        &self,
954        event: Event,
955        channel_id: &str,
956        create_if_missing: bool,
957        _priority: EventPriority,
958    ) -> Result<(), EventError> {
959        let mut channels = self.channels.lock();
960
961        if let Some(channel) = channels.get(channel_id) {
962            let channel = channel.clone();
963            drop(channels);
964            // Broadcast and handle per-target failures
965            let subscribers = channel.get_subscribers();
966            for task_id in subscribers {
967                if let Err(e) = self.deliver_to_task(task_id, event.clone()) {
968                    self.handle_delivery_failure(event.metadata.sender, &e, &event);
969                }
970            }
971            Ok(())
972        } else if create_if_missing {
973            // Create empty channel
974            let channel = Arc::new(EventChannelObject::new(channel_id.into()));
975            channels.insert(channel_id.into(), channel.clone());
976            drop(channels);
977            let subscribers = channel.get_subscribers();
978            for task_id in subscribers {
979                if let Err(e) = self.deliver_to_task(task_id, event.clone()) {
980                    self.handle_delivery_failure(event.metadata.sender, &e, &event);
981                }
982            }
983            Ok(())
984        } else {
985            Err(EventError::ChannelNotFound)
986        }
987    }
988
989    /// Deliver to group members
990    fn deliver_to_group(
991        &self,
992        event: Event,
993        group_target: &GroupTarget,
994        _priority: EventPriority,
995        _reliable: bool,
996    ) -> Result<(), EventError> {
997        match group_target {
998            GroupTarget::TaskGroup(group_id) => {
999                let groups = self.groups.lock();
1000                if let Some(members) = groups.get(group_id) {
1001                    let targets: alloc::vec::Vec<u32> = members.iter().cloned().collect();
1002                    drop(groups);
1003                    for &task_id in &targets {
1004                        if let Err(e) = self.deliver_to_task(task_id, event.clone()) {
1005                            self.handle_delivery_failure(event.metadata.sender, &e, &event);
1006                        }
1007                    }
1008                    Ok(())
1009                } else {
1010                    Err(EventError::GroupNotFound)
1011                }
1012            }
1013            GroupTarget::AllTasks => {
1014                let sched = crate::sched::scheduler::get_scheduler();
1015                let all_ids: alloc::vec::Vec<u32> = sched
1016                    .get_all_task_ids()
1017                    .into_iter()
1018                    .map(|x| x as u32)
1019                    .collect();
1020                for tid in all_ids {
1021                    if let Err(e) = self.deliver_to_task(tid, event.clone()) {
1022                        self.handle_delivery_failure(event.metadata.sender, &e, &event);
1023                    }
1024                }
1025                Ok(())
1026            }
1027            GroupTarget::Session(session_id) => {
1028                let sessions = self.sessions.lock();
1029                if let Some(members) = sessions.get(session_id) {
1030                    let targets: alloc::vec::Vec<u32> = members.iter().cloned().collect();
1031                    drop(sessions);
1032                    for &task_id in &targets {
1033                        if let Err(e) = self.deliver_to_task(task_id, event.clone()) {
1034                            self.handle_delivery_failure(event.metadata.sender, &e, &event);
1035                        }
1036                    }
1037                    Ok(())
1038                } else {
1039                    Err(EventError::GroupNotFound)
1040                }
1041            }
1042            GroupTarget::Custom(name) => {
1043                let named = self.named_groups.lock();
1044                if let Some(members) = named.get(name) {
1045                    let targets: alloc::vec::Vec<u32> = members.iter().cloned().collect();
1046                    drop(named);
1047                    for &task_id in &targets {
1048                        if let Err(e) = self.deliver_to_task(task_id, event.clone()) {
1049                            self.handle_delivery_failure(event.metadata.sender, &e, &event);
1050                        }
1051                    }
1052                    Ok(())
1053                } else {
1054                    Err(EventError::GroupNotFound)
1055                }
1056            }
1057        }
1058    }
1059
1060    /// Deliver broadcast event to all tasks
1061    fn deliver_broadcast(
1062        &self,
1063        event: Event,
1064        _priority: EventPriority,
1065        _reliable: bool,
1066    ) -> Result<(), EventError> {
1067        // broadcast to every task in the system
1068        let sched = crate::sched::scheduler::get_scheduler();
1069        let all_ids: alloc::vec::Vec<u32> = sched
1070            .get_all_task_ids()
1071            .into_iter()
1072            .map(|x| x as u32)
1073            .collect();
1074        for tid in all_ids {
1075            if let Err(e) = self.deliver_to_task(tid, event.clone()) {
1076                self.handle_delivery_failure(event.metadata.sender, &e, &event);
1077            }
1078        }
1079        Ok(())
1080    }
1081
1082    /// Deliver event to a specific task
1083    #[cfg(not(test))]
1084    pub fn deliver_to_task(&self, task_id: u32, event: Event) -> Result<(), EventError> {
1085        // Check if the event matches any of the task's filters
1086        let task_filters = self.task_filters.lock();
1087        if let Some(filters) = task_filters.get(&task_id) {
1088            // If task has filters, check if event matches any of them
1089            if !filters.is_empty() {
1090                let matches = filters.iter().any(|(_, filter)| filter.matches(&event));
1091                if !matches {
1092                    // Event doesn't match any filter, drop it
1093                    return Ok(());
1094                }
1095            }
1096            // If no filters are registered, allow all events (backward compatibility)
1097        }
1098        drop(task_filters); // Release the lock early
1099
1100        // Get the task and deliver event to its local queue
1101        if let Some(task) =
1102            crate::sched::scheduler::get_scheduler().get_task_by_id(task_id as usize)
1103        {
1104            // Enforce buffer size from the target task's config
1105            let cfg = self.get_task_config_or_default(task_id);
1106            let mut queue = task.event_queue.lock();
1107            if queue.len() >= cfg.buffer_size {
1108                return Err(EventError::BufferFull);
1109            }
1110            // Enqueue the event since it passed filtering and buffer check
1111            queue.enqueue(event);
1112            Ok(())
1113        } else {
1114            Err(EventError::TargetNotFound)
1115        }
1116    }
1117    #[cfg(test)]
1118    pub fn deliver_to_task(&self, _task_id: u32, _event: Event) -> Result<(), EventError> {
1119        // In tests, we simulate event delivery by simply returning success
1120        // Real integration tests should be done at a higher level with actual Task objects
1121        Ok(())
1122    }
1123
1124    /// Dequeue the next highest priority event for a task
1125    /// This method is deprecated - tasks now process events directly via process_pending_events()
1126    #[deprecated(note = "Use Task.process_pending_events() instead")]
1127    pub fn dequeue_event_for_task(&self, task_id: u32) -> Option<Event> {
1128        if let Some(task) =
1129            crate::sched::scheduler::get_scheduler().get_task_by_id(task_id as usize)
1130        {
1131            let mut queue = task.event_queue.lock();
1132            queue.dequeue()
1133        } else {
1134            None
1135        }
1136    }
1137
1138    /// Get the number of pending events for a task
1139    pub fn get_pending_event_count(&self, task_id: u32) -> usize {
1140        if let Some(task) =
1141            crate::sched::scheduler::get_scheduler().get_task_by_id(task_id as usize)
1142        {
1143            let queue = task.event_queue.lock();
1144            queue.len()
1145        } else {
1146            0
1147        }
1148    }
1149
1150    /// Check if a task has any pending events
1151    pub fn has_pending_events(&self, task_id: u32) -> bool {
1152        if let Some(task) =
1153            crate::sched::scheduler::get_scheduler().get_task_by_id(task_id as usize)
1154        {
1155            let queue = task.event_queue.lock();
1156            !queue.is_empty()
1157        } else {
1158            false
1159        }
1160    }
1161
1162    /// Get a channel by name, if it exists.
1163    pub fn get_channel(&self, name: &str) -> Option<alloc::sync::Arc<EventChannelObject>> {
1164        self.channels.lock().get(name).cloned()
1165    }
1166
1167    /// Remove a subscription from a channel by name and subscription id.
1168    pub fn remove_subscription_from_channel(
1169        &self,
1170        channel_name: &str,
1171        subscription_id: &str,
1172    ) -> Result<(), EventError> {
1173        if let Some(ch) = self.channels.lock().get(channel_name).cloned() {
1174            ch.remove_subscription(subscription_id)
1175        } else {
1176            Err(EventError::ChannelNotFound)
1177        }
1178    }
1179}
1180
1181/// Convenience functions for creating events
1182impl Event {
1183    /// Create a new event with delivery, content, and payload
1184    pub fn new(delivery: EventDelivery, content: EventContent, payload: EventPayload) -> Self {
1185        // Extract priority from delivery mechanism
1186        let priority = match &delivery {
1187            EventDelivery::Direct { priority, .. } => *priority,
1188            EventDelivery::Channel { priority, .. } => *priority,
1189            EventDelivery::Group { priority, .. } => *priority,
1190            EventDelivery::Broadcast { priority, .. } => *priority,
1191        };
1192
1193        Self {
1194            delivery,
1195            content,
1196            payload,
1197            metadata: EventMetadata::with_priority(priority),
1198        }
1199    }
1200
1201    /// Create a direct process control event to a specific task
1202    pub fn direct_process_control(
1203        target: TaskId,
1204        ptype: ProcessControlType,
1205        priority: EventPriority,
1206        reliable: bool,
1207    ) -> Self {
1208        Self::new(
1209            EventDelivery::Direct {
1210                target,
1211                priority,
1212                reliable,
1213            },
1214            EventContent::ProcessControl(ptype),
1215            EventPayload::Empty,
1216        )
1217    }
1218
1219    /// Create a direct custom event to a specific task
1220    pub fn direct_custom(
1221        target: TaskId,
1222        namespace: String,
1223        event_id: u32,
1224        priority: EventPriority,
1225        reliable: bool,
1226        payload: EventPayload,
1227    ) -> Self {
1228        Self::new(
1229            EventDelivery::Direct {
1230                target,
1231                priority,
1232                reliable,
1233            },
1234            EventContent::Custom {
1235                namespace,
1236                event_id,
1237            },
1238            payload,
1239        )
1240    }
1241
1242    /// Create a channel event
1243    pub fn channel(
1244        channel_id: String,
1245        content: EventContent,
1246        create_if_missing: bool,
1247        priority: EventPriority,
1248        payload: EventPayload,
1249    ) -> Self {
1250        Self::new(
1251            EventDelivery::Channel {
1252                channel_id,
1253                create_if_missing,
1254                priority,
1255            },
1256            content,
1257            payload,
1258        )
1259    }
1260
1261    /// Create a group event
1262    pub fn group(
1263        group_target: GroupTarget,
1264        content: EventContent,
1265        priority: EventPriority,
1266        reliable: bool,
1267        payload: EventPayload,
1268    ) -> Self {
1269        Self::new(
1270            EventDelivery::Group {
1271                group_target,
1272                priority,
1273                reliable,
1274            },
1275            content,
1276            payload,
1277        )
1278    }
1279
1280    /// Create a broadcast event
1281    pub fn broadcast(
1282        content: EventContent,
1283        priority: EventPriority,
1284        reliable: bool,
1285        payload: EventPayload,
1286    ) -> Self {
1287        Self::new(
1288            EventDelivery::Broadcast { priority, reliable },
1289            content,
1290            payload,
1291        )
1292    }
1293
1294    // Convenience methods for common use cases
1295
1296    /// Create immediate process control event for a specific task
1297    pub fn immediate_process_control(task_id: u32, ptype: ProcessControlType) -> Self {
1298        Self::direct_process_control(task_id, ptype, EventPriority::High, true)
1299    }
1300
1301    /// Create notification event for a specific task
1302    pub fn notification_to_task(task_id: u32, ntype: NotificationType) -> Self {
1303        Self::new(
1304            EventDelivery::Direct {
1305                target: task_id,
1306                priority: EventPriority::Normal,
1307                reliable: false,
1308            },
1309            EventContent::Notification(ntype),
1310            EventPayload::Empty,
1311        )
1312    }
1313
1314    /// Create channel event (simple)
1315    pub fn new_channel_event(channel: &str, content: EventContent, payload: EventPayload) -> Self {
1316        Self::channel(
1317            channel.into(),
1318            content,
1319            false,
1320            EventPriority::Normal,
1321            payload,
1322        )
1323    }
1324
1325    /// Create group broadcast event (simple)
1326    pub fn new_group_broadcast(
1327        group_target: GroupTarget,
1328        content: EventContent,
1329        payload: EventPayload,
1330    ) -> Self {
1331        Self::group(group_target, content, EventPriority::Normal, false, payload)
1332    }
1333
1334    /// Create immediate broadcast event
1335    pub fn immediate_broadcast(content: EventContent) -> Self {
1336        Self::broadcast(content, EventPriority::High, true, EventPayload::Empty)
1337    }
1338
1339    /// Create notification for a group
1340    pub fn notification_to_group(group_id: GroupId, ntype: NotificationType) -> Self {
1341        Self::group(
1342            GroupTarget::TaskGroup(group_id),
1343            EventContent::Notification(ntype),
1344            EventPriority::Normal,
1345            false,
1346            EventPayload::Empty,
1347        )
1348    }
1349}
1350
1351/// EventChannel implementation for KernelObject integration
1352pub struct EventChannelObject {
1353    name: String,
1354    /// Channel manages its own subscriptions as EventSubscriptionObjects
1355    subscriptions: Mutex<HashMap<String, Arc<EventSubscriptionObject>>>,
1356    #[allow(dead_code)]
1357    manager_ref: &'static EventManager,
1358}
1359
1360impl EventChannelObject {
1361    pub fn new(name: String) -> Self {
1362        Self {
1363            name,
1364            subscriptions: Mutex::new(HashMap::new()),
1365            manager_ref: EventManager::get_manager(),
1366        }
1367    }
1368
1369    pub fn name(&self) -> &str {
1370        &self.name
1371    }
1372
1373    /// Create a new subscription for this channel
1374    pub fn create_subscription(
1375        &self,
1376        task_id: u32,
1377    ) -> Result<Arc<EventSubscriptionObject>, EventError> {
1378        let subscription_id = format!("sub_{}_task_{}", self.name, task_id);
1379        let subscription = Arc::new(EventSubscriptionObject::new(
1380            subscription_id.clone(),
1381            self.name.clone(),
1382            task_id,
1383        ));
1384
1385        let mut subscriptions = self.subscriptions.lock();
1386        subscriptions.insert(subscription_id, subscription.clone());
1387
1388        Ok(subscription)
1389    }
1390
1391    /// Remove a subscription from this channel
1392    pub fn remove_subscription(&self, subscription_id: &str) -> Result<(), EventError> {
1393        let mut subscriptions = self.subscriptions.lock();
1394        subscriptions.remove(subscription_id);
1395        Ok(())
1396    }
1397
1398    /// Get all subscriptions for this channel
1399    pub fn get_subscriptions(&self) -> Vec<Arc<EventSubscriptionObject>> {
1400        self.subscriptions.lock().values().cloned().collect()
1401    }
1402
1403    /// Get list of current subscriber task IDs
1404    pub fn get_subscribers(&self) -> Vec<u32> {
1405        self.subscriptions
1406            .lock()
1407            .values()
1408            .map(|sub| sub.task_id())
1409            .collect()
1410    }
1411
1412    /// Send event to all subscribers of this channel
1413    pub fn broadcast_to_subscribers(&self, event: Event) -> Result<(), EventError> {
1414        let subscribers = self.get_subscribers();
1415        for task_id in subscribers {
1416            let _ = self.manager_ref.deliver_to_task(task_id, event.clone());
1417        }
1418        Ok(())
1419    }
1420
1421    /// Subscribe a task to this channel (legacy method for backward compatibility)
1422    pub fn subscribe(&self, task_id: u32) -> Result<Arc<EventSubscriptionObject>, EventError> {
1423        self.create_subscription(task_id)
1424    }
1425
1426    /// Unsubscribe a task from this channel (legacy method for backward compatibility)
1427    pub fn unsubscribe(&self, task_id: u32) -> Result<(), EventError> {
1428        let mut subscriptions = self.subscriptions.lock();
1429        subscriptions.retain(|_, sub| sub.task_id() != task_id);
1430        Ok(())
1431    }
1432
1433    /// Get a subscription by its ID
1434    pub fn get_subscription_by_id(
1435        &self,
1436        subscription_id: &str,
1437    ) -> Option<Arc<EventSubscriptionObject>> {
1438        self.subscriptions.lock().get(subscription_id).cloned()
1439    }
1440}
1441
1442/// EventSubscription implementation for KernelObject integration  
1443pub struct EventSubscriptionObject {
1444    subscription_id: String,
1445    channel_name: String,
1446    task_id: u32,
1447    /// Local registry of filters keyed by handler ID for this subscription
1448    filters: Mutex<HashMap<usize, EventFilter>>,
1449}
1450
1451impl EventSubscriptionObject {
1452    pub fn new(subscription_id: String, channel_name: String, task_id: u32) -> Self {
1453        Self {
1454            subscription_id,
1455            channel_name,
1456            task_id,
1457            filters: Mutex::new(HashMap::new()),
1458        }
1459    }
1460
1461    pub fn subscription_id(&self) -> &str {
1462        &self.subscription_id
1463    }
1464
1465    pub fn channel_name(&self) -> &str {
1466        &self.channel_name
1467    }
1468
1469    pub fn task_id(&self) -> u32 {
1470        self.task_id
1471    }
1472}
1473
1474impl crate::object::capability::EventSender for EventChannelObject {
1475    fn send_event(&self, event: Event) -> Result<(), &'static str> {
1476        self.manager_ref
1477            .send_event(event)
1478            .map_err(|_| "Failed to send event")?;
1479        Ok(())
1480    }
1481}
1482
1483impl crate::object::capability::EventReceiver for EventChannelObject {
1484    fn has_pending_events(&self) -> bool {
1485        // Check if any subscriber task has pending events for THIS channel specifically
1486        let subscriber_ids = self.get_subscribers();
1487        for tid in subscriber_ids {
1488            if let Some(task) =
1489                crate::sched::scheduler::get_scheduler().get_task_by_id(tid as usize)
1490            {
1491                let queue = task.event_queue.lock();
1492                for (_prio, q) in queue.events.iter() {
1493                    for ev in q.iter() {
1494                        if let EventDelivery::Channel { channel_id, .. } = &ev.delivery {
1495                            if channel_id == &self.name {
1496                                return true;
1497                            }
1498                        }
1499                    }
1500                }
1501            }
1502        }
1503        false
1504    }
1505}
1506
1507impl crate::object::capability::EventReceiver for EventSubscriptionObject {
1508    fn has_pending_events(&self) -> bool {
1509        // Only consider events delivered to this subscription's channel and matching its local filters
1510        if let Some(task) =
1511            crate::sched::scheduler::get_scheduler().get_task_by_id(self.task_id as usize)
1512        {
1513            let queue = task.event_queue.lock();
1514            let channel_name = self.channel_name.as_str();
1515            // Take a snapshot of local filters
1516            let local_filters: alloc::vec::Vec<EventFilter> = {
1517                let guard = self.filters.lock();
1518                guard.values().cloned().collect()
1519            };
1520            let use_filters = !local_filters.is_empty();
1521
1522            for (_prio, q) in queue.events.iter() {
1523                for ev in q.iter() {
1524                    if let EventDelivery::Channel { channel_id, .. } = &ev.delivery {
1525                        if channel_id == channel_name {
1526                            if use_filters {
1527                                if local_filters.iter().any(|f| f.matches(ev)) {
1528                                    return true;
1529                                }
1530                            } else {
1531                                return true; // No local filters => any event on this channel counts
1532                            }
1533                        }
1534                    }
1535                }
1536            }
1537        }
1538        false
1539    }
1540}
1541
1542impl crate::object::capability::EventSubscriber for EventSubscriptionObject {
1543    fn register_filter(&self, filter: EventFilter, handler_id: usize) -> Result<(), &'static str> {
1544        // Store locally only to avoid task-global filter pollution
1545        self.filters.lock().insert(handler_id, filter);
1546        Ok(())
1547    }
1548
1549    fn unregister_filter(&self, handler_id: usize) -> Result<(), &'static str> {
1550        self.filters.lock().remove(&handler_id);
1551        Ok(())
1552    }
1553
1554    fn get_filters(&self) -> Vec<(usize, EventFilter)> {
1555        // Return a snapshot of local filters
1556        self.filters
1557            .lock()
1558            .iter()
1559            .map(|(k, v)| (*k, v.clone()))
1560            .collect()
1561    }
1562}
1563
1564impl crate::object::capability::CloneOps for EventChannelObject {
1565    fn custom_clone(&self) -> crate::object::KernelObject {
1566        // Try to return the same Arc registered in EventManager
1567        let mgr = EventManager::get_manager();
1568        if let Some(arc) = mgr.channels.lock().get(self.name()).cloned() {
1569            crate::object::KernelObject::EventChannel(arc)
1570        } else {
1571            // Fallback: create or register via manager to ensure registry consistency
1572            let ko = mgr.create_channel(self.name.clone());
1573            ko
1574        }
1575    }
1576}
1577
1578impl crate::object::capability::CloneOps for EventSubscriptionObject {
1579    fn custom_clone(&self) -> crate::object::KernelObject {
1580        let mgr = EventManager::get_manager();
1581        // Resolve channel from manager and then subscription by id
1582        if let Some(ch) = mgr.channels.lock().get(self.channel_name()).cloned() {
1583            if let Some(sub) = ch.get_subscription_by_id(self.subscription_id()) {
1584                return crate::object::KernelObject::EventSubscription(sub);
1585            }
1586        }
1587        // Fallback: create a new subscription object (not ideal, but avoids panic)
1588        let fallback = alloc::sync::Arc::new(EventSubscriptionObject::new(
1589            self.subscription_id.clone(),
1590            self.channel_name.clone(),
1591            self.task_id,
1592        ));
1593        crate::object::KernelObject::EventSubscription(fallback)
1594    }
1595}
1596
1597/// Generate unique event ID
1598fn generate_event_id() -> u64 {
1599    static COUNTER: Mutex<u64> = Mutex::new(1);
1600    let mut counter = COUNTER.lock();
1601    let id = *counter;
1602    *counter += 1;
1603    id
1604}
1605
1606#[cfg(test)]
1607mod tests {
1608    use super::*;
1609    use crate::object::capability::EventSubscriber;
1610    use alloc::string::ToString; // bring trait into scope
1611
1612    #[test_case]
1613    fn test_event_creation() {
1614        let event = Event::new(
1615            EventDelivery::Direct {
1616                target: 123,
1617                priority: EventPriority::High,
1618                reliable: true,
1619            },
1620            EventContent::ProcessControl(ProcessControlType::Terminate),
1621            EventPayload::Empty,
1622        );
1623
1624        match event.delivery {
1625            EventDelivery::Direct {
1626                target,
1627                priority,
1628                reliable,
1629            } => {
1630                assert_eq!(target, 123);
1631                assert_eq!(priority, EventPriority::High);
1632                assert_eq!(reliable, true);
1633            }
1634            _ => panic!("Wrong delivery type"),
1635        }
1636
1637        match event.content {
1638            EventContent::ProcessControl(ProcessControlType::Terminate) => {}
1639            _ => panic!("Wrong content type"),
1640        }
1641
1642        assert_eq!(event.metadata.priority, EventPriority::High);
1643    }
1644
1645    #[test_case]
1646    fn test_event_convenience_functions() {
1647        // Test direct process control event
1648        let event = Event::direct_process_control(
1649            42,
1650            ProcessControlType::Kill,
1651            EventPriority::Critical,
1652            true,
1653        );
1654
1655        match event.delivery {
1656            EventDelivery::Direct {
1657                target,
1658                priority,
1659                reliable,
1660            } => {
1661                assert_eq!(target, 42);
1662                assert_eq!(priority, EventPriority::Critical);
1663                assert_eq!(reliable, true);
1664            }
1665            _ => panic!("Wrong delivery type"),
1666        }
1667
1668        match event.content {
1669            EventContent::ProcessControl(ProcessControlType::Kill) => {}
1670            _ => panic!("Wrong content type"),
1671        }
1672
1673        // Test immediate process control event
1674        let event = Event::immediate_process_control(99, ProcessControlType::Stop);
1675        match event.delivery {
1676            EventDelivery::Direct {
1677                target,
1678                priority,
1679                reliable,
1680            } => {
1681                assert_eq!(target, 99);
1682                assert_eq!(priority, EventPriority::High);
1683                assert_eq!(reliable, true);
1684            }
1685            _ => panic!("Wrong delivery type"),
1686        }
1687    }
1688
1689    #[test_case]
1690    fn test_event_channel_creation() {
1691        let event = Event::channel(
1692            "test_channel".to_string(),
1693            EventContent::Notification(NotificationType::TaskCompleted),
1694            true,
1695            EventPriority::Normal,
1696            EventPayload::Empty,
1697        );
1698
1699        match event.delivery {
1700            EventDelivery::Channel {
1701                channel_id,
1702                create_if_missing,
1703                priority,
1704            } => {
1705                assert_eq!(channel_id, "test_channel");
1706                assert_eq!(create_if_missing, true);
1707                assert_eq!(priority, EventPriority::Normal);
1708            }
1709            _ => panic!("Wrong delivery type"),
1710        }
1711
1712        match event.content {
1713            EventContent::Notification(NotificationType::TaskCompleted) => {}
1714            _ => panic!("Wrong content type"),
1715        }
1716    }
1717
1718    #[test_case]
1719    fn test_event_group_creation() {
1720        let event = Event::group(
1721            GroupTarget::AllTasks,
1722            EventContent::Message {
1723                message_type: 42,
1724                category: MessageCategory::Control,
1725            },
1726            EventPriority::Low,
1727            false,
1728            EventPayload::Empty,
1729        );
1730
1731        match event.delivery {
1732            EventDelivery::Group {
1733                group_target,
1734                priority,
1735                reliable,
1736            } => {
1737                assert_eq!(group_target, GroupTarget::AllTasks);
1738                assert_eq!(priority, EventPriority::Low);
1739                assert_eq!(reliable, false);
1740            }
1741            _ => panic!("Wrong delivery type"),
1742        }
1743
1744        match event.content {
1745            EventContent::Message {
1746                message_type,
1747                category,
1748            } => {
1749                assert_eq!(message_type, 42);
1750                assert_eq!(category, MessageCategory::Control);
1751            }
1752            _ => panic!("Wrong content type"),
1753        }
1754    }
1755
1756    #[test_case]
1757    fn test_event_broadcast_creation() {
1758        let event = Event::broadcast(
1759            EventContent::Custom {
1760                namespace: "test_namespace".to_string(),
1761                event_id: 100,
1762            },
1763            EventPriority::Normal,
1764            true,
1765            EventPayload::Bytes(alloc::vec![1, 2, 3, 4]),
1766        );
1767
1768        match event.delivery {
1769            EventDelivery::Broadcast { priority, reliable } => {
1770                assert_eq!(priority, EventPriority::Normal);
1771                assert_eq!(reliable, true);
1772            }
1773            _ => panic!("Wrong delivery type"),
1774        }
1775
1776        match event.content {
1777            EventContent::Custom {
1778                namespace,
1779                event_id,
1780            } => {
1781                assert_eq!(namespace, "test_namespace");
1782                assert_eq!(event_id, 100);
1783            }
1784            _ => panic!("Wrong content type"),
1785        }
1786
1787        match event.payload {
1788            EventPayload::Bytes(data) => {
1789                assert_eq!(data, alloc::vec![1, 2, 3, 4]);
1790            }
1791            _ => panic!("Wrong payload type"),
1792        }
1793    }
1794
1795    #[test_case]
1796    fn test_event_filter_event_type() {
1797        let filter = EventFilter::EventType(EventTypeFilter::AnyDirect);
1798
1799        let direct_event = Event::direct_process_control(
1800            123,
1801            ProcessControlType::Terminate,
1802            EventPriority::High,
1803            true,
1804        );
1805
1806        let channel_event = Event::channel(
1807            "test".to_string(),
1808            EventContent::ProcessControl(ProcessControlType::Terminate),
1809            false,
1810            EventPriority::High,
1811            EventPayload::Empty,
1812        );
1813
1814        assert_eq!(filter.matches(&direct_event), true);
1815        assert_eq!(filter.matches(&channel_event), false);
1816    }
1817
1818    #[test_case]
1819    fn test_event_filter_channel() {
1820        let filter = EventFilter::Channel("test_channel".to_string());
1821
1822        let matching_event = Event::channel(
1823            "test_channel".to_string(),
1824            EventContent::Notification(NotificationType::TaskCompleted),
1825            false,
1826            EventPriority::Normal,
1827            EventPayload::Empty,
1828        );
1829
1830        let non_matching_event = Event::channel(
1831            "other_channel".to_string(),
1832            EventContent::Notification(NotificationType::TaskCompleted),
1833            false,
1834            EventPriority::Normal,
1835            EventPayload::Empty,
1836        );
1837
1838        assert_eq!(filter.matches(&matching_event), true);
1839        assert_eq!(filter.matches(&non_matching_event), false);
1840    }
1841
1842    #[test_case]
1843    fn test_event_filter_sender() {
1844        let filter = EventFilter::Sender(42);
1845
1846        let mut matching_event =
1847            Event::immediate_process_control(123, ProcessControlType::Terminate);
1848        matching_event.metadata.sender = Some(42);
1849
1850        let mut non_matching_event =
1851            Event::immediate_process_control(123, ProcessControlType::Terminate);
1852        non_matching_event.metadata.sender = Some(99);
1853
1854        assert_eq!(filter.matches(&matching_event), true);
1855        assert_eq!(filter.matches(&non_matching_event), false);
1856    }
1857
1858    #[test_case]
1859    fn test_task_event_queue_basic() {
1860        let mut queue = TaskEventQueue::new();
1861
1862        assert_eq!(queue.is_empty(), true);
1863        assert_eq!(queue.len(), 0);
1864
1865        let event = Event::immediate_process_control(123, ProcessControlType::Terminate);
1866        assert_eq!(queue.enqueue(event.clone()), true);
1867
1868        assert_eq!(queue.is_empty(), false);
1869        assert_eq!(queue.len(), 1);
1870
1871        let dequeued = queue.dequeue();
1872        assert!(dequeued.is_some());
1873
1874        assert_eq!(queue.is_empty(), true);
1875        assert_eq!(queue.len(), 0);
1876    }
1877
1878    #[test_case]
1879    fn test_task_event_queue_priority_ordering() {
1880        let mut queue = TaskEventQueue::new();
1881
1882        // Add events in non-priority order
1883        let low_event =
1884            Event::direct_process_control(1, ProcessControlType::Stop, EventPriority::Low, true);
1885        let critical_event = Event::direct_process_control(
1886            2,
1887            ProcessControlType::Kill,
1888            EventPriority::Critical,
1889            true,
1890        );
1891        let high_event = Event::direct_process_control(
1892            3,
1893            ProcessControlType::Terminate,
1894            EventPriority::High,
1895            true,
1896        );
1897        let normal_event = Event::direct_process_control(
1898            4,
1899            ProcessControlType::Continue,
1900            EventPriority::Normal,
1901            true,
1902        );
1903
1904        queue.enqueue(low_event);
1905        queue.enqueue(critical_event);
1906        queue.enqueue(high_event);
1907        queue.enqueue(normal_event);
1908
1909        assert_eq!(queue.len(), 4);
1910
1911        // Should dequeue in priority order: Critical -> High -> Normal -> Low
1912        let first = queue.dequeue().unwrap();
1913        assert_eq!(first.metadata.priority, EventPriority::Critical);
1914
1915        let second = queue.dequeue().unwrap();
1916        assert_eq!(second.metadata.priority, EventPriority::High);
1917
1918        let third = queue.dequeue().unwrap();
1919        assert_eq!(third.metadata.priority, EventPriority::Normal);
1920
1921        let fourth = queue.dequeue().unwrap();
1922        assert_eq!(fourth.metadata.priority, EventPriority::Low);
1923
1924        assert_eq!(queue.len(), 0);
1925    }
1926
1927    #[test_case]
1928    fn test_event_manager_creation() {
1929        let manager = EventManager::new();
1930        assert!(manager.channels.lock().is_empty());
1931    }
1932
1933    #[test_case]
1934    fn test_process_control_type_variants_full() {
1935        // Test all ProcessControlType variants
1936        let variants = [
1937            ProcessControlType::Terminate,
1938            ProcessControlType::Kill,
1939            ProcessControlType::Stop,
1940            ProcessControlType::Continue,
1941            ProcessControlType::Interrupt,
1942            ProcessControlType::Quit,
1943            ProcessControlType::Hangup,
1944            ProcessControlType::ChildExit,
1945            ProcessControlType::User(0),
1946        ];
1947
1948        for &variant in &variants {
1949            let event = Event::immediate_process_control(123, variant);
1950            match event.content {
1951                EventContent::ProcessControl(received_variant) => {
1952                    assert_eq!(received_variant, variant);
1953                }
1954                _ => panic!("Wrong content type for variant {:?}", variant),
1955            }
1956        }
1957    }
1958
1959    #[test_case]
1960    fn test_notification_type_variants() {
1961        // Test all NotificationType variants
1962        let variants = [
1963            NotificationType::TaskCompleted,
1964            NotificationType::MemoryLow,
1965            NotificationType::DeviceConnected,
1966            NotificationType::DeviceDisconnected,
1967            NotificationType::FilesystemFull,
1968            NotificationType::NetworkChange,
1969        ];
1970
1971        for &variant in &variants {
1972            let event = Event::notification_to_task(123, variant);
1973            match event.content {
1974                EventContent::Notification(received_variant) => {
1975                    assert_eq!(received_variant, variant);
1976                }
1977                _ => panic!("Wrong content type for variant {:?}", variant),
1978            }
1979        }
1980    }
1981
1982    #[test_case]
1983    fn test_event_payload_variants() {
1984        // Test Empty payload
1985        let empty_event = Event::immediate_process_control(123, ProcessControlType::Terminate);
1986        match empty_event.payload {
1987            EventPayload::Empty => {}
1988            _ => panic!("Expected Empty payload"),
1989        }
1990
1991        // Test Integer payload
1992        let integer_event = Event::broadcast(
1993            EventContent::Custom {
1994                namespace: "test".to_string(),
1995                event_id: 42,
1996            },
1997            EventPriority::Normal,
1998            false,
1999            EventPayload::Integer(12345),
2000        );
2001        match integer_event.payload {
2002            EventPayload::Integer(value) => {
2003                assert_eq!(value, 12345);
2004            }
2005            _ => panic!("Expected Integer payload"),
2006        }
2007
2008        // Test Bytes payload
2009        let data = alloc::vec![1, 2, 3, 4, 5];
2010        let bytes_event = Event::broadcast(
2011            EventContent::Custom {
2012                namespace: "test".to_string(),
2013                event_id: 43,
2014            },
2015            EventPriority::Normal,
2016            false,
2017            EventPayload::Bytes(data.clone()),
2018        );
2019        match bytes_event.payload {
2020            EventPayload::Bytes(received_data) => {
2021                assert_eq!(received_data, data);
2022            }
2023            _ => panic!("Expected Bytes payload"),
2024        }
2025
2026        // Test String payload
2027        let text = "Hello, World!".to_string();
2028        let string_event = Event::broadcast(
2029            EventContent::Custom {
2030                namespace: "test".to_string(),
2031                event_id: 44,
2032            },
2033            EventPriority::Normal,
2034            false,
2035            EventPayload::String(text.clone()),
2036        );
2037        match string_event.payload {
2038            EventPayload::String(received_text) => {
2039                assert_eq!(received_text, text);
2040            }
2041            _ => panic!("Expected String payload"),
2042        }
2043    }
2044
2045    #[test_case]
2046    fn test_process_control_type_variants() {
2047        // Test all ProcessControlType variants
2048        let variants = [
2049            ProcessControlType::Kill,
2050            ProcessControlType::Terminate,
2051            ProcessControlType::Stop,
2052            ProcessControlType::Continue,
2053            ProcessControlType::Interrupt,
2054            ProcessControlType::Quit,
2055            ProcessControlType::Hangup,
2056            ProcessControlType::User(0),
2057            ProcessControlType::PipeBroken,
2058            ProcessControlType::Alarm,
2059            ProcessControlType::ChildExit,
2060            ProcessControlType::IoReady,
2061        ];
2062
2063        for &variant in &variants {
2064            let event = Event::immediate_process_control(123, variant);
2065            match event.content {
2066                EventContent::ProcessControl(received_variant) => {
2067                    assert_eq!(received_variant, variant);
2068                }
2069                _ => panic!("Wrong content type for variant {:?}", variant),
2070            }
2071        }
2072    }
2073
2074    #[test_case]
2075    fn test_notification_type_variants_notification() {
2076        // Test all NotificationType variants
2077        let variants = [
2078            NotificationType::TaskCompleted,
2079            NotificationType::MemoryLow,
2080            NotificationType::DeviceConnected,
2081            NotificationType::DeviceDisconnected,
2082            NotificationType::FilesystemFull,
2083            NotificationType::NetworkChange,
2084        ];
2085
2086        for &variant in &variants {
2087            let event = Event::notification_to_task(123, variant);
2088            match event.content {
2089                EventContent::Notification(received_variant) => {
2090                    assert_eq!(received_variant, variant);
2091                }
2092                _ => panic!("Wrong content type for variant {:?}", variant),
2093            }
2094        }
2095    }
2096
2097    #[test_case]
2098    fn test_event_payload_variants_extended() {
2099        // Test Empty payload
2100        let empty_event = Event::immediate_process_control(123, ProcessControlType::Terminate);
2101        match empty_event.payload {
2102            EventPayload::Empty => {}
2103            _ => panic!("Expected Empty payload"),
2104        }
2105
2106        // Test Integer payload
2107        let integer_event = Event::broadcast(
2108            EventContent::Custom {
2109                namespace: "test".to_string(),
2110                event_id: 42,
2111            },
2112            EventPriority::Normal,
2113            false,
2114            EventPayload::Integer(12345),
2115        );
2116        match integer_event.payload {
2117            EventPayload::Integer(value) => {
2118                assert_eq!(value, 12345);
2119            }
2120            _ => panic!("Expected Integer payload"),
2121        }
2122
2123        // Test Bytes payload
2124        let data = alloc::vec![1, 2, 3, 4, 5];
2125        let bytes_event = Event::broadcast(
2126            EventContent::Custom {
2127                namespace: "test".to_string(),
2128                event_id: 43,
2129            },
2130            EventPriority::Normal,
2131            false,
2132            EventPayload::Bytes(data.clone()),
2133        );
2134        match bytes_event.payload {
2135            EventPayload::Bytes(received_data) => {
2136                assert_eq!(received_data, data);
2137            }
2138            _ => panic!("Expected Bytes payload"),
2139        }
2140
2141        // Test String payload
2142        let text = "Hello, World!".to_string();
2143        let string_event = Event::broadcast(
2144            EventContent::Custom {
2145                namespace: "test".to_string(),
2146                event_id: 44,
2147            },
2148            EventPriority::Normal,
2149            false,
2150            EventPayload::String(text.clone()),
2151        );
2152        match string_event.payload {
2153            EventPayload::String(received_text) => {
2154                assert_eq!(received_text, text);
2155            }
2156            _ => panic!("Expected String payload"),
2157        }
2158    }
2159
2160    #[test_case]
2161    fn test_channel_subscription_management() {
2162        let manager = EventManager::get_manager();
2163        // Create channel via manager
2164        let ch = manager.create_channel("sub_test".to_string());
2165        let channel = match ch {
2166            crate::object::KernelObject::EventChannel(arc) => arc,
2167            _ => panic!("Expected EventChannel"),
2168        };
2169
2170        // Initially no subscribers
2171        assert_eq!(channel.get_subscribers().len(), 0);
2172
2173        // Subscribe task 1 and 2
2174        let _s1 = channel.subscribe(1).expect("subscribe(1)");
2175        let _s2 = channel.subscribe(2).expect("subscribe(2)");
2176        let mut subs = channel.get_subscribers();
2177        subs.sort();
2178        assert_eq!(subs, alloc::vec![1, 2]);
2179
2180        // Unsubscribe task 1
2181        channel.unsubscribe(1).expect("unsubscribe(1)");
2182        let subs = channel.get_subscribers();
2183        assert_eq!(subs, alloc::vec![2]);
2184
2185        // Remove remaining via remove_subscription using id
2186        let ids: alloc::vec::Vec<String> = channel
2187            .get_subscriptions()
2188            .into_iter()
2189            .map(|s| s.subscription_id().to_string())
2190            .collect();
2191        for id in ids {
2192            channel.remove_subscription(&id).unwrap();
2193        }
2194        assert_eq!(channel.get_subscribers().len(), 0);
2195    }
2196
2197    #[test_case]
2198    fn test_event_manager_subscription_creation() {
2199        let manager = EventManager::get_manager();
2200        // Create subscription through manager, it should create channel if missing
2201        let ko = manager
2202            .create_subscription("mgr_sub_test".to_string(), 42)
2203            .expect("create_subscription");
2204        let sub = match ko {
2205            crate::object::KernelObject::EventSubscription(arc) => arc,
2206            _ => panic!("Expected EventSubscription"),
2207        };
2208        assert_eq!(sub.channel_name(), "mgr_sub_test");
2209        assert_eq!(sub.task_id(), 42);
2210
2211        // Ensure channel was registered and contains the subscriber
2212        let channels = manager.channels.lock();
2213        let ch = channels
2214            .get("mgr_sub_test")
2215            .expect("channel exists")
2216            .clone();
2217        drop(channels);
2218        let subs = ch.get_subscribers();
2219        assert_eq!(subs, alloc::vec![42]);
2220    }
2221
2222    #[test_case]
2223    fn test_channel_broadcast_no_error() {
2224        let manager = EventManager::get_manager();
2225        let ch = manager.create_channel("bc_test".to_string());
2226        let channel = match ch {
2227            crate::object::KernelObject::EventChannel(arc) => arc,
2228            _ => panic!("Expected EventChannel"),
2229        };
2230        let _ = channel.subscribe(100).unwrap();
2231        let _ = channel.subscribe(200).unwrap();
2232
2233        // Since in cfg(test) deliver_to_task is a stub that returns Ok, we only
2234        // verify that broadcast completes without error when subscribers exist.
2235        let ev = Event::new_channel_event(
2236            "bc_test",
2237            EventContent::Notification(NotificationType::TaskCompleted),
2238            EventPayload::Empty,
2239        );
2240        channel.broadcast_to_subscribers(ev).expect("broadcast ok");
2241    }
2242
2243    #[test_case]
2244    fn test_subscription_filter_registration() {
2245        let manager = EventManager::get_manager();
2246        let ch = manager.create_channel("filter_test".to_string());
2247        let channel = match ch {
2248            crate::object::KernelObject::EventChannel(arc) => arc,
2249            _ => panic!("Expected EventChannel"),
2250        };
2251        let sub = channel.subscribe(7).expect("subscribe");
2252
2253        // Register two filters (local only now)
2254        sub.register_filter(EventFilter::All, 1).expect("reg1");
2255        sub.register_filter(EventFilter::Sender(7), 2)
2256            .expect("reg2");
2257        let filters = sub.get_filters();
2258        assert_eq!(filters.len(), 2);
2259
2260        // Unregister one
2261        sub.unregister_filter(1).expect("unreg1");
2262        let filters = sub.get_filters();
2263        assert_eq!(filters.len(), 1);
2264
2265        // Manager's global filters should not be polluted by subscription-local filters
2266        let globals = manager.get_filters_for_task(7);
2267        assert!(globals.is_empty());
2268    }
2269
2270    #[test_case]
2271    fn test_clone_returns_same_arc_objects() {
2272        use crate::object::capability::CloneOps;
2273        let mgr = EventManager::get_manager();
2274        let ch = match mgr.create_channel("clone_arc".to_string()) {
2275            crate::object::KernelObject::EventChannel(arc) => arc,
2276            _ => panic!("Expected channel"),
2277        };
2278        let sub = ch.subscribe(1234).expect("subscribe");
2279
2280        // Channel clone should resolve to the same Arc in manager registry
2281        let ch_clone = match CloneOps::custom_clone(&*ch) {
2282            crate::object::KernelObject::EventChannel(arc) => arc,
2283            _ => panic!("Expected channel"),
2284        };
2285        assert!(alloc::sync::Arc::ptr_eq(&ch, &ch_clone));
2286
2287        // Subscription clone should resolve to the same Arc if still registered
2288        let sub_clone = match CloneOps::custom_clone(&*sub) {
2289            crate::object::KernelObject::EventSubscription(arc) => arc,
2290            _ => panic!("Expected sub"),
2291        };
2292        assert!(alloc::sync::Arc::ptr_eq(&sub, &sub_clone));
2293    }
2294
2295    #[test_case]
2296    fn test_group_session_and_named_delivery_no_error() {
2297        let mgr = EventManager::get_manager();
2298        // Prepare tasks and register into scheduler
2299        for i in 0..2 {
2300            let task =
2301                crate::task::Task::new(format!("g_sess_{}", i), 1, crate::task::TaskType::Kernel);
2302            crate::sched::scheduler::get_scheduler().add_task(task, 0);
2303        }
2304
2305        // For tests, get_current_task_id() returns Some(1), so join operations will add task 1
2306        mgr.join_session(77).expect("join session");
2307        mgr.join_named_group("teamA".to_string())
2308            .expect("join named");
2309
2310        let ev_sess = Event::group(
2311            GroupTarget::Session(77),
2312            EventContent::Notification(NotificationType::DeviceConnected),
2313            EventPriority::Normal,
2314            false,
2315            EventPayload::Empty,
2316        );
2317        let ev_named = Event::group(
2318            GroupTarget::Custom("teamA".to_string()),
2319            EventContent::Notification(NotificationType::DeviceDisconnected),
2320            EventPriority::Normal,
2321            false,
2322            EventPayload::Empty,
2323        );
2324        assert!(mgr.send_event(ev_sess).is_ok());
2325        assert!(mgr.send_event(ev_named).is_ok());
2326
2327        mgr.leave_session(77).ok();
2328        mgr.leave_named_group("teamA").ok();
2329    }
2330}