Unpack `MessageQueue`

This commit is contained in:
Ruihan Li 2025-09-03 22:23:28 +08:00 committed by Tate, Hongliang Tian
parent f0935beb18
commit b57c94d05d
5 changed files with 45 additions and 40 deletions

View File

@ -11,11 +11,14 @@ use crate::{
pub struct BoundNetlink<Message: 'static> {
pub(in crate::net::socket::netlink) handle: BoundHandle<Message>,
pub(in crate::net::socket::netlink) remote_addr: NetlinkSocketAddr,
pub(in crate::net::socket::netlink) receive_queue: MessageQueue<Message>,
pub(in crate::net::socket::netlink) receive_queue: Arc<Mutex<MessageQueue<Message>>>,
}
impl<Message: 'static> BoundNetlink<Message> {
pub(super) fn new(handle: BoundHandle<Message>, message_queue: MessageQueue<Message>) -> Self {
pub(super) fn new(
handle: BoundHandle<Message>,
message_queue: Arc<Mutex<MessageQueue<Message>>>,
) -> Self {
Self {
handle,
remote_addr: NetlinkSocketAddr::new_unspecified(),
@ -43,7 +46,7 @@ impl<Message: 'static> BoundNetlink<Message> {
pub(in crate::net::socket::netlink) fn check_io_events_common(&self) -> IoEvents {
let mut events = IoEvents::OUT;
let receive_queue = self.receive_queue.0.lock();
let receive_queue = self.receive_queue.lock();
if !receive_queue.is_empty() {
events |= IoEvents::IN;
}

View File

@ -6,9 +6,7 @@ use crate::{
events::IoEvents,
net::socket::{
netlink::{
common::bound::BoundNetlink,
receiver::{MessageQueue, MessageReceiver},
table::SupportedNetlinkProtocol,
common::bound::BoundNetlink, receiver::MessageQueue, table::SupportedNetlinkProtocol,
GroupIdSet, NetlinkSocketAddr,
},
util::datagram_common,
@ -55,7 +53,8 @@ impl<P: SupportedNetlinkProtocol> datagram_common::Unbound for UnboundNetlink<P>
pollee: &Pollee,
_options: Self::BindOptions,
) -> Result<Self::Bound> {
let message_queue = MessageQueue::<P::Message>::new();
let (message_queue, message_receiver) =
MessageQueue::<P::Message>::new_pair(pollee.clone());
let bound_handle = {
let endpoint = {
@ -63,8 +62,7 @@ impl<P: SupportedNetlinkProtocol> datagram_common::Unbound for UnboundNetlink<P>
endpoint.add_groups(self.groups);
endpoint
};
let receiver = MessageReceiver::new(message_queue.clone(), pollee.clone());
<P as SupportedNetlinkProtocol>::bind(&endpoint, receiver)?
<P as SupportedNetlinkProtocol>::bind(&endpoint, message_receiver)?
};
Ok(BoundNetlink::new(bound_handle, message_queue))
@ -75,7 +73,8 @@ impl<P: SupportedNetlinkProtocol> datagram_common::Unbound for UnboundNetlink<P>
_remote_endpoint: &Self::Endpoint,
pollee: &Pollee,
) -> Result<Self::Bound> {
let message_queue = MessageQueue::<P::Message>::new();
let (message_queue, message_receiver) =
MessageQueue::<P::Message>::new_pair(pollee.clone());
let bound_handle = {
let endpoint = {
@ -83,8 +82,7 @@ impl<P: SupportedNetlinkProtocol> datagram_common::Unbound for UnboundNetlink<P>
endpoint.add_groups(self.groups);
endpoint
};
let receiver = MessageReceiver::new(message_queue.clone(), pollee.clone());
<P as SupportedNetlinkProtocol>::bind(&endpoint, receiver)?
<P as SupportedNetlinkProtocol>::bind(&endpoint, message_receiver)?
};
Ok(BoundNetlink::new(bound_handle, message_queue))

View File

@ -67,10 +67,10 @@ impl datagram_common::Bound for BoundNetlinkUevent {
warn!("unsupported flags: {:?}", flags);
}
let mut receive_queue = self.receive_queue.0.lock();
let mut receive_queue = self.receive_queue.lock();
let Some(response) = receive_queue.front() else {
return_errno_with_message!(Errno::EAGAIN, "nothing to receive");
let Some(response) = receive_queue.peek() else {
return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty");
};
let len = {
@ -83,7 +83,7 @@ impl datagram_common::Bound for BoundNetlinkUevent {
let remote = *response.src_addr();
if !flags.contains(SendRecvFlags::MSG_PEEK) {
receive_queue.pop_front().unwrap();
receive_queue.dequeue().unwrap();
}
Ok((len, remote))

View File

@ -3,41 +3,45 @@
use crate::{events::IoEvents, prelude::*, process::signal::Pollee};
pub struct MessageReceiver<Message> {
message_queue: MessageQueue<Message>,
message_queue: Arc<Mutex<MessageQueue<Message>>>,
pollee: Pollee,
}
pub(super) struct MessageQueue<Message>(pub(super) Arc<Mutex<VecDeque<Message>>>);
impl<Message> Clone for MessageQueue<Message> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
pub(super) struct MessageQueue<Message>(VecDeque<Message>);
impl<Message> MessageQueue<Message> {
pub(super) fn new() -> Self {
Self(Arc::new(Mutex::new(VecDeque::new())))
pub(super) fn new_pair(pollee: Pollee) -> (Arc<Mutex<Self>>, MessageReceiver<Message>) {
let queue = Arc::new(Mutex::new(Self(VecDeque::new())));
let receiver = MessageReceiver {
message_queue: queue.clone(),
pollee,
};
(queue, receiver)
}
fn enqueue(&self, message: Message) -> Result<()> {
pub(super) fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub(super) fn peek(&self) -> Option<&Message> {
self.0.front()
}
pub(super) fn dequeue(&mut self) -> Option<Message> {
self.0.pop_front()
}
pub(self) fn enqueue(&mut self, message: Message) -> Result<()> {
// FIXME: We should verify the socket buffer length to ensure
// that adding the message doesn't exceed the buffer capacity.
self.0.lock().push_back(message);
self.0.push_back(message);
Ok(())
}
}
impl<Message> MessageReceiver<Message> {
pub(super) const fn new(message_queue: MessageQueue<Message>, pollee: Pollee) -> Self {
Self {
message_queue,
pollee,
}
}
pub(super) fn enqueue_message(&self, message: Message) -> Result<()> {
self.message_queue.enqueue(message)?;
self.message_queue.lock().enqueue(message)?;
self.pollee.notify(IoEvents::IN);
Ok(())

View File

@ -99,10 +99,10 @@ impl datagram_common::Bound for BoundNetlinkRoute {
warn!("unsupported flags: {:?}", flags);
}
let mut receive_queue = self.receive_queue.0.lock();
let mut receive_queue = self.receive_queue.lock();
let Some(response) = receive_queue.front() else {
return_errno_with_message!(Errno::EAGAIN, "nothing to receive");
let Some(response) = receive_queue.peek() else {
return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty");
};
let len = {
@ -113,7 +113,7 @@ impl datagram_common::Bound for BoundNetlinkRoute {
response.write_to(writer)?;
if !flags.contains(SendRecvFlags::MSG_PEEK) {
receive_queue.pop_front().unwrap();
receive_queue.dequeue().unwrap();
}
// TODO: The message can only come from kernel socket currently.