1use alloc::collections::BTreeMap;
11use alloc::{collections::VecDeque, format, string::String, sync::Arc, vec::Vec};
12use hashbrown::HashMap;
13use spin::Mutex;
14
/// Identifier of a task that can send or receive events.
pub type TaskId = u32;
/// Identifier of a numeric task group (see [`GroupTarget::TaskGroup`]).
pub type GroupId = u32;
/// Identifier of a session (see [`GroupTarget::Session`]).
pub type SessionId = u32;
21
/// A single event: where it goes, what it is, what data it carries, and
/// bookkeeping metadata.
#[derive(Debug, Clone)]
pub struct Event {
    /// How and to whom the event is delivered.
    pub delivery: EventDelivery,

    /// What kind of event this is.
    pub content: EventContent,

    /// Data attached to the event.
    pub payload: EventPayload,

    /// Sender, priority, timestamp and event id.
    pub metadata: EventMetadata,
}
45
/// Routing information for an [`Event`]: who receives it and with which priority.
#[derive(Debug, Clone)]
pub enum EventDelivery {
    /// Deliver to exactly one task.
    Direct {
        /// Receiving task.
        target: TaskId,
        priority: EventPriority,
        /// When `true`, a failed delivery is retried by `EventManager::deliver_direct`.
        reliable: bool,
    },

    /// Deliver to every subscriber of a named channel.
    Channel {
        /// Name of the channel.
        channel_id: String,
        /// When `true`, sending to an unknown channel creates it instead of
        /// failing with `EventError::ChannelNotFound`.
        create_if_missing: bool,
        priority: EventPriority,
    },

    /// Deliver to every member of a group.
    Group {
        /// Which group to address.
        group_target: GroupTarget,
        priority: EventPriority,
        /// NOTE(review): the group delivery path in this file does not act on this flag.
        reliable: bool,
    },

    /// Deliver to every task id reported by the scheduler.
    Broadcast {
        priority: EventPriority,
        /// NOTE(review): the broadcast delivery path in this file does not act on this flag.
        reliable: bool,
    },
}
78
/// Describes what an [`Event`] is about.
#[derive(Debug, Clone)]
pub enum EventContent {
    /// A process-control request such as terminate or stop.
    ProcessControl(ProcessControlType),

    /// A typed message with a category.
    Message {
        /// Caller-defined message type number.
        message_type: u32,
        category: MessageCategory,
    },

    /// A system notification.
    Notification(NotificationType),

    /// A caller-defined event identified by namespace and numeric id.
    Custom {
        namespace: String, event_id: u32,
    },
}
102
/// Kinds of process-control events.
///
/// NOTE(review): the variant names resemble POSIX signals; any mapping to
/// signal numbers is not defined here — confirm against the signal layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessControlType {
    // `User(u32)` carries a caller-defined code.
    Terminate, Kill, Stop, Continue, Interrupt, Quit, Hangup, ChildExit, PipeBroken, Alarm, IoReady, User(u32), }
123
/// Category attached to an [`EventContent::Message`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageCategory {
    // `Custom(u8)` carries a caller-defined category code.
    Control, Data, Status, Error, Custom(u8), }
133
/// Kinds of system notification carried by [`EventContent::Notification`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NotificationType {
    TaskCompleted,
    MemoryLow,
    DeviceConnected,
    DeviceDisconnected,
    FilesystemFull,
    NetworkChange,
    SystemShutdown,
    }
146
/// Selects the set of tasks addressed by an [`EventDelivery::Group`] event.
#[derive(Debug, Clone, PartialEq)]
pub enum GroupTarget {
    /// Members of a numeric group (joined via `EventManager::join_group`).
    TaskGroup(GroupId),

    /// Every task id reported by the scheduler.
    AllTasks,

    /// Members of a session (joined via `EventManager::join_session`).
    Session(SessionId),

    /// Members of a named group (joined via `EventManager::join_named_group`).
    Custom(String),
}
/// Data carried by an [`Event`].
#[derive(Debug, Clone)]
pub enum EventPayload {
    /// No data.
    Empty,

    /// A signed 64-bit integer.
    Integer(i64),

    /// Raw bytes.
    Bytes(Vec<u8>),

    /// Text.
    String(String),

    /// Raw bytes in a caller-defined format.
    // NOTE(review): how this differs from `Bytes` is not visible in this file.
    Custom(Vec<u8>),
}
180
/// Bookkeeping attached to every [`Event`].
#[derive(Debug, Clone)]
pub struct EventMetadata {
    /// Sending task; `EventManager::send_event` fills in the current task when `None`.
    pub sender: Option<u32>,

    /// Priority used to order the event in a [`TaskEventQueue`].
    pub priority: EventPriority,

    /// Overwritten by `EventManager::send_event` with `crate::timer::get_tick()`.
    pub timestamp: u64,

    /// Id obtained from `generate_event_id()` when the metadata is constructed.
    pub event_id: u64,
}
196
197impl EventMetadata {
198 pub fn new() -> Self {
200 Self {
201 sender: None, priority: EventPriority::Normal,
203 timestamp: 0, event_id: generate_event_id(),
205 }
206 }
207
208 pub fn with_priority(priority: EventPriority) -> Self {
210 Self {
211 sender: None, priority,
213 timestamp: 0, event_id: generate_event_id(),
215 }
216 }
217}
218
/// Delivery priority of an event.
///
/// The derived ordering follows the discriminants, so `Critical` compares
/// greatest; [`TaskEventQueue::dequeue`] serves the greatest priority first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum EventPriority {
    Low = 1,
    Normal = 2,
    High = 3,
    Critical = 4,
}
227
/// Predicate deciding whether a task wants to receive a given [`Event`].
#[derive(Debug, Clone)]
pub enum EventFilter {
    /// Accepts every event.
    All,

    /// Accepts events according to their delivery kind (see [`EventTypeFilter`]).
    EventType(EventTypeFilter),

    /// Accepts events whose `metadata.event_id` equals this value.
    EventId(u32),

    /// Accepts channel events sent on the named channel.
    Channel(String),

    /// Accepts events whose `metadata.sender` is this task id.
    Sender(u32),

    /// Accepts events for which the function returns `true`.
    Custom(fn(&Event) -> bool),
}
249
250impl EventFilter {
251 pub fn matches(&self, event: &Event) -> bool {
253 match self {
254 EventFilter::All => true,
255
256 EventFilter::EventType(type_filter) => {
257 match type_filter {
258 EventTypeFilter::AnyDirect => {
259 matches!(event.delivery, EventDelivery::Direct { .. })
260 }
261 EventTypeFilter::AnyChannel => {
262 matches!(event.delivery, EventDelivery::Channel { .. })
263 }
264 EventTypeFilter::AnyGroup => {
265 matches!(event.delivery, EventDelivery::Group { .. })
266 }
267 EventTypeFilter::AnyBroadcast => {
268 matches!(event.delivery, EventDelivery::Broadcast { .. })
269 }
270
271 EventTypeFilter::Direct(content_id) => {
272 if let EventDelivery::Direct { .. } = &event.delivery {
273 match &event.content {
275 EventContent::ProcessControl(ptype) => {
276 let type_id = match ptype {
278 ProcessControlType::Terminate => 1,
279 ProcessControlType::Kill => 2,
280 ProcessControlType::Stop => 3,
281 ProcessControlType::Continue => 4,
282 ProcessControlType::Interrupt => 7,
283 _ => 0,
284 };
285 type_id == *content_id
286 }
287 EventContent::Custom { event_id, .. } => *event_id == *content_id,
288 _ => false,
289 }
290 } else {
291 false
292 }
293 }
294
295 EventTypeFilter::Channel(channel_name) => {
296 if let EventDelivery::Channel { channel_id, .. } = &event.delivery {
297 channel_id == channel_name
298 } else {
299 false
300 }
301 }
302
303 EventTypeFilter::Group(group_id) => {
304 if let EventDelivery::Group {
305 group_target: GroupTarget::TaskGroup(id),
306 ..
307 } = &event.delivery
308 {
309 id == group_id
310 } else {
311 false
312 }
313 }
314
315 EventTypeFilter::Broadcast(event_id) => {
316 if let EventDelivery::Broadcast { .. } = &event.delivery {
317 match &event.content {
319 EventContent::Custom { event_id: id, .. } => id == event_id,
320 _ => event.metadata.event_id == *event_id as u64,
321 }
322 } else {
323 false
324 }
325 }
326 }
327 }
328
329 EventFilter::EventId(event_id) => {
330 event.metadata.event_id == *event_id as u64
332 }
333
334 EventFilter::Channel(channel_name) => {
335 if let EventDelivery::Channel { channel_id, .. } = &event.delivery {
336 channel_id == channel_name
337 } else {
338 false
339 }
340 }
341
342 EventFilter::Sender(sender_id) => event.metadata.sender == Some(*sender_id),
343
344 EventFilter::Custom(filter_fn) => filter_fn(event),
345 }
346 }
347}
348
/// Delivery-kind criteria used by [`EventFilter::EventType`].
#[derive(Debug, Clone)]
pub enum EventTypeFilter {
    /// Any directly delivered event.
    AnyDirect,

    /// Any channel event.
    AnyChannel,

    /// Any group event.
    AnyGroup,

    /// Any broadcast event.
    AnyBroadcast,

    /// A direct event whose content id equals this value: the custom
    /// `event_id`, or the numeric id `EventFilter::matches` assigns to a
    /// process-control kind.
    Direct(u32),

    /// A channel event on the named channel.
    Channel(String),

    /// A group event addressed to this numeric task group.
    Group(GroupId),

    /// A broadcast event whose custom `event_id` (or, for non-custom content,
    /// `metadata.event_id`) equals this value.
    Broadcast(u32),
}
376
/// Per-task delivery settings stored by `EventManager::configure_delivery`.
#[derive(Debug, Clone)]
pub struct DeliveryConfig {
    /// Maximum number of queued events; `deliver_to_task` returns
    /// `EventError::BufferFull` once the receiver's queue reaches this length.
    pub buffer_size: usize,

    /// Delivery timeout in milliseconds.
    // NOTE(review): not consulted by any code visible in this file.
    pub timeout_ms: Option<u64>,

    /// Number of extra attempts `deliver_direct` makes for a reliable event.
    pub retry_count: u32,

    /// What to do when a delivery ultimately fails.
    pub failure_policy: FailurePolicy,
}
392
393impl Default for DeliveryConfig {
394 fn default() -> Self {
395 Self {
396 buffer_size: 1024,
397 timeout_ms: Some(5000),
398 retry_count: 3,
399 failure_policy: FailurePolicy::Log,
400 }
401 }
402}
403
/// Action taken by `EventManager::handle_delivery_failure`.
#[derive(Debug, Clone)]
pub enum FailurePolicy {
    /// Do nothing.
    Ignore,

    /// Print a diagnostic line.
    Log,

    /// Send a low-priority custom event describing the failure back to the
    /// sender (or print a diagnostic when there is no sender).
    NotifySender,

    /// Currently only prints a diagnostic line.
    SystemEvent,
}
419
/// Errors reported by the event subsystem.
#[derive(Debug, Clone)]
pub enum EventError {
    /// The scheduler has no task with the requested id.
    TargetNotFound,

    PermissionDenied,

    DeliveryFailed,

    /// The receiver's queue already holds `DeliveryConfig::buffer_size` events.
    BufferFull,

    Timeout,

    InvalidConfiguration,

    /// No channel with the requested name is registered.
    ChannelNotFound,

    /// No group, session or named group with the requested key is registered.
    GroupNotFound,

    /// Any other failure, with a description (e.g. "No current task").
    Other(String),
}
450
/// Subsystem-wide limits and defaults.
// NOTE(review): no code visible in this file reads this struct — confirm its users.
#[derive(Debug, Clone)]
pub struct EventConfig {
    /// Default per-task buffer size.
    pub default_buffer_size: usize,

    /// Default timeout in milliseconds.
    pub default_timeout_ms: u64,

    /// Maximum number of channels.
    pub max_channels: usize,

    /// Maximum number of groups.
    pub max_groups: usize,
}
466
467impl Default for EventConfig {
468 fn default() -> Self {
469 Self {
470 default_buffer_size: 64,
471 default_timeout_ms: 1000,
472 max_channels: 1024,
473 max_groups: 256,
474 }
475 }
476}
477
/// Per-task queue of pending events, bucketed by priority.
#[derive(Debug, Clone)]
pub struct TaskEventQueue {
    /// One FIFO per priority; buckets are removed when they become empty.
    events: BTreeMap<EventPriority, VecDeque<Event>>,
    /// Total number of events across all buckets.
    total_count: usize,
}
486
487impl TaskEventQueue {
488 pub fn new() -> Self {
489 Self {
490 events: BTreeMap::new(),
491 total_count: 0,
492 }
493 }
494
495 fn enqueue(&mut self, event: Event) -> bool {
497 let was_empty = self.total_count == 0;
498 let priority = event.metadata.priority;
499
500 self.events
501 .entry(priority)
502 .or_insert_with(VecDeque::new)
503 .push_back(event);
504 self.total_count += 1;
505
506 was_empty
507 }
508
509 pub fn dequeue(&mut self) -> Option<Event> {
511 let priority_to_dequeue = {
514 self.events
515 .iter()
516 .filter(|(_, queue)| !queue.is_empty())
517 .map(|(&priority, _)| priority)
518 .max()
519 }?;
520
521 if let Some(queue) = self.events.get_mut(&priority_to_dequeue) {
523 if let Some(event) = queue.pop_front() {
524 self.total_count -= 1;
525 if queue.is_empty() {
526 self.events.remove(&priority_to_dequeue);
527 }
528 return Some(event);
529 }
530 }
531
532 None
533 }
534
535 pub fn is_empty(&self) -> bool {
537 self.total_count == 0
538 }
539
540 pub fn len(&self) -> usize {
542 self.total_count
543 }
544}
545
/// Central registry and router for events.
///
/// Every table sits behind its own spin lock.
pub struct EventManager {
    /// Members of each numeric task group.
    groups: Mutex<HashMap<GroupId, Vec<u32>>>,
    /// Members of each session.
    sessions: Mutex<HashMap<SessionId, Vec<u32>>>,
    /// Members of each named group.
    named_groups: Mutex<HashMap<String, Vec<u32>>>,

    /// Per-task delivery configuration.
    configs: Mutex<HashMap<u32, DeliveryConfig>>,

    /// Per-task list of `(handler_id, filter)` pairs.
    task_filters: Mutex<HashMap<u32, Vec<(usize, EventFilter)>>>,

    /// Event id counter; initialised to 1 and not read by this file.
    #[allow(dead_code)]
    next_event_id: Mutex<u64>,

    /// Registered channels by name.
    channels: Mutex<HashMap<String, Arc<EventChannelObject>>>,
}
568
569impl EventManager {
570 pub fn new() -> Self {
572 Self {
573 groups: Mutex::new(HashMap::new()),
574 sessions: Mutex::new(HashMap::new()),
575 named_groups: Mutex::new(HashMap::new()),
576 configs: Mutex::new(HashMap::new()),
577 task_filters: Mutex::new(HashMap::new()),
578 next_event_id: Mutex::new(1),
579 channels: Mutex::new(HashMap::new()),
580 }
581 }
582
583 pub fn get_manager() -> &'static EventManager {
585 static INSTANCE: spin::once::Once<EventManager> = spin::once::Once::new();
586 INSTANCE.call_once(|| EventManager::new())
587 }
588
589 fn get_current_task_id(&self) -> Option<u32> {
591 #[cfg(test)]
592 {
593 return Some(1);
595 }
596 #[cfg(not(test))]
597 {
598 let cpu = crate::arch::get_cpu();
599 let cpu_id = cpu.get_cpuid();
600 let sched = crate::sched::scheduler::get_scheduler();
601 if let Some(task) = sched.get_current_task(cpu_id) {
602 return Some(task.get_id() as u32);
603 }
604 None
605 }
606 }
607
608 pub fn create_channel(&self, name: String) -> crate::object::KernelObject {
613 let mut channels = self.channels.lock();
614
615 let channel = channels
616 .entry(name.clone())
617 .or_insert_with(|| Arc::new(EventChannelObject::new(name.clone())))
618 .clone();
619
620 crate::object::KernelObject::EventChannel(channel)
621 }
622
623 pub fn create_subscription(
628 &self,
629 channel_name: String,
630 task_id: u32,
631 ) -> Result<crate::object::KernelObject, EventError> {
632 let mut channels = self.channels.lock();
634 let channel = channels
635 .entry(channel_name.clone())
636 .or_insert_with(|| Arc::new(EventChannelObject::new(channel_name.clone())))
637 .clone();
638 drop(channels);
639
640 let subscription = channel.create_subscription(task_id)?;
642 Ok(crate::object::KernelObject::EventSubscription(subscription))
643 }
644
645 pub fn send_event(&self, mut event: Event) -> Result<(), EventError> {
647 if event.metadata.sender.is_none() {
649 event.metadata.sender = self.get_current_task_id();
650 }
651 event.metadata.timestamp = crate::timer::get_tick();
653
654 match event.delivery.clone() {
655 EventDelivery::Direct {
656 target,
657 priority,
658 reliable,
659 } => self.deliver_direct(event, target, priority, reliable),
660 EventDelivery::Channel {
661 channel_id,
662 create_if_missing,
663 priority,
664 } => self.deliver_to_channel(event, &channel_id, create_if_missing, priority),
665 EventDelivery::Group {
666 group_target,
667 priority,
668 reliable,
669 } => self.deliver_to_group(event, &group_target, priority, reliable),
670 EventDelivery::Broadcast { priority, reliable } => {
671 self.deliver_broadcast(event, priority, reliable)
672 }
673 }
674 }
675
676 pub fn register_filter(&self, task_id: u32, filter: EventFilter) -> Result<(), EventError> {
678 let mut task_filters = self.task_filters.lock();
679 let filters = task_filters.entry(task_id).or_insert_with(Vec::new);
680 let handler_id = filters.len();
682 filters.push((handler_id, filter));
683 Ok(())
684 }
685
686 pub fn register_filter_with_id(
688 &self,
689 task_id: u32,
690 handler_id: usize,
691 filter: EventFilter,
692 ) -> Result<(), EventError> {
693 let mut task_filters = self.task_filters.lock();
694 let filters = task_filters.entry(task_id).or_insert_with(Vec::new);
695 if let Some(slot) = filters.iter_mut().find(|(hid, _)| *hid == handler_id) {
697 *slot = (handler_id, filter);
698 } else {
699 filters.push((handler_id, filter));
700 }
701 Ok(())
702 }
703
704 pub fn unregister_filter_by_id(
706 &self,
707 task_id: u32,
708 handler_id: usize,
709 ) -> Result<(), EventError> {
710 let mut task_filters = self.task_filters.lock();
711 if let Some(filters) = task_filters.get_mut(&task_id) {
712 filters.retain(|(hid, _)| *hid != handler_id);
713 }
714 Ok(())
715 }
716
717 pub fn get_filters_for_task(&self, task_id: u32) -> Vec<(usize, EventFilter)> {
719 let task_filters = self.task_filters.lock();
720 task_filters.get(&task_id).cloned().unwrap_or_default()
721 }
722
723 pub fn clear_filters(&self, task_id: u32) -> Result<(), EventError> {
725 let mut task_filters = self.task_filters.lock();
726 task_filters.remove(&task_id);
727 Ok(())
728 }
729
730 pub fn subscribe_channel(&self, channel: &str) -> Result<(), EventError> {
732 let current_task_id = self
734 .get_current_task_id()
735 .ok_or_else(|| EventError::Other("No current task".into()))?;
736
737 let mut channels = self.channels.lock();
739 let channel_obj = channels
740 .entry(channel.into())
741 .or_insert_with(|| Arc::new(EventChannelObject::new(channel.into())))
742 .clone();
743 drop(channels);
744
745 let _ = channel_obj.subscribe(current_task_id)?;
747 Ok(())
748 }
749
750 pub fn unsubscribe_channel(&self, channel: &str) -> Result<(), EventError> {
752 let current_task_id = self
753 .get_current_task_id()
754 .ok_or_else(|| EventError::Other("No current task".into()))?;
755
756 let channels = self.channels.lock();
757 if let Some(channel_obj) = channels.get(channel) {
758 channel_obj.unsubscribe(current_task_id)
759 } else {
760 Err(EventError::ChannelNotFound)
761 }
762 }
763
764 pub fn join_group(&self, group_id: GroupId) -> Result<(), EventError> {
766 let current_task_id = self
767 .get_current_task_id()
768 .ok_or_else(|| EventError::Other("No current task".into()))?;
769
770 let mut groups = self.groups.lock();
771 let group_members = groups.entry(group_id).or_insert_with(Vec::new);
772
773 if !group_members.contains(¤t_task_id) {
774 group_members.push(current_task_id);
775 }
776
777 Ok(())
778 }
779
780 pub fn leave_group(&self, group_id: GroupId) -> Result<(), EventError> {
782 let current_task_id = self
783 .get_current_task_id()
784 .ok_or_else(|| EventError::Other("No current task".into()))?;
785
786 let mut groups = self.groups.lock();
787 if let Some(group_members) = groups.get_mut(&group_id) {
788 group_members.retain(|&task_id| task_id != current_task_id);
789 }
790
791 Ok(())
792 }
793
794 pub fn join_session(&self, session_id: SessionId) -> Result<(), EventError> {
796 let current_task_id = self
797 .get_current_task_id()
798 .ok_or_else(|| EventError::Other("No current task".into()))?;
799 let mut sessions = self.sessions.lock();
800 let members = sessions.entry(session_id).or_insert_with(Vec::new);
801 if !members.contains(¤t_task_id) {
802 members.push(current_task_id);
803 }
804 Ok(())
805 }
806
807 pub fn leave_session(&self, session_id: SessionId) -> Result<(), EventError> {
809 let current_task_id = self
810 .get_current_task_id()
811 .ok_or_else(|| EventError::Other("No current task".into()))?;
812 let mut sessions = self.sessions.lock();
813 if let Some(members) = sessions.get_mut(&session_id) {
814 members.retain(|&tid| tid != current_task_id);
815 }
816 Ok(())
817 }
818
819 pub fn join_named_group(&self, name: String) -> Result<(), EventError> {
821 let current_task_id = self
822 .get_current_task_id()
823 .ok_or_else(|| EventError::Other("No current task".into()))?;
824 let mut named = self.named_groups.lock();
825 let members = named.entry(name).or_insert_with(Vec::new);
826 if !members.contains(¤t_task_id) {
827 members.push(current_task_id);
828 }
829 Ok(())
830 }
831
832 pub fn leave_named_group(&self, name: &str) -> Result<(), EventError> {
834 let current_task_id = self
835 .get_current_task_id()
836 .ok_or_else(|| EventError::Other("No current task".into()))?;
837 let mut named = self.named_groups.lock();
838 if let Some(members) = named.get_mut(name) {
839 members.retain(|&tid| tid != current_task_id);
840 }
841 Ok(())
842 }
843
844 pub fn configure_delivery(&self, config: DeliveryConfig) -> Result<(), EventError> {
846 let current_task_id = self
847 .get_current_task_id()
848 .ok_or_else(|| EventError::Other("No current task".into()))?;
849
850 let mut configs = self.configs.lock();
851 configs.insert(current_task_id, config);
852
853 Ok(())
854 }
855
856 fn get_task_config_or_default(&self, task_id: u32) -> DeliveryConfig {
858 self.configs
859 .lock()
860 .get(&task_id)
861 .cloned()
862 .unwrap_or_else(DeliveryConfig::default)
863 }
864
865 fn handle_delivery_failure(&self, sender: Option<u32>, err: &EventError, event: &Event) {
867 let policy = match sender {
869 Some(sid) => self.get_task_config_or_default(sid).failure_policy.clone(),
870 None => FailurePolicy::Log,
871 };
872
873 match policy {
874 FailurePolicy::Ignore => { }
875 FailurePolicy::Log => {
876 crate::early_println!(
877 "[EventManager] Delivery failure: {:?}, sender={:?}, delivery={:?}",
878 err,
879 sender,
880 event.delivery
881 );
882 }
883 FailurePolicy::NotifySender => {
884 if let Some(sid) = sender {
886 let notice = Event::direct_custom(
887 sid,
888 "system".into(),
889 0x1001,
890 EventPriority::Low,
891 false,
892 EventPayload::String(format!("Delivery failed: {:?}", err)),
893 );
894 let _ = self.deliver_to_task(sid, notice);
895 } else {
896 crate::early_println!(
898 "[EventManager] Delivery failure without sender: {:?}",
899 err
900 );
901 }
902 }
903 FailurePolicy::SystemEvent => {
904 crate::early_println!(
906 "[EventManager] SystemEvent policy: delivery failure: {:?}, sender={:?}",
907 err,
908 sender
909 );
910 }
911 }
912 }
913
914 fn deliver_direct(
918 &self,
919 event: Event,
920 target: TaskId,
921 _priority: EventPriority,
922 _reliable: bool,
923 ) -> Result<(), EventError> {
924 let mut result = self.deliver_to_task(target, event.clone());
926 if result.is_err() && _reliable {
927 let retries = match event.metadata.sender {
929 Some(sender) => self.get_task_config_or_default(sender).retry_count,
930 None => DeliveryConfig::default().retry_count,
931 };
932 let mut attempts = 0u32;
933 while attempts < retries {
934 result = self.deliver_to_task(target, event.clone());
936 if result.is_ok() {
937 break;
938 }
939 attempts += 1;
940 }
941 if let Err(ref e) = result {
942 self.handle_delivery_failure(event.metadata.sender, e, &event);
943 }
944 } else if let Err(ref e) = result {
945 self.handle_delivery_failure(event.metadata.sender, e, &event);
947 }
948 result
949 }
950
951 fn deliver_to_channel(
953 &self,
954 event: Event,
955 channel_id: &str,
956 create_if_missing: bool,
957 _priority: EventPriority,
958 ) -> Result<(), EventError> {
959 let mut channels = self.channels.lock();
960
961 if let Some(channel) = channels.get(channel_id) {
962 let channel = channel.clone();
963 drop(channels);
964 let subscribers = channel.get_subscribers();
966 for task_id in subscribers {
967 if let Err(e) = self.deliver_to_task(task_id, event.clone()) {
968 self.handle_delivery_failure(event.metadata.sender, &e, &event);
969 }
970 }
971 Ok(())
972 } else if create_if_missing {
973 let channel = Arc::new(EventChannelObject::new(channel_id.into()));
975 channels.insert(channel_id.into(), channel.clone());
976 drop(channels);
977 let subscribers = channel.get_subscribers();
978 for task_id in subscribers {
979 if let Err(e) = self.deliver_to_task(task_id, event.clone()) {
980 self.handle_delivery_failure(event.metadata.sender, &e, &event);
981 }
982 }
983 Ok(())
984 } else {
985 Err(EventError::ChannelNotFound)
986 }
987 }
988
989 fn deliver_to_group(
991 &self,
992 event: Event,
993 group_target: &GroupTarget,
994 _priority: EventPriority,
995 _reliable: bool,
996 ) -> Result<(), EventError> {
997 match group_target {
998 GroupTarget::TaskGroup(group_id) => {
999 let groups = self.groups.lock();
1000 if let Some(members) = groups.get(group_id) {
1001 let targets: alloc::vec::Vec<u32> = members.iter().cloned().collect();
1002 drop(groups);
1003 for &task_id in &targets {
1004 if let Err(e) = self.deliver_to_task(task_id, event.clone()) {
1005 self.handle_delivery_failure(event.metadata.sender, &e, &event);
1006 }
1007 }
1008 Ok(())
1009 } else {
1010 Err(EventError::GroupNotFound)
1011 }
1012 }
1013 GroupTarget::AllTasks => {
1014 let sched = crate::sched::scheduler::get_scheduler();
1015 let all_ids: alloc::vec::Vec<u32> = sched
1016 .get_all_task_ids()
1017 .into_iter()
1018 .map(|x| x as u32)
1019 .collect();
1020 for tid in all_ids {
1021 if let Err(e) = self.deliver_to_task(tid, event.clone()) {
1022 self.handle_delivery_failure(event.metadata.sender, &e, &event);
1023 }
1024 }
1025 Ok(())
1026 }
1027 GroupTarget::Session(session_id) => {
1028 let sessions = self.sessions.lock();
1029 if let Some(members) = sessions.get(session_id) {
1030 let targets: alloc::vec::Vec<u32> = members.iter().cloned().collect();
1031 drop(sessions);
1032 for &task_id in &targets {
1033 if let Err(e) = self.deliver_to_task(task_id, event.clone()) {
1034 self.handle_delivery_failure(event.metadata.sender, &e, &event);
1035 }
1036 }
1037 Ok(())
1038 } else {
1039 Err(EventError::GroupNotFound)
1040 }
1041 }
1042 GroupTarget::Custom(name) => {
1043 let named = self.named_groups.lock();
1044 if let Some(members) = named.get(name) {
1045 let targets: alloc::vec::Vec<u32> = members.iter().cloned().collect();
1046 drop(named);
1047 for &task_id in &targets {
1048 if let Err(e) = self.deliver_to_task(task_id, event.clone()) {
1049 self.handle_delivery_failure(event.metadata.sender, &e, &event);
1050 }
1051 }
1052 Ok(())
1053 } else {
1054 Err(EventError::GroupNotFound)
1055 }
1056 }
1057 }
1058 }
1059
1060 fn deliver_broadcast(
1062 &self,
1063 event: Event,
1064 _priority: EventPriority,
1065 _reliable: bool,
1066 ) -> Result<(), EventError> {
1067 let sched = crate::sched::scheduler::get_scheduler();
1069 let all_ids: alloc::vec::Vec<u32> = sched
1070 .get_all_task_ids()
1071 .into_iter()
1072 .map(|x| x as u32)
1073 .collect();
1074 for tid in all_ids {
1075 if let Err(e) = self.deliver_to_task(tid, event.clone()) {
1076 self.handle_delivery_failure(event.metadata.sender, &e, &event);
1077 }
1078 }
1079 Ok(())
1080 }
1081
1082 #[cfg(not(test))]
1084 pub fn deliver_to_task(&self, task_id: u32, event: Event) -> Result<(), EventError> {
1085 let task_filters = self.task_filters.lock();
1087 if let Some(filters) = task_filters.get(&task_id) {
1088 if !filters.is_empty() {
1090 let matches = filters.iter().any(|(_, filter)| filter.matches(&event));
1091 if !matches {
1092 return Ok(());
1094 }
1095 }
1096 }
1098 drop(task_filters); if let Some(task) =
1102 crate::sched::scheduler::get_scheduler().get_task_by_id(task_id as usize)
1103 {
1104 let cfg = self.get_task_config_or_default(task_id);
1106 let mut queue = task.event_queue.lock();
1107 if queue.len() >= cfg.buffer_size {
1108 return Err(EventError::BufferFull);
1109 }
1110 queue.enqueue(event);
1112 Ok(())
1113 } else {
1114 Err(EventError::TargetNotFound)
1115 }
1116 }
    /// Test-build stand-in for `deliver_to_task`: accepts every event
    /// without touching the scheduler.
    #[cfg(test)]
    pub fn deliver_to_task(&self, _task_id: u32, _event: Event) -> Result<(), EventError> {
        Ok(())
    }
1123
1124 #[deprecated(note = "Use Task.process_pending_events() instead")]
1127 pub fn dequeue_event_for_task(&self, task_id: u32) -> Option<Event> {
1128 if let Some(task) =
1129 crate::sched::scheduler::get_scheduler().get_task_by_id(task_id as usize)
1130 {
1131 let mut queue = task.event_queue.lock();
1132 queue.dequeue()
1133 } else {
1134 None
1135 }
1136 }
1137
1138 pub fn get_pending_event_count(&self, task_id: u32) -> usize {
1140 if let Some(task) =
1141 crate::sched::scheduler::get_scheduler().get_task_by_id(task_id as usize)
1142 {
1143 let queue = task.event_queue.lock();
1144 queue.len()
1145 } else {
1146 0
1147 }
1148 }
1149
1150 pub fn has_pending_events(&self, task_id: u32) -> bool {
1152 if let Some(task) =
1153 crate::sched::scheduler::get_scheduler().get_task_by_id(task_id as usize)
1154 {
1155 let queue = task.event_queue.lock();
1156 !queue.is_empty()
1157 } else {
1158 false
1159 }
1160 }
1161
1162 pub fn get_channel(&self, name: &str) -> Option<alloc::sync::Arc<EventChannelObject>> {
1164 self.channels.lock().get(name).cloned()
1165 }
1166
1167 pub fn remove_subscription_from_channel(
1169 &self,
1170 channel_name: &str,
1171 subscription_id: &str,
1172 ) -> Result<(), EventError> {
1173 if let Some(ch) = self.channels.lock().get(channel_name).cloned() {
1174 ch.remove_subscription(subscription_id)
1175 } else {
1176 Err(EventError::ChannelNotFound)
1177 }
1178 }
1179}
1180
1181impl Event {
1183 pub fn new(delivery: EventDelivery, content: EventContent, payload: EventPayload) -> Self {
1185 let priority = match &delivery {
1187 EventDelivery::Direct { priority, .. } => *priority,
1188 EventDelivery::Channel { priority, .. } => *priority,
1189 EventDelivery::Group { priority, .. } => *priority,
1190 EventDelivery::Broadcast { priority, .. } => *priority,
1191 };
1192
1193 Self {
1194 delivery,
1195 content,
1196 payload,
1197 metadata: EventMetadata::with_priority(priority),
1198 }
1199 }
1200
1201 pub fn direct_process_control(
1203 target: TaskId,
1204 ptype: ProcessControlType,
1205 priority: EventPriority,
1206 reliable: bool,
1207 ) -> Self {
1208 Self::new(
1209 EventDelivery::Direct {
1210 target,
1211 priority,
1212 reliable,
1213 },
1214 EventContent::ProcessControl(ptype),
1215 EventPayload::Empty,
1216 )
1217 }
1218
1219 pub fn direct_custom(
1221 target: TaskId,
1222 namespace: String,
1223 event_id: u32,
1224 priority: EventPriority,
1225 reliable: bool,
1226 payload: EventPayload,
1227 ) -> Self {
1228 Self::new(
1229 EventDelivery::Direct {
1230 target,
1231 priority,
1232 reliable,
1233 },
1234 EventContent::Custom {
1235 namespace,
1236 event_id,
1237 },
1238 payload,
1239 )
1240 }
1241
1242 pub fn channel(
1244 channel_id: String,
1245 content: EventContent,
1246 create_if_missing: bool,
1247 priority: EventPriority,
1248 payload: EventPayload,
1249 ) -> Self {
1250 Self::new(
1251 EventDelivery::Channel {
1252 channel_id,
1253 create_if_missing,
1254 priority,
1255 },
1256 content,
1257 payload,
1258 )
1259 }
1260
1261 pub fn group(
1263 group_target: GroupTarget,
1264 content: EventContent,
1265 priority: EventPriority,
1266 reliable: bool,
1267 payload: EventPayload,
1268 ) -> Self {
1269 Self::new(
1270 EventDelivery::Group {
1271 group_target,
1272 priority,
1273 reliable,
1274 },
1275 content,
1276 payload,
1277 )
1278 }
1279
1280 pub fn broadcast(
1282 content: EventContent,
1283 priority: EventPriority,
1284 reliable: bool,
1285 payload: EventPayload,
1286 ) -> Self {
1287 Self::new(
1288 EventDelivery::Broadcast { priority, reliable },
1289 content,
1290 payload,
1291 )
1292 }
1293
1294 pub fn immediate_process_control(task_id: u32, ptype: ProcessControlType) -> Self {
1298 Self::direct_process_control(task_id, ptype, EventPriority::High, true)
1299 }
1300
1301 pub fn notification_to_task(task_id: u32, ntype: NotificationType) -> Self {
1303 Self::new(
1304 EventDelivery::Direct {
1305 target: task_id,
1306 priority: EventPriority::Normal,
1307 reliable: false,
1308 },
1309 EventContent::Notification(ntype),
1310 EventPayload::Empty,
1311 )
1312 }
1313
1314 pub fn new_channel_event(channel: &str, content: EventContent, payload: EventPayload) -> Self {
1316 Self::channel(
1317 channel.into(),
1318 content,
1319 false,
1320 EventPriority::Normal,
1321 payload,
1322 )
1323 }
1324
1325 pub fn new_group_broadcast(
1327 group_target: GroupTarget,
1328 content: EventContent,
1329 payload: EventPayload,
1330 ) -> Self {
1331 Self::group(group_target, content, EventPriority::Normal, false, payload)
1332 }
1333
1334 pub fn immediate_broadcast(content: EventContent) -> Self {
1336 Self::broadcast(content, EventPriority::High, true, EventPayload::Empty)
1337 }
1338
1339 pub fn notification_to_group(group_id: GroupId, ntype: NotificationType) -> Self {
1341 Self::group(
1342 GroupTarget::TaskGroup(group_id),
1343 EventContent::Notification(ntype),
1344 EventPriority::Normal,
1345 false,
1346 EventPayload::Empty,
1347 )
1348 }
1349}
1350
/// A named channel together with the subscriptions registered on it.
pub struct EventChannelObject {
    /// Channel name.
    name: String,
    /// Subscriptions keyed by subscription id.
    subscriptions: Mutex<HashMap<String, Arc<EventSubscriptionObject>>>,
    /// The global manager, used to send and deliver events.
    #[allow(dead_code)]
    manager_ref: &'static EventManager,
}
1359
1360impl EventChannelObject {
1361 pub fn new(name: String) -> Self {
1362 Self {
1363 name,
1364 subscriptions: Mutex::new(HashMap::new()),
1365 manager_ref: EventManager::get_manager(),
1366 }
1367 }
1368
1369 pub fn name(&self) -> &str {
1370 &self.name
1371 }
1372
1373 pub fn create_subscription(
1375 &self,
1376 task_id: u32,
1377 ) -> Result<Arc<EventSubscriptionObject>, EventError> {
1378 let subscription_id = format!("sub_{}_task_{}", self.name, task_id);
1379 let subscription = Arc::new(EventSubscriptionObject::new(
1380 subscription_id.clone(),
1381 self.name.clone(),
1382 task_id,
1383 ));
1384
1385 let mut subscriptions = self.subscriptions.lock();
1386 subscriptions.insert(subscription_id, subscription.clone());
1387
1388 Ok(subscription)
1389 }
1390
1391 pub fn remove_subscription(&self, subscription_id: &str) -> Result<(), EventError> {
1393 let mut subscriptions = self.subscriptions.lock();
1394 subscriptions.remove(subscription_id);
1395 Ok(())
1396 }
1397
1398 pub fn get_subscriptions(&self) -> Vec<Arc<EventSubscriptionObject>> {
1400 self.subscriptions.lock().values().cloned().collect()
1401 }
1402
1403 pub fn get_subscribers(&self) -> Vec<u32> {
1405 self.subscriptions
1406 .lock()
1407 .values()
1408 .map(|sub| sub.task_id())
1409 .collect()
1410 }
1411
1412 pub fn broadcast_to_subscribers(&self, event: Event) -> Result<(), EventError> {
1414 let subscribers = self.get_subscribers();
1415 for task_id in subscribers {
1416 let _ = self.manager_ref.deliver_to_task(task_id, event.clone());
1417 }
1418 Ok(())
1419 }
1420
1421 pub fn subscribe(&self, task_id: u32) -> Result<Arc<EventSubscriptionObject>, EventError> {
1423 self.create_subscription(task_id)
1424 }
1425
1426 pub fn unsubscribe(&self, task_id: u32) -> Result<(), EventError> {
1428 let mut subscriptions = self.subscriptions.lock();
1429 subscriptions.retain(|_, sub| sub.task_id() != task_id);
1430 Ok(())
1431 }
1432
1433 pub fn get_subscription_by_id(
1435 &self,
1436 subscription_id: &str,
1437 ) -> Option<Arc<EventSubscriptionObject>> {
1438 self.subscriptions.lock().get(subscription_id).cloned()
1439 }
1440}
1441
/// One task's subscription to one channel, with its own filter table.
pub struct EventSubscriptionObject {
    /// Id under which the channel stores this subscription.
    subscription_id: String,
    /// Name of the channel subscribed to.
    channel_name: String,
    /// Subscribing task.
    task_id: u32,
    /// Filters keyed by handler id.
    filters: Mutex<HashMap<usize, EventFilter>>,
}
1450
1451impl EventSubscriptionObject {
1452 pub fn new(subscription_id: String, channel_name: String, task_id: u32) -> Self {
1453 Self {
1454 subscription_id,
1455 channel_name,
1456 task_id,
1457 filters: Mutex::new(HashMap::new()),
1458 }
1459 }
1460
1461 pub fn subscription_id(&self) -> &str {
1462 &self.subscription_id
1463 }
1464
1465 pub fn channel_name(&self) -> &str {
1466 &self.channel_name
1467 }
1468
1469 pub fn task_id(&self) -> u32 {
1470 self.task_id
1471 }
1472}
1473
1474impl crate::object::capability::EventSender for EventChannelObject {
1475 fn send_event(&self, event: Event) -> Result<(), &'static str> {
1476 self.manager_ref
1477 .send_event(event)
1478 .map_err(|_| "Failed to send event")?;
1479 Ok(())
1480 }
1481}
1482
1483impl crate::object::capability::EventReceiver for EventChannelObject {
1484 fn has_pending_events(&self) -> bool {
1485 let subscriber_ids = self.get_subscribers();
1487 for tid in subscriber_ids {
1488 if let Some(task) =
1489 crate::sched::scheduler::get_scheduler().get_task_by_id(tid as usize)
1490 {
1491 let queue = task.event_queue.lock();
1492 for (_prio, q) in queue.events.iter() {
1493 for ev in q.iter() {
1494 if let EventDelivery::Channel { channel_id, .. } = &ev.delivery {
1495 if channel_id == &self.name {
1496 return true;
1497 }
1498 }
1499 }
1500 }
1501 }
1502 }
1503 false
1504 }
1505}
1506
1507impl crate::object::capability::EventReceiver for EventSubscriptionObject {
1508 fn has_pending_events(&self) -> bool {
1509 if let Some(task) =
1511 crate::sched::scheduler::get_scheduler().get_task_by_id(self.task_id as usize)
1512 {
1513 let queue = task.event_queue.lock();
1514 let channel_name = self.channel_name.as_str();
1515 let local_filters: alloc::vec::Vec<EventFilter> = {
1517 let guard = self.filters.lock();
1518 guard.values().cloned().collect()
1519 };
1520 let use_filters = !local_filters.is_empty();
1521
1522 for (_prio, q) in queue.events.iter() {
1523 for ev in q.iter() {
1524 if let EventDelivery::Channel { channel_id, .. } = &ev.delivery {
1525 if channel_id == channel_name {
1526 if use_filters {
1527 if local_filters.iter().any(|f| f.matches(ev)) {
1528 return true;
1529 }
1530 } else {
1531 return true; }
1533 }
1534 }
1535 }
1536 }
1537 }
1538 false
1539 }
1540}
1541
1542impl crate::object::capability::EventSubscriber for EventSubscriptionObject {
1543 fn register_filter(&self, filter: EventFilter, handler_id: usize) -> Result<(), &'static str> {
1544 self.filters.lock().insert(handler_id, filter);
1546 Ok(())
1547 }
1548
1549 fn unregister_filter(&self, handler_id: usize) -> Result<(), &'static str> {
1550 self.filters.lock().remove(&handler_id);
1551 Ok(())
1552 }
1553
1554 fn get_filters(&self) -> Vec<(usize, EventFilter)> {
1555 self.filters
1557 .lock()
1558 .iter()
1559 .map(|(k, v)| (*k, v.clone()))
1560 .collect()
1561 }
1562}
1563
1564impl crate::object::capability::CloneOps for EventChannelObject {
1565 fn custom_clone(&self) -> crate::object::KernelObject {
1566 let mgr = EventManager::get_manager();
1568 if let Some(arc) = mgr.channels.lock().get(self.name()).cloned() {
1569 crate::object::KernelObject::EventChannel(arc)
1570 } else {
1571 let ko = mgr.create_channel(self.name.clone());
1573 ko
1574 }
1575 }
1576}
1577
1578impl crate::object::capability::CloneOps for EventSubscriptionObject {
1579 fn custom_clone(&self) -> crate::object::KernelObject {
1580 let mgr = EventManager::get_manager();
1581 if let Some(ch) = mgr.channels.lock().get(self.channel_name()).cloned() {
1583 if let Some(sub) = ch.get_subscription_by_id(self.subscription_id()) {
1584 return crate::object::KernelObject::EventSubscription(sub);
1585 }
1586 }
1587 let fallback = alloc::sync::Arc::new(EventSubscriptionObject::new(
1589 self.subscription_id.clone(),
1590 self.channel_name.clone(),
1591 self.task_id,
1592 ));
1593 crate::object::KernelObject::EventSubscription(fallback)
1594 }
1595}
1596
/// Returns a fresh event id.
///
/// Ids start at 1 and each call returns the next integer. The counter is a
/// single atomic, so the function never spins on a lock and cannot deadlock
/// against itself if it is re-entered (for example from an interrupt handler).
///
/// NOTE(review): this needs a target with native 64-bit atomics — confirm for
/// every supported architecture.
fn generate_event_id() -> u64 {
    static NEXT_ID: core::sync::atomic::AtomicU64 = core::sync::atomic::AtomicU64::new(1);
    // Only the atomicity of the increment matters for uniqueness; no other
    // memory is published through this counter, so `Relaxed` is sufficient.
    NEXT_ID.fetch_add(1, core::sync::atomic::Ordering::Relaxed)
}
1605
#[cfg(test)]
mod tests {
    // Unit tests for event construction, filters, the per-task event queue and
    // the channel / subscription kernel objects.
    //
    // Tests that go through `EventManager::get_manager()` each use their own
    // channel name.

    use super::*;
    use crate::object::capability::EventSubscriber;
    use alloc::string::ToString;

    /// Every `ProcessControlType` variant (`User` represented by `User(0)`).
    const ALL_PROCESS_CONTROLS: [ProcessControlType; 12] = [
        ProcessControlType::Terminate,
        ProcessControlType::Kill,
        ProcessControlType::Stop,
        ProcessControlType::Continue,
        ProcessControlType::Interrupt,
        ProcessControlType::Quit,
        ProcessControlType::Hangup,
        ProcessControlType::ChildExit,
        ProcessControlType::PipeBroken,
        ProcessControlType::Alarm,
        ProcessControlType::IoReady,
        ProcessControlType::User(0),
    ];

    /// Every `NotificationType` variant.
    const ALL_NOTIFICATIONS: [NotificationType; 7] = [
        NotificationType::TaskCompleted,
        NotificationType::MemoryLow,
        NotificationType::DeviceConnected,
        NotificationType::DeviceDisconnected,
        NotificationType::FilesystemFull,
        NotificationType::NetworkChange,
        NotificationType::SystemShutdown,
    ];

    /// Unwraps a kernel object that is expected to hold an event channel.
    fn expect_channel(ko: crate::object::KernelObject) -> alloc::sync::Arc<EventChannelObject> {
        match ko {
            crate::object::KernelObject::EventChannel(arc) => arc,
            _ => panic!("Expected EventChannel"),
        }
    }

    /// Asserts that `event` uses direct delivery with exactly these parameters.
    fn assert_direct(
        event: &Event,
        want_target: TaskId,
        want_priority: EventPriority,
        want_reliable: bool,
    ) {
        match &event.delivery {
            EventDelivery::Direct {
                target,
                priority,
                reliable,
            } => {
                assert_eq!(*target, want_target);
                assert_eq!(*priority, want_priority);
                assert_eq!(*reliable, want_reliable);
            }
            _ => panic!("Wrong delivery type"),
        }
    }

    /// Asserts that each variant survives `Event::immediate_process_control`.
    fn assert_process_control_roundtrip(variants: &[ProcessControlType]) {
        for &variant in variants {
            let event = Event::immediate_process_control(123, variant);
            match event.content {
                EventContent::ProcessControl(received) => assert_eq!(received, variant),
                _ => panic!("Wrong content type for variant {:?}", variant),
            }
        }
    }

    /// Asserts that each variant survives `Event::notification_to_task`.
    fn assert_notification_roundtrip(variants: &[NotificationType]) {
        for &variant in variants {
            let event = Event::notification_to_task(123, variant);
            match event.content {
                EventContent::Notification(received) => assert_eq!(received, variant),
                _ => panic!("Wrong content type for variant {:?}", variant),
            }
        }
    }

    /// Broadcasts a custom event carrying `payload` and returns the payload
    /// the resulting event actually stores.
    fn broadcast_payload(event_id: u32, payload: EventPayload) -> EventPayload {
        Event::broadcast(
            EventContent::Custom {
                namespace: "test".to_string(),
                event_id,
            },
            EventPriority::Normal,
            false,
            payload,
        )
        .payload
    }

    /// Checks the `Empty`, `Integer`, `Bytes` and `String` payload variants.
    fn assert_basic_payloads_roundtrip() {
        let empty_event = Event::immediate_process_control(123, ProcessControlType::Terminate);
        assert!(
            matches!(empty_event.payload, EventPayload::Empty),
            "Expected Empty payload"
        );

        match broadcast_payload(42, EventPayload::Integer(12345)) {
            EventPayload::Integer(value) => assert_eq!(value, 12345),
            _ => panic!("Expected Integer payload"),
        }

        let data = alloc::vec![1, 2, 3, 4, 5];
        match broadcast_payload(43, EventPayload::Bytes(data.clone())) {
            EventPayload::Bytes(received) => assert_eq!(received, data),
            _ => panic!("Expected Bytes payload"),
        }

        let text = "Hello, World!".to_string();
        match broadcast_payload(44, EventPayload::String(text.clone())) {
            EventPayload::String(received) => assert_eq!(received, text),
            _ => panic!("Expected String payload"),
        }
    }

    #[test_case]
    fn test_event_creation() {
        let event = Event::new(
            EventDelivery::Direct {
                target: 123,
                priority: EventPriority::High,
                reliable: true,
            },
            EventContent::ProcessControl(ProcessControlType::Terminate),
            EventPayload::Empty,
        );

        assert_direct(&event, 123, EventPriority::High, true);
        assert!(
            matches!(
                event.content,
                EventContent::ProcessControl(ProcessControlType::Terminate)
            ),
            "Wrong content type"
        );
        // The metadata priority is expected to mirror the delivery priority.
        assert_eq!(event.metadata.priority, EventPriority::High);
    }

    #[test_case]
    fn test_event_convenience_functions() {
        let killed = Event::direct_process_control(
            42,
            ProcessControlType::Kill,
            EventPriority::Critical,
            true,
        );
        assert_direct(&killed, 42, EventPriority::Critical, true);
        assert!(
            matches!(
                killed.content,
                EventContent::ProcessControl(ProcessControlType::Kill)
            ),
            "Wrong content type"
        );

        // The "immediate" shortcut is expected to pick high priority and
        // reliable delivery.
        let stopped = Event::immediate_process_control(99, ProcessControlType::Stop);
        assert_direct(&stopped, 99, EventPriority::High, true);
    }

    #[test_case]
    fn test_event_channel_creation() {
        let event = Event::channel(
            "test_channel".to_string(),
            EventContent::Notification(NotificationType::TaskCompleted),
            true,
            EventPriority::Normal,
            EventPayload::Empty,
        );

        match &event.delivery {
            EventDelivery::Channel {
                channel_id,
                create_if_missing,
                priority,
            } => {
                assert_eq!(channel_id.as_str(), "test_channel");
                assert!(*create_if_missing);
                assert_eq!(*priority, EventPriority::Normal);
            }
            _ => panic!("Wrong delivery type"),
        }

        assert!(
            matches!(
                event.content,
                EventContent::Notification(NotificationType::TaskCompleted)
            ),
            "Wrong content type"
        );
    }

    #[test_case]
    fn test_event_group_creation() {
        let event = Event::group(
            GroupTarget::AllTasks,
            EventContent::Message {
                message_type: 42,
                category: MessageCategory::Control,
            },
            EventPriority::Low,
            false,
            EventPayload::Empty,
        );

        match &event.delivery {
            EventDelivery::Group {
                group_target,
                priority,
                reliable,
            } => {
                assert_eq!(*group_target, GroupTarget::AllTasks);
                assert_eq!(*priority, EventPriority::Low);
                assert!(!*reliable);
            }
            _ => panic!("Wrong delivery type"),
        }

        match &event.content {
            EventContent::Message {
                message_type,
                category,
            } => {
                assert_eq!(*message_type, 42);
                assert_eq!(*category, MessageCategory::Control);
            }
            _ => panic!("Wrong content type"),
        }
    }

    #[test_case]
    fn test_event_broadcast_creation() {
        let event = Event::broadcast(
            EventContent::Custom {
                namespace: "test_namespace".to_string(),
                event_id: 100,
            },
            EventPriority::Normal,
            true,
            EventPayload::Bytes(alloc::vec![1, 2, 3, 4]),
        );

        match &event.delivery {
            EventDelivery::Broadcast { priority, reliable } => {
                assert_eq!(*priority, EventPriority::Normal);
                assert!(*reliable);
            }
            _ => panic!("Wrong delivery type"),
        }

        match &event.content {
            EventContent::Custom {
                namespace,
                event_id,
            } => {
                assert_eq!(namespace.as_str(), "test_namespace");
                assert_eq!(*event_id, 100);
            }
            _ => panic!("Wrong content type"),
        }

        match &event.payload {
            EventPayload::Bytes(data) => assert_eq!(*data, alloc::vec![1, 2, 3, 4]),
            _ => panic!("Wrong payload type"),
        }
    }

    #[test_case]
    fn test_event_filter_event_type() {
        let filter = EventFilter::EventType(EventTypeFilter::AnyDirect);

        let direct_event = Event::direct_process_control(
            123,
            ProcessControlType::Terminate,
            EventPriority::High,
            true,
        );
        let channel_event = Event::channel(
            "test".to_string(),
            EventContent::ProcessControl(ProcessControlType::Terminate),
            false,
            EventPriority::High,
            EventPayload::Empty,
        );

        assert!(filter.matches(&direct_event));
        assert!(!filter.matches(&channel_event));
    }

    #[test_case]
    fn test_event_filter_channel() {
        let filter = EventFilter::Channel("test_channel".to_string());

        // Two events that differ only in the channel they are sent on.
        let on_channel = |name: &str| {
            Event::channel(
                name.to_string(),
                EventContent::Notification(NotificationType::TaskCompleted),
                false,
                EventPriority::Normal,
                EventPayload::Empty,
            )
        };

        assert!(filter.matches(&on_channel("test_channel")));
        assert!(!filter.matches(&on_channel("other_channel")));
    }

    #[test_case]
    fn test_event_filter_sender() {
        let filter = EventFilter::Sender(42);

        let mut matching_event =
            Event::immediate_process_control(123, ProcessControlType::Terminate);
        matching_event.metadata.sender = Some(42);

        let mut non_matching_event =
            Event::immediate_process_control(123, ProcessControlType::Terminate);
        non_matching_event.metadata.sender = Some(99);

        assert!(filter.matches(&matching_event));
        assert!(!filter.matches(&non_matching_event));
    }

    #[test_case]
    fn test_task_event_queue_basic() {
        let mut queue = TaskEventQueue::new();
        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);

        let event = Event::immediate_process_control(123, ProcessControlType::Terminate);
        assert!(queue.enqueue(event));
        assert!(!queue.is_empty());
        assert_eq!(queue.len(), 1);

        assert!(queue.dequeue().is_some());
        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);
    }

    #[test_case]
    fn test_task_event_queue_priority_ordering() {
        let mut queue = TaskEventQueue::new();

        // Enqueue deliberately out of priority order.
        queue.enqueue(Event::direct_process_control(
            1,
            ProcessControlType::Stop,
            EventPriority::Low,
            true,
        ));
        queue.enqueue(Event::direct_process_control(
            2,
            ProcessControlType::Kill,
            EventPriority::Critical,
            true,
        ));
        queue.enqueue(Event::direct_process_control(
            3,
            ProcessControlType::Terminate,
            EventPriority::High,
            true,
        ));
        queue.enqueue(Event::direct_process_control(
            4,
            ProcessControlType::Continue,
            EventPriority::Normal,
            true,
        ));
        assert_eq!(queue.len(), 4);

        // Dequeue order must follow priority, highest first.
        let expected = [
            EventPriority::Critical,
            EventPriority::High,
            EventPriority::Normal,
            EventPriority::Low,
        ];
        for want in expected.iter() {
            let got = queue.dequeue().expect("queue drained too early");
            assert_eq!(got.metadata.priority, *want);
        }

        assert_eq!(queue.len(), 0);
    }

    #[test_case]
    fn test_event_manager_creation() {
        let manager = EventManager::new();
        assert!(manager.channels.lock().is_empty());
    }

    #[test_case]
    fn test_process_control_type_variants_full() {
        // Every variant, plus a second `User` value to check the user code is
        // carried through unchanged.
        assert_process_control_roundtrip(&ALL_PROCESS_CONTROLS);
        assert_process_control_roundtrip(&[ProcessControlType::User(u32::MAX)]);
    }

    #[test_case]
    fn test_notification_type_variants() {
        assert_notification_roundtrip(&ALL_NOTIFICATIONS);
    }

    #[test_case]
    fn test_event_payload_variants() {
        assert_basic_payloads_roundtrip();
    }

    #[test_case]
    fn test_process_control_type_variants() {
        assert_process_control_roundtrip(&ALL_PROCESS_CONTROLS);
    }

    #[test_case]
    fn test_notification_type_variants_notification() {
        // Kept under its existing name; shares the check with
        // `test_notification_type_variants`.
        assert_notification_roundtrip(&ALL_NOTIFICATIONS);
    }

    #[test_case]
    fn test_event_payload_variants_extended() {
        assert_basic_payloads_roundtrip();

        // The variant the basic test does not reach.
        let blob = alloc::vec![0xde, 0xad, 0xbe, 0xef];
        match broadcast_payload(45, EventPayload::Custom(blob.clone())) {
            EventPayload::Custom(received) => assert_eq!(received, blob),
            _ => panic!("Expected Custom payload"),
        }
    }

    #[test_case]
    fn test_channel_subscription_management() {
        let manager = EventManager::get_manager();
        let channel = expect_channel(manager.create_channel("sub_test".to_string()));
        assert!(channel.get_subscribers().is_empty());

        let _s1 = channel.subscribe(1).expect("subscribe(1)");
        let _s2 = channel.subscribe(2).expect("subscribe(2)");
        let mut subs = channel.get_subscribers();
        subs.sort();
        assert_eq!(subs, alloc::vec![1, 2]);

        channel.unsubscribe(1).expect("unsubscribe(1)");
        assert_eq!(channel.get_subscribers(), alloc::vec![2]);

        // Collect the ids first, then remove the remaining subscriptions.
        let ids: alloc::vec::Vec<String> = channel
            .get_subscriptions()
            .into_iter()
            .map(|s| s.subscription_id().to_string())
            .collect();
        for id in ids {
            channel.remove_subscription(&id).unwrap();
        }
        assert!(channel.get_subscribers().is_empty());
    }

    #[test_case]
    fn test_event_manager_subscription_creation() {
        let manager = EventManager::get_manager();
        let created = manager
            .create_subscription("mgr_sub_test".to_string(), 42)
            .expect("create_subscription");
        let sub = match created {
            crate::object::KernelObject::EventSubscription(arc) => arc,
            _ => panic!("Expected EventSubscription"),
        };
        assert_eq!(sub.channel_name(), "mgr_sub_test");
        assert_eq!(sub.task_id(), 42);

        // Clone the handle out in one statement so the registry lock is
        // released before the channel itself is queried.
        let ch = manager
            .channels
            .lock()
            .get("mgr_sub_test")
            .expect("channel exists")
            .clone();
        assert_eq!(ch.get_subscribers(), alloc::vec![42]);
    }

    #[test_case]
    fn test_channel_broadcast_no_error() {
        let manager = EventManager::get_manager();
        let channel = expect_channel(manager.create_channel("bc_test".to_string()));
        let _ = channel.subscribe(100).unwrap();
        let _ = channel.subscribe(200).unwrap();

        let ev = Event::new_channel_event(
            "bc_test",
            EventContent::Notification(NotificationType::TaskCompleted),
            EventPayload::Empty,
        );
        channel.broadcast_to_subscribers(ev).expect("broadcast ok");
    }

    #[test_case]
    fn test_subscription_filter_registration() {
        let manager = EventManager::get_manager();
        let channel = expect_channel(manager.create_channel("filter_test".to_string()));
        let sub = channel.subscribe(7).expect("subscribe");

        sub.register_filter(EventFilter::All, 1).expect("reg1");
        sub.register_filter(EventFilter::Sender(7), 2)
            .expect("reg2");
        assert_eq!(sub.get_filters().len(), 2);

        sub.unregister_filter(1).expect("unreg1");
        assert_eq!(sub.get_filters().len(), 1);

        // Filters registered on a subscription must not show up in the
        // manager's per-task filter list.
        assert!(manager.get_filters_for_task(7).is_empty());
    }

    #[test_case]
    fn test_clone_returns_same_arc_objects() {
        use crate::object::capability::CloneOps;

        let mgr = EventManager::get_manager();
        let ch = match mgr.create_channel("clone_arc".to_string()) {
            crate::object::KernelObject::EventChannel(arc) => arc,
            _ => panic!("Expected channel"),
        };
        let sub = ch.subscribe(1234).expect("subscribe");

        // Cloning a registered channel must hand back the registered object.
        let ch_clone = match CloneOps::custom_clone(&*ch) {
            crate::object::KernelObject::EventChannel(arc) => arc,
            _ => panic!("Expected channel"),
        };
        assert!(alloc::sync::Arc::ptr_eq(&ch, &ch_clone));

        // The same holds for a subscription the channel still tracks.
        let sub_clone = match CloneOps::custom_clone(&*sub) {
            crate::object::KernelObject::EventSubscription(arc) => arc,
            _ => panic!("Expected sub"),
        };
        assert!(alloc::sync::Arc::ptr_eq(&sub, &sub_clone));
    }

    #[test_case]
    fn test_group_session_and_named_delivery_no_error() {
        let mgr = EventManager::get_manager();
        for i in 0..2 {
            let task =
                crate::task::Task::new(format!("g_sess_{}", i), 1, crate::task::TaskType::Kernel);
            crate::sched::scheduler::get_scheduler().add_task(task, 0);
        }

        mgr.join_session(77).expect("join session");
        mgr.join_named_group("teamA".to_string())
            .expect("join named");

        let ev_sess = Event::group(
            GroupTarget::Session(77),
            EventContent::Notification(NotificationType::DeviceConnected),
            EventPriority::Normal,
            false,
            EventPayload::Empty,
        );
        let ev_named = Event::group(
            GroupTarget::Custom("teamA".to_string()),
            EventContent::Notification(NotificationType::DeviceDisconnected),
            EventPriority::Normal,
            false,
            EventPayload::Empty,
        );
        assert!(mgr.send_event(ev_sess).is_ok());
        assert!(mgr.send_event(ev_named).is_ok());

        // Best-effort cleanup; failures here are deliberately ignored.
        mgr.leave_session(77).ok();
        mgr.leave_named_group("teamA").ok();
    }
}