commit a3ebe46350ffcf07fed8fd7bcb82c7bd4b80c9b3 Author: minhtrannhat Date: Fri Jun 28 23:33:07 2024 -0400 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..292ab45 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,82 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "bytes" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "epore" +version = "0.1.0" +dependencies = [ + "bytes", + "rand", +] + +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "libc" +version = "0.2.155" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..f9d93c7 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "epore" +version = "0.1.0" +edition = "2021" + +[dependencies] +bytes = "1.6.0" +rand = "0.8.5" diff --git a/README.md b/README.md new file mode 100644 index 0000000..880a605 --- /dev/null +++ b/README.md @@ -0,0 +1,48 @@ +# Epore - Learning how to use epoll to Event Queue for non-blocking I/O + +## Files Structure + +- `ffi.rs`: This module will contain the code related to the syscalls we need to communicate with the host operating system. +- `main.rs`: This is the example program itself +- `poll.rs`: This module contains the main abstraction, which is a thin layer over epoll + +## Overview + +- `Poll`: Struct to interface with the OS's event notification system aka event queue (`io_uring`, `epoll`, `kqueue`, `IOCP`). + + - `new()`: To create a new interface to OS's event queue. + Similar to [`epoll_create`](https://man7.org/linux/man-pages/man2/epoll_create.2.html) + - `registry()`: Returns a reference to the registry that we can use to register interest to be notified about new events. 
+ Similar to [`int epoll_ctl(int epfd, int op, int fd, struct epoll_event *_Nullable event);`](https://man7.org/linux/man-pages/man2/epoll_ctl.2.html) + - `poll()`: blocks the thread it's called on until an event is ready or it times out, whichever occurs first. + +- `Registry`: Struct to register interest in a certain `Event`. + +- `Token`: Using `Token` to track which `TcpStream` socket generated the event. + +### Sample Usage + +```rust +let mut queue = Poll::new().unwrap(); +let id = 1; + +// register interest in events on a TcpStream +queue.registry().register(&stream, id, ...).unwrap(); + +// buffer that receives the events reported by the queue +let mut events = Vec::with_capacity(1); + +// This will block the current thread +queue.poll(&mut events, None).unwrap(); +//...data is ready on one of the tracked streams +``` + +## Notes + +### `Registry` and `Poll` Relationship + +We can see that the struct `Poll` has an internal struct `Registry` inside of it. By moving the struct `Registry` inside of the `Poll` struct, we can call `Registry::try_clone()` to get an owned Registry instance. + +Therefore, we can pass the `Registry` to other threads with `Arc`, allowing multiple threads to register their interest to the same `Poll` instance even when `Poll` is blocking another thread while waiting for new events to happen in `Poll::poll`. + +`Poll::poll()` requires exclusive access since it takes a `&mut self`, so when we're waiting for events in `Poll::poll()`, there is no way to register interest from a different thread at the same time if we rely on using `Poll` to register. diff --git a/src/ffi.rs b/src/ffi.rs new file mode 100644 index 0000000..a22bf49 --- /dev/null +++ b/src/ffi.rs @@ -0,0 +1,39 @@ +// Register interest +pub const EPOLL_CTL_ADD: i32 = 1; + +// Bit mask to express that +// we are interested when the data is available to READ +pub const EPOLLIN: i32 = 0x1; + +// Bit mask that requests +// edge-triggered notification +// for the associated file descriptor.
+// The default behavior for epoll is level-triggered. +pub const EPOLLET: i32 = 1 << 31; + +// Here we have the syscalls +// Unsafe !!! +#[link(name = "c")] +extern "C" { + pub fn epoll_create(size: i32) -> i32; + pub fn close(fd: i32) -> i32; + pub fn epoll_ctl(epfd: i32, op: i32, fd: i32, event: *mut Event) -> i32; + pub fn epoll_wait(epfd: i32, events: *mut Event, maxevents: i32, timeout: i32) -> i32; +} + +// Avoid padding by using repr(packed) +// Data struct is different in Rust compared to C +#[derive(Debug)] +#[repr(C)] +#[cfg_attr(target_arch = "x86_64", repr(packed))] +pub struct Event { + pub(crate) events: u32, + // Using `Token` a.k.a `epoll_data` to track which socket generated the event + pub(crate) epoll_data: usize, +} + +impl Event { + pub fn token(&self) -> usize { + self.epoll_data + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..e21e563 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,186 @@ +use core::panic; +use std::{ + collections::HashSet, + io::{self, Read, Result, Write}, + net::TcpStream, + usize, +}; + +use bytes::{BufMut, BytesMut}; +use ffi::Event; +use poll::Poll; +use rand::Rng; + +mod ffi; +mod poll; + +// build our request as a buffer of bytes (&[u8]) +// +// NOTE: BytesMut does implement AsRef so that +// it can be easily converted into &[u8] for write_all() +// +fn get_req(path: &str) -> BytesMut { + let mut buffer = BytesMut::new(); + buffer.put(&b"GET "[..]); + buffer.put(path.as_bytes()); + buffer.put(&b" HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"[..]); + buffer +} + +// Function call happens when an event is ready to be processed +fn handle_event( + events: &[Event], + tcp_streams: &mut [TcpStream], + handled_tokens: &mut HashSet, +) -> Result { + let mut handled_events_curr = 0; + + for event in events { + // The token helps differentiate each I/O resource + // in our case, there are + // 200 TcpStreams aka 200 TCP socket file descriptors + let resource_index = event.token(); + + 
// This buffer can hold 4096 characters/bytes + let mut data_buffer_read_from_tcp_stream = vec![0u8; 4096]; + + loop { + match tcp_streams[resource_index].read(&mut data_buffer_read_from_tcp_stream) { + // a read of 0 bytes means the stream reached end-of-file (peer closed), + // we can consider the event successfully handled + Ok(0) => { + if !handled_tokens.insert(resource_index) { + break; + }; + handled_events_curr += 1; + break; + } + + // n bytes were read into data_buffer_read_from_tcp_stream, + // keep looping because the socket may still have more data + Ok(n) => { + let txt = String::from_utf8_lossy(&data_buffer_read_from_tcp_stream[..n]); + println!("RECEIVED: {:?}", event); + println!("{txt}\n------\n"); + } + + // WouldBlock indicates that the data transfer is not complete, + // but there is no data ready right now. + // the transfer must be retried + Err(error) if error.kind() == io::ErrorKind::WouldBlock => break, + Err(error) if error.kind() == io::ErrorKind::Interrupted => break, + // return the error and break the loop + Err(error) => return Err(error), + } + } + } + + Ok(handled_events_curr) +} + +fn main() -> Result<()> { + // The Event "queue": + // not really, + // just the interface to Linux's epoll_queue + let mut epoll_interface_registry = Poll::new().expect("Can't run epoll_create."); + + // aka how many requests do we want to send + // this is also how many TcpStreams we will create + // also the number of TCP socket file descriptors + let number_of_events: usize = 200; + + let mut tcp_streams: Vec = vec![]; + + let packet_addr: &str = "localhost:8080"; + + for request_id in 0..number_of_events { + let random_delay = rand::thread_rng().gen_range(1..=number_of_events / 5) * 1000; + + // the TcpServer should return our GET request after random_delay seconds + let url_path = format!("/{random_delay}/request-{request_id}"); + + let request_buffer = get_req(&url_path); + + let mut tcp_stream = TcpStream::connect(packet_addr).expect("Failed to create TcpStream."); + + //
nonblocking: Moves this TCP stream into or out of nonblocking mode. + // This will result in read, write, recv and send operations + // becoming nonblocking, i.e., + // immediately returning from their calls. + // If the IO operation is successful, + // Ok is returned and no further action is required. + // If the IO operation could not be completed and needs to be retried, + // an error with kind io::ErrorKind::WouldBlock is returned. + // + // nodelay: If set, this option disables the Nagle algorithm. + // This means that segments are always sent as soon as possible, + // even if there is only a small amount of data. + // When not set, data is buffered until there is a sufficient amount to send out, + // thereby avoiding the frequent sending of small packets. + tcp_stream + .set_nonblocking(true) + .and_then(|_| tcp_stream.set_nodelay(true)) + .expect("Failed to set TcpStream to nonblocking and nodelay"); + + tcp_stream + .write_all(request_buffer.as_ref()) + .expect("Failed to write to TcpStream."); + + // register interests + // for when data is ready to be READ + // from this TcpStream, + // edge-triggered by EPOLLET + epoll_interface_registry + .registry() + // the request_id is also the token for file descriptor ID purposes + .register(&tcp_stream, request_id, ffi::EPOLLET | ffi::EPOLLIN) + .unwrap_or_else(|_| { + panic!("Failed to register interests in the event queue for {request_id}") + }); + + tcp_streams.push(tcp_stream); + } + + println!("Finished sending requests"); + + let mut handled_events = 0; + + // track which resources has been handled + // + // while a vector is simpler, + // a hashset is much more efficient + // as it only tracks the resources that are handled + // avoid empty allocations + let mut handled_tokens: HashSet = HashSet::new(); + + // this loop will run for a while + while handled_events < number_of_events { + // too low of a number would limit + // how many events the OS could notify us + // on each wake up (see: EPOLLET) + let 
mut events_buffer: Vec = Vec::with_capacity(20); + + // on success, epoll_wait returns the number of file descriptors + // ready for the requested I/O operation, or zero if no file + // descriptor became ready during the requested timeout + // milliseconds + // + // a timeout of None is turned into -1 by Poll::poll, which makes + // epoll_wait() block until at least one event is available. + // + // blocking here avoids spinning in this loop while nothing is ready + epoll_interface_registry.poll(&mut events_buffer, None)?; + + if events_buffer.is_empty() { + println!("Timed out"); + continue; + } + + // the loop only reaches here when at least one event is ready + handled_events += handle_event(&events_buffer, &mut tcp_streams, &mut handled_tokens)?; + } + + println!("Finished receiving all responses"); + + Ok(()) +} diff --git a/src/poll.rs b/src/poll.rs new file mode 100644 index 0000000..e7fd86a --- /dev/null +++ b/src/poll.rs @@ -0,0 +1,89 @@ +use crate::ffi; +use std::{ + io::{self, Result}, + net::TcpStream, + os::fd::AsRawFd, +}; + +// We can be interested in multiple events +type Events = Vec; + +// Holds the file descriptor of the epoll instance (created in Poll::new), not of a TcpStream +pub struct Registry { + raw_fd: i32, +} + +impl Registry { + // Register interest in `source` by adding it to the epoll instance + // TcpStream is a high level representation of a TCP socket file descriptor + // token is used to differentiate between file descriptors, as a label + pub fn register(&self, source: &TcpStream, token: usize, interests: i32) -> Result<()> { + match unsafe { + ffi::epoll_ctl( + self.raw_fd, + ffi::EPOLL_CTL_ADD, + source.as_raw_fd(), + &mut ffi::Event { + events: interests as u32, + epoll_data: token, + }, + ) + } { + exit_code if exit_code < 0 => Err(io::Error::last_os_error()), + _ => Ok(()), + } + } +} + +impl Drop for Registry { + fn drop(&mut self) { + let res = unsafe { ffi::close(self.raw_fd) }; + + if res < 0 { + let err = io::Error::last_os_error(); + eprintln!("ERROR: {err:?}"); + } + } +} + +pub struct Poll { + registry:
Registry, +} + +impl Poll { + pub fn new() -> Result { + let res = unsafe { ffi::epoll_create(1) }; + if res < 0 { + return Err(io::Error::last_os_error()); + } + Ok(Self { + registry: Registry { raw_fd: res }, + }) + } + + pub fn registry(&self) -> &Registry { + &self.registry + } + + pub fn poll(&mut self, events: &mut Events, timeout: Option) -> Result<()> { + let fd = self.registry.raw_fd; + + let timeout = timeout.unwrap_or(-1); + + let max_events = events.capacity() as i32; + + let res = unsafe { ffi::epoll_wait(fd, events.as_mut_ptr(), max_events, timeout) }; + + if res < 0 { + return Err(io::Error::last_os_error()); + }; + + // on success, epoll_wait returns the number of file descriptors + // ready for the requested I/O operation, or zero if no file + // descriptor became ready during the requested timeout + // milliseconds + unsafe { events.set_len(res as usize) }; + + Ok(()) + } +}