asterinas/kernel/src/net/socket/ip/stream/connected.rs

209 lines
7.1 KiB
Rust
Raw Normal View History

2024-01-03 03:22:36 +00:00
// SPDX-License-Identifier: MPL-2.0
2024-10-24 09:24:10 +00:00
use core::sync::atomic::{AtomicBool, Ordering};
2023-06-13 09:49:44 +00:00
2024-09-06 10:49:37 +00:00
use aster_bigtcp::{
errors::tcp::{RecvError, SendError},
2024-12-02 15:53:19 +00:00
socket::{NeedIfacePoll, TcpStateCheck},
2024-09-06 10:49:37 +00:00
wire::IpEndpoint,
};
2024-12-02 15:53:19 +00:00
use super::StreamObserver;
2023-05-31 02:48:16 +00:00
use crate::{
2024-09-06 10:49:37 +00:00
events::IoEvents,
2023-05-31 02:48:16 +00:00
net::{
2024-11-16 15:09:20 +00:00
iface::{BoundTcpSocket, Iface},
2023-05-31 02:48:16 +00:00
socket::util::{send_recv_flags::SendRecvFlags, shutdown_cmd::SockShutdownCmd},
},
prelude::*,
2024-01-07 15:55:23 +00:00
process::signal::Pollee,
util::{MultiRead, MultiWrite},
2023-05-31 02:48:16 +00:00
};
pub struct ConnectedStream {
2024-09-20 03:33:20 +00:00
bound_socket: BoundTcpSocket,
2023-05-31 02:48:16 +00:00
remote_endpoint: IpEndpoint,
2024-01-09 15:42:26 +00:00
/// Indicates whether this connection is "new" in a `connect()` system call.
///
/// If the connection is not new, `connect()` will fail with the error code `EISCONN`,
/// otherwise it will succeed. This means that `connect()` will succeed _exactly_ once,
/// regardless of whether the connection is established synchronously or asynchronously.
///
/// If the connection is established synchronously, the synchronous `connect()` will succeed
/// and any subsequent `connect()` will fail; otherwise, the first `connect()` after the
/// connection is established asynchronously will succeed and any subsequent `connect()` will
/// fail.
is_new_connection: bool,
2024-10-24 09:24:10 +00:00
/// Indicates if the receiving side of this socket is closed.
///
/// The receiving side may be closed if this side disables reading
/// or if the peer side closes its sending half.
is_receiving_closed: AtomicBool,
/// Indicates if the sending side of this socket is closed.
///
/// The sending side can only be closed if this side disables writing.
is_sending_closed: AtomicBool,
2023-05-31 02:48:16 +00:00
}
impl ConnectedStream {
2024-01-09 15:42:26 +00:00
pub fn new(
2024-09-20 03:33:20 +00:00
bound_socket: BoundTcpSocket,
2024-01-09 15:42:26 +00:00
remote_endpoint: IpEndpoint,
is_new_connection: bool,
) -> Self {
2024-01-07 15:55:23 +00:00
Self {
2023-05-31 02:48:16 +00:00
bound_socket,
remote_endpoint,
2024-01-09 15:42:26 +00:00
is_new_connection,
2024-10-24 09:24:10 +00:00
is_receiving_closed: AtomicBool::new(false),
is_sending_closed: AtomicBool::new(false),
2024-01-07 15:55:23 +00:00
}
2023-05-31 02:48:16 +00:00
}
2024-10-24 09:24:10 +00:00
pub fn shutdown(&self, cmd: SockShutdownCmd, pollee: &Pollee) -> Result<()> {
2024-11-13 15:39:55 +00:00
let mut events = IoEvents::empty();
2024-10-24 09:24:10 +00:00
if cmd.shut_read() {
self.is_receiving_closed.store(true, Ordering::Relaxed);
2024-11-13 15:39:55 +00:00
events |= IoEvents::IN | IoEvents::RDHUP;
2024-10-24 09:24:10 +00:00
}
if cmd.shut_write() {
self.is_sending_closed.store(true, Ordering::Relaxed);
self.bound_socket.close();
2024-11-13 15:39:55 +00:00
events |= IoEvents::OUT | IoEvents::HUP;
2024-10-24 09:24:10 +00:00
}
2024-11-13 15:39:55 +00:00
pollee.notify(events);
2023-05-31 02:48:16 +00:00
Ok(())
}
2024-11-16 15:09:20 +00:00
pub fn try_recv(
&self,
writer: &mut dyn MultiWrite,
_flags: SendRecvFlags,
) -> Result<(usize, NeedIfacePoll)> {
2024-09-20 03:33:20 +00:00
let result = self.bound_socket.recv(|socket_buffer| {
match writer.write(&mut VmReader::from(&*socket_buffer)) {
Ok(len) => (len, Ok(len)),
Err(e) => (0, Err(e)),
}
});
2024-03-20 03:25:18 +00:00
match result {
2024-11-16 15:09:20 +00:00
Ok((Ok(0), need_poll)) if self.is_receiving_closed.load(Ordering::Relaxed) => {
Ok((0, need_poll))
}
Ok((Ok(0), need_poll)) => {
debug_assert!(!*need_poll);
return_errno_with_message!(Errno::EAGAIN, "the receive buffer is empty")
}
Ok((Ok(recv_bytes), need_poll)) => Ok((recv_bytes, need_poll)),
Ok((Err(e), need_poll)) => {
debug_assert!(!*need_poll);
Err(e)
}
Err(RecvError::Finished) => Ok((0, NeedIfacePoll::FALSE)),
Err(RecvError::InvalidState) => {
return_errno_with_message!(Errno::ECONNRESET, "the connection is reset")
}
2023-05-31 02:48:16 +00:00
}
}
2024-11-16 15:09:20 +00:00
pub fn try_send(
&self,
reader: &mut dyn MultiRead,
_flags: SendRecvFlags,
) -> Result<(usize, NeedIfacePoll)> {
2024-09-20 03:33:20 +00:00
let result = self.bound_socket.send(|socket_buffer| {
match reader.read(&mut VmWriter::from(socket_buffer)) {
Ok(len) => (len, Ok(len)),
Err(e) => (0, Err(e)),
}
});
2024-03-20 03:25:18 +00:00
match result {
2024-11-16 15:09:20 +00:00
Ok((Ok(0), need_poll)) => {
debug_assert!(!*need_poll);
return_errno_with_message!(Errno::EAGAIN, "the send buffer is full")
}
Ok((Ok(sent_bytes), need_poll)) => Ok((sent_bytes, need_poll)),
Ok((Err(e), need_poll)) => {
debug_assert!(!*need_poll);
Err(e)
}
Err(SendError::InvalidState) => {
// FIXME: `EPIPE` is another possibility, which means that the socket is shut down
// for writing. In that case, we should also trigger a `SIGPIPE` if `MSG_NOSIGNAL`
// is not specified.
return_errno_with_message!(Errno::ECONNRESET, "the connection is reset");
}
2024-01-07 15:55:23 +00:00
}
}
2024-01-07 15:55:23 +00:00
pub fn local_endpoint(&self) -> IpEndpoint {
self.bound_socket.local_endpoint().unwrap()
2023-05-31 02:48:16 +00:00
}
2024-01-07 15:55:23 +00:00
pub fn remote_endpoint(&self) -> IpEndpoint {
self.remote_endpoint
2023-05-31 02:48:16 +00:00
}
2024-11-16 15:09:20 +00:00
pub fn iface(&self) -> &Arc<Iface> {
self.bound_socket.iface()
}
2024-01-09 15:42:26 +00:00
pub fn check_new(&mut self) -> Result<()> {
if !self.is_new_connection {
return_errno_with_message!(Errno::EISCONN, "the socket is already connected");
}
self.is_new_connection = false;
Ok(())
}
2024-11-13 15:39:55 +00:00
pub(super) fn check_io_events(&self) -> IoEvents {
2024-09-20 03:33:20 +00:00
self.bound_socket.raw_with(|socket| {
2024-11-13 15:39:55 +00:00
if socket.is_peer_closed() {
2024-10-24 09:24:10 +00:00
// Only the sending side of peer socket is closed
self.is_receiving_closed.store(true, Ordering::Relaxed);
2024-11-13 15:39:55 +00:00
} else if socket.is_closed() {
2024-10-24 09:24:10 +00:00
// The sending side of both peer socket and this socket are closed
self.is_receiving_closed.store(true, Ordering::Relaxed);
self.is_sending_closed.store(true, Ordering::Relaxed);
}
let is_receiving_closed = self.is_receiving_closed.load(Ordering::Relaxed);
let is_sending_closed = self.is_sending_closed.load(Ordering::Relaxed);
2024-11-13 15:39:55 +00:00
let mut events = IoEvents::empty();
2024-10-24 09:24:10 +00:00
// If the receiving side is closed, always add events IN and RDHUP;
// otherwise, check if the socket can receive.
if is_receiving_closed {
2024-11-13 15:39:55 +00:00
events |= IoEvents::IN | IoEvents::RDHUP;
2024-10-24 09:24:10 +00:00
} else if socket.can_recv() {
2024-11-13 15:39:55 +00:00
events |= IoEvents::IN;
}
2024-10-24 09:24:10 +00:00
// If the sending side is closed, always add an OUT event;
// otherwise, check if the socket can send.
if is_sending_closed || socket.can_send() {
2024-11-13 15:39:55 +00:00
events |= IoEvents::OUT;
}
2024-10-24 09:24:10 +00:00
// If both sending and receiving sides are closed, add a HUP event.
if is_receiving_closed && is_sending_closed {
2024-11-13 15:39:55 +00:00
events |= IoEvents::HUP;
2024-10-24 09:24:10 +00:00
}
2024-11-13 15:39:55 +00:00
events
})
2023-05-31 02:48:16 +00:00
}
2023-06-13 09:49:44 +00:00
2024-12-02 15:53:19 +00:00
pub(super) fn set_observer(&self, observer: StreamObserver) {
2024-01-07 15:55:23 +00:00
self.bound_socket.set_observer(observer)
}
}