From f437dc624466945f1831f9c09d7b73c7b7b5fce7 Mon Sep 17 00:00:00 2001 From: Jianfeng Jiang Date: Wed, 31 May 2023 10:47:52 +0800 Subject: [PATCH] Add network iface --- .../jinux-std/src/net/iface/any_socket.rs | 232 ++++++++++++++++++ .../libs/jinux-std/src/net/iface/common.rs | 181 ++++++++++++++ .../libs/jinux-std/src/net/iface/loopback.rs | 70 ++++++ services/libs/jinux-std/src/net/iface/mod.rs | 79 ++++++ services/libs/jinux-std/src/net/iface/time.rs | 6 + services/libs/jinux-std/src/net/iface/util.rs | 79 ++++++ services/libs/jinux-std/src/net/mod.rs | 35 +++ 7 files changed, 682 insertions(+) create mode 100644 services/libs/jinux-std/src/net/iface/any_socket.rs create mode 100644 services/libs/jinux-std/src/net/iface/common.rs create mode 100644 services/libs/jinux-std/src/net/iface/loopback.rs create mode 100644 services/libs/jinux-std/src/net/iface/mod.rs create mode 100644 services/libs/jinux-std/src/net/iface/time.rs create mode 100644 services/libs/jinux-std/src/net/iface/util.rs create mode 100644 services/libs/jinux-std/src/net/mod.rs diff --git a/services/libs/jinux-std/src/net/iface/any_socket.rs b/services/libs/jinux-std/src/net/iface/any_socket.rs new file mode 100644 index 000000000..a962fce14 --- /dev/null +++ b/services/libs/jinux-std/src/net/iface/any_socket.rs @@ -0,0 +1,232 @@ +use crate::{ + fs::utils::{IoEvents, Pollee, Poller}, + prelude::*, +}; + +use super::Iface; +use super::{IpAddress, IpEndpoint}; + +pub type RawTcpSocket = smoltcp::socket::tcp::Socket<'static>; +pub type RawUdpSocket = smoltcp::socket::udp::Socket<'static>; +pub type RawSocketHandle = smoltcp::iface::SocketHandle; + +pub struct AnyUnboundSocket { + socket_family: AnyRawSocket, + pollee: Pollee, +} + +pub(super) enum AnyRawSocket { + Tcp(RawTcpSocket), + Udp(RawUdpSocket), +} + +pub(super) enum SocketFamily { + Tcp, + Udp, +} + +impl AnyUnboundSocket { + pub fn new_tcp() -> Self { + let raw_tcp_socket = { + let rx_buffer = smoltcp::socket::tcp::SocketBuffer::new(vec![0u8; RECV_BUF_LEN]); + let tx_buffer = smoltcp::socket::tcp::SocketBuffer::new(vec![0u8; SEND_BUF_LEN]); + RawTcpSocket::new(rx_buffer, tx_buffer) + }; + let pollee = Pollee::new(IoEvents::empty()); + AnyUnboundSocket { + socket_family: AnyRawSocket::Tcp(raw_tcp_socket), + pollee, + } + } + + pub fn new_udp() -> Self { + let raw_udp_socket = { + let metadata = smoltcp::socket::udp::PacketMetadata::EMPTY; + let rx_buffer = smoltcp::socket::udp::PacketBuffer::new( + vec![metadata; UDP_METADATA_LEN], + vec![0u8; UDP_RECEIVE_PAYLOAD_LEN], + ); + let tx_buffer = smoltcp::socket::udp::PacketBuffer::new( + vec![metadata; UDP_METADATA_LEN], + vec![0u8; UDP_RECEIVE_PAYLOAD_LEN], + ); + RawUdpSocket::new(rx_buffer, tx_buffer) + }; + AnyUnboundSocket { + socket_family: AnyRawSocket::Udp(raw_udp_socket), + pollee: Pollee::new(IoEvents::empty()), + } + } + + pub(super) fn raw_socket_family(self) -> AnyRawSocket { + self.socket_family + } + + pub(super) fn socket_family(&self) -> SocketFamily { + match &self.socket_family { + AnyRawSocket::Tcp(_) => SocketFamily::Tcp, + AnyRawSocket::Udp(_) => SocketFamily::Udp, + } + } + + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + self.pollee.poll(mask, poller) + } + + pub(super) fn pollee(&self) -> Pollee { + self.pollee.clone() + } +} + +pub struct AnyBoundSocket { + iface: Arc, + handle: smoltcp::iface::SocketHandle, + port: u16, + pollee: Pollee, + socket_family: SocketFamily, + weak_self: Weak, +} + +impl AnyBoundSocket { + pub(super) fn new( + iface: Arc, + handle: smoltcp::iface::SocketHandle, + port: u16, + pollee: Pollee, + socket_family: SocketFamily, + ) -> Arc { + Arc::new_cyclic(|weak_self| Self { + iface, + handle, + port, + pollee, + socket_family, + weak_self: weak_self.clone(), + }) + } + + pub fn local_endpoint(&self) -> Option { + let ip_addr = { + let ipv4_addr = self.iface.ipv4_addr()?; + IpAddress::Ipv4(ipv4_addr) + }; + Some(IpEndpoint::new(ip_addr, self.port)) + } + + pub fn raw_with, R, F: FnMut(&mut T) -> R>( + &self, + mut f: F, + ) -> R { + let mut sockets = self.iface.sockets(); + let socket = sockets.get_mut::(self.handle); + f(socket) + } + + /// Try to connect to a remote endpoint. Tcp socket only. + pub fn do_connect(&self, remote_endpoint: IpEndpoint) -> Result<()> { + let mut sockets = self.iface.sockets(); + let socket = sockets.get_mut::(self.handle); + let port = self.port; + let mut iface_inner = self.iface.iface_inner(); + let cx = iface_inner.context(); + socket + .connect(cx, remote_endpoint, port) + .map_err(|_| Error::with_message(Errno::ENOBUFS, "send connection request failed"))?; + Ok(()) + } + + pub fn update_socket_state(&self) { + let handle = &self.handle; + let pollee = &self.pollee; + let sockets = self.iface().sockets(); + match self.socket_family { + SocketFamily::Tcp => { + let socket = sockets.get::(*handle); + update_tcp_socket_state(socket, pollee); + } + SocketFamily::Udp => { + let udp_socket = sockets.get::(*handle); + update_udp_socket_state(udp_socket, pollee); + } + } + } + + pub fn iface(&self) -> &Arc { + &self.iface + } + + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + self.pollee.poll(mask, poller) + } + + pub(super) fn weak_ref(&self) -> Weak { + self.weak_self.clone() + } + + fn close(&self) { + match self.socket_family { + SocketFamily::Tcp => self.raw_with(|socket: &mut RawTcpSocket| socket.close()), + SocketFamily::Udp => self.raw_with(|socket: &mut RawUdpSocket| socket.close()), + } + } +} + +impl Drop for AnyBoundSocket { + fn drop(&mut self) { + self.close(); + self.iface.common().remove_socket(self.handle); + self.iface.common().release_port(self.port); + self.iface.common().remove_bound_socket(self.weak_ref()); + } +} + +fn update_tcp_socket_state(socket: &RawTcpSocket, pollee: &Pollee) { + if socket.can_recv() { + pollee.add_events(IoEvents::IN); + } else { + pollee.del_events(IoEvents::IN); + } + + if socket.can_send() { + pollee.add_events(IoEvents::OUT); + } else { + pollee.del_events(IoEvents::OUT); + } + + if socket.may_recv() { + pollee.del_events(IoEvents::RDHUP); + } else { + // The receice half was closed + pollee.add_events(IoEvents::RDHUP); + } + + if socket.is_open() { + pollee.del_events(IoEvents::HUP); + } else { + // The socket is closed + pollee.add_events(IoEvents::HUP); + } +} + +fn update_udp_socket_state(socket: &RawUdpSocket, pollee: &Pollee) { + if socket.can_recv() { + pollee.add_events(IoEvents::IN); + } else { + pollee.del_events(IoEvents::IN); + } + + if socket.can_send() { + pollee.add_events(IoEvents::OUT); + } else { + pollee.del_events(IoEvents::OUT); + } +} + +// For TCP +const RECV_BUF_LEN: usize = 65536; +const SEND_BUF_LEN: usize = 65536; + +// For UDP +const UDP_METADATA_LEN: usize = 256; +const UDP_SEND_PAYLOAD_LEN: usize = 65536; +const UDP_RECEIVE_PAYLOAD_LEN: usize = 65536; diff --git a/services/libs/jinux-std/src/net/iface/common.rs b/services/libs/jinux-std/src/net/iface/common.rs new file mode 100644 index 000000000..e120fbd81 --- /dev/null +++ b/services/libs/jinux-std/src/net/iface/common.rs @@ -0,0 +1,181 @@ +use core::sync::atomic::{AtomicU64, Ordering}; + +use super::Ipv4Address; +use crate::prelude::*; +use keyable_arc::KeyableWeak; +use smoltcp::{ + iface::{SocketHandle, SocketSet}, + phy::Device, + wire::IpCidr, +}; + +use super::{ + any_socket::{AnyBoundSocket, AnyRawSocket, AnyUnboundSocket}, + time::get_network_timestamp, + util::BindPortConfig, + Iface, +}; + +pub struct IfaceCommon { + interface: Mutex, + sockets: Mutex>, + used_ports: RwLock>, + /// The time should do next poll. We stores the total microseconds since system boots up. + next_poll_at_ms: AtomicU64, + bound_sockets: RwLock>>, +} + +impl IfaceCommon { + pub(super) fn new(interface: smoltcp::iface::Interface) -> Self { + let socket_set = SocketSet::new(Vec::new()); + let used_ports = BTreeMap::new(); + Self { + interface: Mutex::new(interface), + sockets: Mutex::new(socket_set), + used_ports: RwLock::new(used_ports), + next_poll_at_ms: AtomicU64::new(0), + bound_sockets: RwLock::new(BTreeSet::new()), + } + } + + pub(super) fn interface(&self) -> MutexGuard { + self.interface.lock() + } + + pub(super) fn sockets(&self) -> MutexGuard> { + self.sockets.lock() + } + + pub(super) fn ipv4_addr(&self) -> Option { + self.interface.lock().ipv4_addr() + } + + pub(super) fn netmask(&self) -> Option { + let interface = self.interface.lock(); + let ip_addrs = interface.ip_addrs(); + ip_addrs.first().map(|cidr| match cidr { + IpCidr::Ipv4(ipv4_cidr) => ipv4_cidr.netmask(), + }) + } + + /// Alloc an unused port range from 49152 ~ 65535 (According to smoltcp docs) + fn alloc_ephemeral_port(&self) -> Result { + let mut used_ports = self.used_ports.write(); + for port in IP_LOCAL_PORT_START..=IP_LOCAL_PORT_END { + if !used_ports.contains_key(&port) { + used_ports.insert(port, 0); + return Ok(port); + } + } + return_errno_with_message!(Errno::EAGAIN, "cannot find unused high port"); + } + + fn bind_port(&self, port: u16, can_reuse: bool) -> Result<()> { + let mut used_ports = self.used_ports.write(); + if let Some(used_times) = used_ports.get_mut(&port) { + if *used_times == 0 || can_reuse { + *used_times = *used_times + 1; + } else { + return_errno_with_message!(Errno::EADDRINUSE, "cannot bind port"); + } + } else { + used_ports.insert(port, 1); + } + Ok(()) + } + + /// Release port number so the port can be used again. For reused port, the port may still be in use. + pub(super) fn release_port(&self, port: u16) { + let mut used_ports = self.used_ports.write(); + if let Some(used_times) = used_ports.remove(&port) { + if used_times != 1 { + used_ports.insert(port, used_times - 1); + } + } + } + + pub(super) fn bind_socket( + &self, + iface: Arc, + socket: AnyUnboundSocket, + config: BindPortConfig, + ) -> core::result::Result, (Error, AnyUnboundSocket)> { + let port = if let Some(port) = config.port() { + port + } else { + match self.alloc_ephemeral_port() { + Ok(port) => port, + Err(e) => return Err((e, socket)), + } + }; + if let Some(e) = self.bind_port(port, config.can_reuse()).err() { + return Err((e, socket)); + } + let socket_family = socket.socket_family(); + let pollee = socket.pollee(); + let mut sockets = self.sockets.lock(); + let handle = match socket.raw_socket_family() { + AnyRawSocket::Tcp(tcp_socket) => sockets.add(tcp_socket), + AnyRawSocket::Udp(udp_socket) => sockets.add(udp_socket), + }; + let bound_socket = AnyBoundSocket::new(iface, handle, port, pollee, socket_family); + self.insert_bound_socket(&bound_socket).unwrap(); + Ok(bound_socket) + } + + /// Remove a socket from the interface + pub(super) fn remove_socket(&self, handle: SocketHandle) { + self.sockets.lock().remove(handle); + } + + pub(super) fn poll(&self, device: &mut D) { + let mut interface = self.interface.lock(); + let timestamp = get_network_timestamp(); + let has_events = { + let mut sockets = self.sockets.lock(); + interface.poll(timestamp, device, &mut sockets) + // drop sockets here to avoid deadlock + }; + if has_events { + self.bound_sockets.read().iter().for_each(|bound_socket| { + let bound_socket = bound_socket.upgrade().unwrap(); + bound_socket.update_socket_state(); + }); + } + + let sockets = self.sockets.lock(); + if let Some(instant) = interface.poll_at(timestamp, &sockets) { + self.next_poll_at_ms + .store(instant.total_millis() as u64, Ordering::SeqCst); + } else { + self.next_poll_at_ms.store(0, Ordering::SeqCst); + } + } + + pub(super) fn next_poll_at_ms(&self) -> Option { + let millis = self.next_poll_at_ms.load(Ordering::SeqCst); + if millis == 0 { + None + } else { + Some(millis) + } + } + + fn insert_bound_socket(&self, socket: &Arc) -> Result<()> { + let weak_ref = KeyableWeak::from(Arc::downgrade(&socket)); + let mut bound_sockets = self.bound_sockets.write(); + if bound_sockets.contains(&weak_ref) { + return_errno_with_message!(Errno::EINVAL, "the socket is already bound"); + } + bound_sockets.insert(weak_ref); + Ok(()) + } + + pub(super) fn remove_bound_socket(&self, socket: Weak) { + let weak_ref = KeyableWeak::from(socket); + self.bound_sockets.write().remove(&weak_ref); + } +} + +const IP_LOCAL_PORT_START: u16 = 49152; +const IP_LOCAL_PORT_END: u16 = 65535; diff --git a/services/libs/jinux-std/src/net/iface/loopback.rs b/services/libs/jinux-std/src/net/iface/loopback.rs new file mode 100644 index 000000000..61a802448 --- /dev/null +++ b/services/libs/jinux-std/src/net/iface/loopback.rs @@ -0,0 +1,70 @@ +use super::{IpAddress, Ipv4Address}; +use crate::prelude::*; +use smoltcp::{ + iface::{Config, Routes}, + phy::{Loopback, Medium}, + wire::IpCidr, +}; + +use super::{common::IfaceCommon, internal::IfaceInternal, Iface}; + +pub const LOOPBACK_ADDRESS: IpAddress = { + let ipv4_addr = Ipv4Address::new(127, 0, 0, 1); + IpAddress::Ipv4(ipv4_addr) +}; +pub const LOOPBACK_ADDRESS_PREFIX_LEN: u8 = 8; // mask: 255.0.0.0 + +pub struct IfaceLoopback { + driver: Mutex, + common: IfaceCommon, + weak_self: Weak, +} + +impl IfaceLoopback { + pub fn new() -> Arc { + let mut loopback = Loopback::new(Medium::Ip); + let interface = { + let routes = Routes::new(); + let config = Config::new(); + let mut interface = smoltcp::iface::Interface::new(config, &mut loopback); + interface.update_ip_addrs(|ip_addrs| { + debug_assert!(ip_addrs.len() == 0); + let ip_addr = IpCidr::new(LOOPBACK_ADDRESS, LOOPBACK_ADDRESS_PREFIX_LEN); + ip_addrs.push(ip_addr).unwrap(); + }); + interface + }; + println!("Loopback ipaddr: {}", interface.ipv4_addr().unwrap()); + let common = IfaceCommon::new(interface); + Arc::new_cyclic(|weak| Self { + driver: Mutex::new(loopback), + common, + weak_self: weak.clone(), + }) + } +} + +impl IfaceInternal for IfaceLoopback { + fn common(&self) -> &IfaceCommon { + &self.common + } + + fn arc_self(&self) -> Arc { + self.weak_self.upgrade().unwrap() + } +} + +impl Iface for IfaceLoopback { + fn name(&self) -> &str { + "lo" + } + + fn mac_addr(&self) -> Option { + None + } + + fn poll(&self) { + let mut device = self.driver.lock(); + self.common.poll(&mut *device); + } +} diff --git a/services/libs/jinux-std/src/net/iface/mod.rs b/services/libs/jinux-std/src/net/iface/mod.rs new file mode 100644 index 000000000..7298f1bf0 --- /dev/null +++ b/services/libs/jinux-std/src/net/iface/mod.rs @@ -0,0 +1,79 @@ +use self::common::IfaceCommon; +use crate::prelude::*; +use smoltcp::iface::SocketSet; + +mod any_socket; +mod common; +mod loopback; +mod time; +mod util; + +pub use any_socket::{AnyBoundSocket, AnyUnboundSocket, RawTcpSocket, RawUdpSocket}; +pub use loopback::IfaceLoopback; +pub use smoltcp::wire::{EthernetAddress, IpAddress, IpEndpoint, IpListenEndpoint, Ipv4Address}; +pub use util::{spawn_background_poll_thread, BindPortConfig}; + +/// Network interface. +/// +/// A network interface (abbreviated as iface) is a hardware or software component that connects a device or computer to a network. +/// Network interfaces can be physical components like Ethernet ports or wireless adapters, +/// or they can be virtual interfaces created by software such as virtual private network (VPN) connections. +pub trait Iface: internal::IfaceInternal + Send + Sync { + /// The iface name. For linux, usually the driver name followed by a unit number. + fn name(&self) -> &str; + + /// The optional mac address + fn mac_addr(&self) -> Option; + + /// Transmit packets queued in the iface, and receive packets queued in the iface. + /// It any event happens, this function will also update socket status. + fn poll(&self); + + /// Bind a socket to the iface. So the packet for this socket will be dealt with by the interface. + /// If port is None, the iface will pick up an empheral port for the socket. + /// FIXME: The reason for binding socket and interface together is because there are limitations inside smoltcp. + /// See discussion at https://github.com/smoltcp-rs/smoltcp/issues/779. + fn bind_socket( + &self, + socket: AnyUnboundSocket, + config: BindPortConfig, + ) -> core::result::Result, (Error, AnyUnboundSocket)> { + let common = self.common(); + let socket_type_inner = socket.socket_family(); + common.bind_socket(self.arc_self(), socket, config) + } + + /// The optional ipv4 address + /// FIXME: An interface indeed support multiple addresses + fn ipv4_addr(&self) -> Option { + self.common().ipv4_addr() + } + + /// The netmask. + /// FIXME: The netmask and IP address should be one-to-one if there are multiple ip address + fn netmask(&self) -> Option { + self.common().netmask() + } +} + +mod internal { + use super::*; + + /// A helper trait + pub trait IfaceInternal { + fn common(&self) -> &IfaceCommon; + /// The inner socket set + fn sockets(&self) -> MutexGuard> { + self.common().sockets() + } + /// The inner iface. + fn iface_inner(&self) -> MutexGuard { + self.common().interface() + } + /// The time we should do another poll. + fn next_poll_at_ms(&self) -> Option { + self.common().next_poll_at_ms() + } + fn arc_self(&self) -> Arc; + } +} diff --git a/services/libs/jinux-std/src/net/iface/time.rs b/services/libs/jinux-std/src/net/iface/time.rs new file mode 100644 index 000000000..4bcc1ff49 --- /dev/null +++ b/services/libs/jinux-std/src/net/iface/time.rs @@ -0,0 +1,6 @@ +use jinux_frame::timer::read_monotonic_milli_seconds; + +pub(super) fn get_network_timestamp() -> smoltcp::time::Instant { + let millis = read_monotonic_milli_seconds(); + smoltcp::time::Instant::from_millis(millis as i64) +} diff --git a/services/libs/jinux-std/src/net/iface/util.rs b/services/libs/jinux-std/src/net/iface/util.rs new file mode 100644 index 000000000..320ddd250 --- /dev/null +++ b/services/libs/jinux-std/src/net/iface/util.rs @@ -0,0 +1,79 @@ +use jinux_frame::timer::read_monotonic_milli_seconds; + +use crate::{ + prelude::*, + thread::{kernel_thread::KernelThreadExt, Thread}, +}; + +use super::Iface; + +pub enum BindPortConfig { + CanReuse(u16), + Specified(u16), + Ephemeral, +} + +impl BindPortConfig { + pub fn new(port: u16, can_reuse: bool) -> Result { + let config = if port != 0 { + if can_reuse { + Self::CanReuse(port) + } else { + Self::Specified(port) + } + } else { + if can_reuse { + return_errno_with_message!(Errno::EINVAL, "invalid bind port config"); + } else { + Self::Ephemeral + } + }; + Ok(config) + } + + pub(super) fn can_reuse(&self) -> bool { + if let Self::CanReuse(_) = self { + true + } else { + false + } + } + + pub(super) fn port(&self) -> Option { + match self { + Self::CanReuse(port) | Self::Specified(port) => Some(*port), + Self::Ephemeral => None, + } + } +} + +pub fn spawn_background_poll_thread(iface: Arc) { + // FIXME: use timer or wait_timeout when timer is enable. + let task_fn = move || { + debug!("spawn background poll thread"); + loop { + let next_poll_time = if let Some(next_poll_time) = iface.next_poll_at_ms() { + trace!("next poll time = {:?}", next_poll_time); + next_poll_time + } else { + Thread::yield_now(); + continue; + }; + let now = read_monotonic_milli_seconds(); + trace!("now = {:?}", now); + if now > next_poll_time { + // FIXME: now is later than next poll time. This may cause problem. + iface.poll(); + continue; + } + let duration = next_poll_time - now; + // FIXME: choose a suitable time interval + if duration < 10 { + iface.poll(); + } else { + Thread::yield_now(); + } + } + }; + Thread::spawn_kernel_thread(task_fn); +} diff --git a/services/libs/jinux-std/src/net/mod.rs b/services/libs/jinux-std/src/net/mod.rs new file mode 100644 index 000000000..db3af95de --- /dev/null +++ b/services/libs/jinux-std/src/net/mod.rs @@ -0,0 +1,35 @@ +use crate::{ + net::iface::{Iface, IfaceLoopback}, + prelude::*, +}; +use spin::Once; + +use self::iface::spawn_background_poll_thread; + +pub static IFACES: Once>> = Once::new(); + +pub mod iface; +pub mod socket; + +pub fn init() { + IFACES.call_once(|| { + let iface_loopback = IfaceLoopback::new(); + vec![iface_loopback] + }); + poll_ifaces(); +} + +/// Lazy init should be called after spawning init thread. +pub fn lazy_init() { + for iface in IFACES.get().unwrap() { + spawn_background_poll_thread(iface.clone()); + } +} + +/// Poll iface +pub fn poll_ifaces() { + let ifaces = IFACES.get().unwrap(); + for iface in ifaces.iter() { + iface.poll(); + } +}