From 753ff1d911c1dac95ea82050d3c59007d5d1fc35 Mon Sep 17 00:00:00 2001 From: minhtrannhat Date: Fri, 28 Jun 2024 23:33:07 -0400 Subject: [PATCH] initial commit --- .gitignore | 1 + Cargo.lock | 7 ++++ Cargo.toml | 6 ++++ README.md | 48 ++++++++++++++++++++++++++ src/ffi.rs | 30 +++++++++++++++++ src/main.rs | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/poll.rs | 87 +++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 276 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 README.md create mode 100644 src/ffi.rs create mode 100644 src/main.rs create mode 100644 src/poll.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..5cd4a91 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "epore" +version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..b413d52 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "epore" +version = "0.1.0" +edition = "2021" + +[dependencies] diff --git a/README.md b/README.md new file mode 100644 index 0000000..880a605 --- /dev/null +++ b/README.md @@ -0,0 +1,48 @@ +# Epore - Learning how to use epoll to Event Queue for non-blocking I/O + +## Files Structure + +- `ffi.rs`: This module will contain the code related to the syscalls we need to communicate with the host operating system. +- `main.rs`: This is the example program itself +- `poll.rs`: This module contains the main abstraction, which is a thin layer over epoll + +## Overview + +- `Poll`: Struct to interface with the OS's event notification system aka event queue (`io_uring`, `epoll`, `kqueue`, `IOCP`). 
+ + - `new()`: To create a new interface to the OS's event queue. + Similar to [`epoll_create`](https://man7.org/linux/man-pages/man2/epoll_create.2.html) + - `registry()`: Returns a reference to the registry that we can use to register interest to be notified about new events. + Similar to [`int epoll_ctl(int epfd, int op, int fd, struct epoll_event *_Nullable event);`](https://man7.org/linux/man-pages/man2/epoll_ctl.2.html) + - `poll()`: blocks the thread it's called on until an event is ready or it times out, whichever occurs first. + +- `Registry`: Struct to register interest in a certain `Event`. + +- `Token`: Using `Token` to track which `TcpStream` socket generated the event. + +### Sample Usage + +```rust +let queue = Poll::new().unwrap(); +let id = 1; + +// register interest in events on a TcpStream +queue.registry().register(&stream, id, ...).unwrap(); + +// store the to-be-tracked events +let mut events = Vec::with_capacity(1); + +// This will block the current thread +queue.poll(&mut events, None).unwrap(); +//...data is ready on one of the tracked streams +``` + +## Notes + +### `Registry` and `Poll` Relationship + +We can see that the struct `Poll` has an internal struct `Registry` inside of it. By moving the struct `Registry` inside of the `Poll` struct, we can call `Registry::try_clone()` to get an owned Registry instance. + +Therefore, we can pass the `Registry` to other threads with `Arc`, allowing multiple threads to register their interest to the same `Poll` instance even when `Poll` is blocking another thread while waiting for new events to happen in `Poll::poll`. + +`Poll::poll()` requires exclusive access since it takes a `&mut self`, so when we're waiting for events in `Poll::poll()`, there is no way to register interest from a different thread at the same time if we rely on using `Poll` to register. 
diff --git a/src/ffi.rs b/src/ffi.rs
new file mode 100644
index 0000000..358ebd4
--- /dev/null
+++ b/src/ffi.rs
@@ -0,0 +1,39 @@
+//! Minimal FFI bindings for the Linux `epoll` syscalls used by this crate.
+
+/// `epoll_ctl` operation: add a file descriptor to the interest list.
+pub const EPOLL_CTL_ADD: i32 = 1;
+/// Event flag: the file descriptor is ready for reading.
+pub const EPOLLIN: i32 = 0x1;
+/// Event flag: request edge-triggered notification (bit 31, so negative as `i32`).
+pub const EPOLLET: i32 = 1 << 31;
+
+// The raw syscall wrappers exported by libc. Calling any of them is `unsafe`.
+#[link(name = "c")]
+extern "C" {
+    pub fn epoll_create(size: i32) -> i32;
+    pub fn close(fd: i32) -> i32;
+    pub fn epoll_ctl(epfd: i32, op: i32, fd: i32, event: *mut Event) -> i32;
+    pub fn epoll_wait(epfd: i32, events: *mut Event, maxevents: i32, timeout: i32) -> i32;
+}
+
+/// Mirror of the C `struct epoll_event` passed to `epoll_ctl`/`epoll_wait`.
+///
+/// FIX #5: the struct is packed on x86_64 only, so that no padding is inserted
+/// between `events` and `epoll_data`.
+#[derive(Debug)]
+#[repr(C)]
+#[cfg_attr(target_arch = "x86_64", repr(packed))]
+pub struct Event {
+    /// Bitmask of `EPOLL*` flags.
+    pub(crate) events: u32,
+    /// User data handed back with each notification; used as the `Token` that
+    /// identifies which socket generated the event.
+    pub(crate) epoll_data: usize,
+}
+
+impl Event {
+    /// Returns the token that was supplied when the source was registered.
+    pub fn token(&self) -> usize {
+        self.epoll_data
+    }
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..f909caa
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,125 @@
+use std::{
+    collections::HashSet,
+    io::{self, Read, Result, Write},
+    net::TcpStream,
+};
+
+use ffi::Event;
+use poll::Poll;
+
+mod ffi;
+mod poll;
+
+/// Builds a minimal HTTP/1.1 GET request for `path` addressed to `localhost`.
+fn get_req(path: &str) -> String {
+    format!(
+        "GET {path} HTTP/1.1\r\n\
+             Host: localhost\r\n\
+             Connection: close\r\n\
+             \r\n"
+    )
+}
+
+/// Reads every stream that reported an event and prints what was received.
+///
+/// `event.token()` is used as the index into `streams`. A stream counts as
+/// handled the first time a read returns 0 bytes; `handled` remembers those
+/// tokens so that a repeated notification is not counted twice.
+///
+/// Returns the number of streams that finished during this call.
+///
+/// # Errors
+///
+/// Returns any read error other than `WouldBlock`/`Interrupted`, or
+/// `InvalidData` if a token does not correspond to a stream.
+fn handle_events(
+    events: &[Event],
+    streams: &mut [TcpStream],
+    handled: &mut HashSet<usize>,
+) -> Result<usize> {
+    let mut handled_events = 0;
+    // One scratch buffer is enough: its contents are printed before the next read.
+    let mut data = vec![0u8; 4096];
+
+    for event in events {
+        let index = event.token();
+        let stream = streams.get_mut(index).ok_or_else(|| {
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!("event token {index} does not match any stream"),
+            )
+        })?;
+
+        // Edge-triggered mode: keep reading until the socket is drained,
+        // otherwise no further notification arrives for the leftover data.
+        loop {
+            match stream.read(&mut data) {
+                Ok(0) => {
+                    // FIX #4: count each stream once, even if EOF is reported again.
+                    if handled.insert(index) {
+                        handled_events += 1;
+                    }
+                    break;
+                }
+                Ok(n) => {
+                    let txt = String::from_utf8_lossy(&data[..n]);
+
+                    println!("RECEIVED: {:?}", event);
+                    println!("{txt}\n------\n");
+                }
+                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
+                // An interrupted read transferred nothing, so retry rather than
+                // abandon data that will not be announced again.
+                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
+                Err(e) => return Err(e),
+            }
+        }
+    }
+
+    Ok(handled_events)
+}
+
+/// Opens five connections to `localhost:8080`, sends one GET request on each
+/// (the path carries a per-request delay value) and prints every response as
+/// its socket becomes readable.
+fn main() -> Result<()> {
+    let mut poll = Poll::new()?;
+    let n_events = 5;
+
+    let mut streams = vec![];
+    let addr = "localhost:8080";
+
+    for i in 0..n_events {
+        let delay = (n_events - i) * 1000;
+        let url_path = format!("/{delay}/request-{i}");
+        let request = get_req(&url_path);
+        let mut stream = TcpStream::connect(addr)?;
+        stream.set_nonblocking(true)?;
+
+        stream.write_all(request.as_bytes())?;
+        // NB! Token is equal to index in Vec
+        poll.registry()
+            .register(&stream, i, ffi::EPOLLIN | ffi::EPOLLET)?;
+
+        streams.push(stream);
+    }
+
+    let mut handled_ids = HashSet::new();
+    // Reused for every wait; `Poll::poll` resets its length on each call.
+    let mut events = Vec::with_capacity(10);
+
+    let mut handled_events = 0;
+    while handled_events < n_events {
+        poll.poll(&mut events, None)?;
+
+        if events.is_empty() {
+            println!("TIMEOUT (OR SPURIOUS EVENT NOTIFICATION)");
+            continue;
+        }
+
+        handled_events += handle_events(&events, &mut streams, &mut handled_ids)?;
+    }
+
+    println!("FINISHED");
+    Ok(())
+}
diff --git a/src/poll.rs b/src/poll.rs
new file mode 100644
index 0000000..a49d3c4
--- /dev/null
+++ b/src/poll.rs
@@ -0,0 +1,121 @@
+use crate::ffi;
+use std::{
+    io::{self, Result},
+    net::TcpStream,
+    os::fd::AsRawFd,
+};
+
+// We can be interested in multiple events
+type Events = Vec<ffi::Event>;
+
+/// Owner of the epoll file descriptor; used to register interest in sources.
+pub struct Registry {
+    raw_fd: i32,
+}
+
+impl Registry {
+    /// Registers `source` with the epoll instance.
+    ///
+    /// `token` is stored in the event and handed back by `Poll::poll`, and
+    /// `interests` is a bitmask of `ffi::EPOLL*` flags.
+    ///
+    /// # Errors
+    ///
+    /// Returns the OS error if `epoll_ctl` fails.
+    pub fn register(&self, source: &TcpStream, token: usize, interests: i32) -> Result<()> {
+        let mut event = ffi::Event {
+            // Reinterpret the bits: `EPOLLET` is negative as an `i32`.
+            events: interests as u32,
+            epoll_data: token,
+        };
+
+        let op = ffi::EPOLL_CTL_ADD;
+
+        // SAFETY: `event` is a valid, initialized `Event` that outlives the call.
+        let res = unsafe { ffi::epoll_ctl(self.raw_fd, op, source.as_raw_fd(), &mut event) };
+
+        if res < 0 {
+            return Err(io::Error::last_os_error());
+        }
+
+        Ok(())
+    }
+}
+
+impl Drop for Registry {
+    /// Closes the epoll file descriptor, logging (not propagating) any error.
+    fn drop(&mut self) {
+        // SAFETY: `raw_fd` is only ever set from `epoll_create` in `Poll::new`.
+        let res = unsafe { ffi::close(self.raw_fd) };
+
+        if res < 0 {
+            let err = io::Error::last_os_error();
+            eprintln!("ERROR: {err:?}");
+        }
+    }
+}
+
+/// Thin wrapper around an epoll instance.
+pub struct Poll {
+    registry: Registry,
+}
+
+impl Poll {
+    /// Creates a new epoll instance.
+    ///
+    /// # Errors
+    ///
+    /// Returns the OS error if `epoll_create` fails.
+    pub fn new() -> Result<Self> {
+        // SAFETY: `epoll_create` takes no pointers; the size hint only has to be positive.
+        let res = unsafe { ffi::epoll_create(1) };
+        if res < 0 {
+            return Err(io::Error::last_os_error());
+        }
+        Ok(Self {
+            registry: Registry { raw_fd: res },
+        })
+    }
+
+    /// Returns the registry used to register interest in event sources.
+    pub fn registry(&self) -> &Registry {
+        &self.registry
+    }
+
+    /// Blocks until at least one event is ready or `timeout` milliseconds pass.
+    ///
+    /// `None` waits indefinitely. On return `events` holds exactly the ready
+    /// events (none on timeout); at most `events.capacity()` are reported per
+    /// call, so the vector must be created with a non-zero capacity.
+    ///
+    /// # Errors
+    ///
+    /// Returns the OS error if `epoll_wait` fails.
+    pub fn poll(&mut self, events: &mut Events, timeout: Option<i32>) -> Result<()> {
+        let fd = self.registry.raw_fd;
+
+        let timeout = timeout.unwrap_or(-1);
+
+        // Drop stale entries up front so an error return cannot leave them visible.
+        events.clear();
+
+        // Clamp instead of letting `as` wrap a capacity larger than `i32::MAX`.
+        let max_events = events.capacity().min(i32::MAX as usize) as i32;
+
+        // SAFETY: the allocation behind `events` has room for `capacity()`
+        // elements and `max_events` never exceeds that.
+        let res = unsafe { ffi::epoll_wait(fd, events.as_mut_ptr(), max_events, timeout) };
+
+        if res < 0 {
+            return Err(io::Error::last_os_error());
+        }
+
+        // On success `epoll_wait` returns the number of ready file descriptors,
+        // or zero if none became ready within the timeout.
+        // SAFETY: the kernel wrote the first `res` elements and
+        // `res <= max_events <= capacity()`.
+        unsafe { events.set_len(res as usize) };
+
+        Ok(())
+    }
+}