initial commit
This commit is contained in:
commit
a3ebe46350
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
/target
|
82
Cargo.lock
generated
Normal file
82
Cargo.lock
generated
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
# This file is automatically @generated by Cargo.
|
||||||
|
# It is not intended for manual editing.
|
||||||
|
version = 3
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "bytes"
|
||||||
|
version = "1.6.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cfg-if"
|
||||||
|
version = "1.0.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "epore"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"rand",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "getrandom"
|
||||||
|
version = "0.2.15"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"libc",
|
||||||
|
"wasi",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "libc"
|
||||||
|
version = "0.2.155"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ppv-lite86"
|
||||||
|
version = "0.2.17"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rand"
|
||||||
|
version = "0.8.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"rand_chacha",
|
||||||
|
"rand_core",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rand_chacha"
|
||||||
|
version = "0.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
|
||||||
|
dependencies = [
|
||||||
|
"ppv-lite86",
|
||||||
|
"rand_core",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rand_core"
|
||||||
|
version = "0.6.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
|
||||||
|
dependencies = [
|
||||||
|
"getrandom",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wasi"
|
||||||
|
version = "0.11.0+wasi-snapshot-preview1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
8
Cargo.toml
Normal file
8
Cargo.toml
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
[package]
|
||||||
|
name = "epore"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
bytes = "1.6.0"
|
||||||
|
rand = "0.8.5"
|
48
README.md
Normal file
48
README.md
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
# Epore - Learning how to use epoll to Event Queue for non-blocking I/O
|
||||||
|
|
||||||
|
## Files Structure
|
||||||
|
|
||||||
|
- `ffi.rs`: This module will contain the code related to the syscalls we need to communicate with the host operating system.
|
||||||
|
- `main.rs`: This is the example program itself
|
||||||
|
- `poll.rs`: This module contains the main abstraction, which is a thin layer over epoll
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
- `Poll`: Struct to interface with the OS's event notification system aka event queue (`io_uring`, `epoll`, `kqueue`, `IOCP`).
|
||||||
|
|
||||||
|
- `new()`: To create a new interface to OS's event queue.
|
||||||
|
Similar to [`epoll_create`](https://man7.org/linux/man-pages/man2/epoll_create.2.html)
|
||||||
|
- `registry()`: Returns a reference to the registry that we can use to register interest to be notified about new events.
|
||||||
|
Similar to [`int epoll_ctl(int epfd, int op, int fd, struct epoll_event *_Nullable event);`](https://man7.org/linux/man-pages/man2/epoll_ctl.2.html)
|
||||||
|
- `poll()`: blocks the thread it's called on until an event is ready or its times out, whichever occurs first.
|
||||||
|
|
||||||
|
- `Registry`: Struct to register interest in a certain `Event`.
|
||||||
|
|
||||||
|
- `Token`: Using `Token` to track which `TcpStream` socket generated the event.
|
||||||
|
|
||||||
|
### Sample Usage
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let queue = Poll::new().unwrap();
|
||||||
|
let id = 1;
|
||||||
|
|
||||||
|
// register interest in events on a TcpStream
|
||||||
|
queue.registry().register(&stream, id, ...).unwrap();
|
||||||
|
|
||||||
|
// store the to be tracked events
|
||||||
|
let mut events = Vec::with_capacity(1);
|
||||||
|
|
||||||
|
// This will block the curren thread
|
||||||
|
queue.poll(&mut events, None).unwrap();
|
||||||
|
//...data is ready on one of the tracked streams
|
||||||
|
```
|
||||||
|
|
||||||
|
## Notes
|
||||||
|
|
||||||
|
### `Registry` and `Poll` Relationship
|
||||||
|
|
||||||
|
We can see that the struct `Poll` has an internal struct `Registry` inside of it. By moving the struct `Registry` inside of the `Poll` struct, we can call `Registry::try_clone()` to get an owned Registry instance.
|
||||||
|
|
||||||
|
Therefore, we can pass the `Registry` to other threads with `Arc`, allowing multiple threads to register their interest to the same `Poll` instance even when `Poll` is blocking another thread while waiting for new events to happen in `Poll::poll`
|
||||||
|
|
||||||
|
`Poll::poll()` requires exclusive access since it takes a `&mut self`, so when we're waiting for events in `Poll::poll()`, there is no way to register interest from a different thread at the same time if we rely on using `Poll` to register.
|
39
src/ffi.rs
Normal file
39
src/ffi.rs
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
// Register interest
|
||||||
|
pub const EPOLL_CTL_ADD: i32 = 1;
|
||||||
|
|
||||||
|
// Bit mask so express that
|
||||||
|
// we are interest when the data is available to READ
|
||||||
|
pub const EPOLLIN: i32 = 0x1;
|
||||||
|
|
||||||
|
// Bit mask for requests
|
||||||
|
// edge-triggered notification
|
||||||
|
// for the associated file descriptor.
|
||||||
|
// The default behavior for epoll is level-triggered.
|
||||||
|
pub const EPOLLET: i32 = 1 << 31;
|
||||||
|
|
||||||
|
// Here we have the syscalls
|
||||||
|
// Unsafe !!!
|
||||||
|
#[link(name = "c")]
|
||||||
|
extern "C" {
|
||||||
|
pub fn epoll_create(size: i32) -> i32;
|
||||||
|
pub fn close(fd: i32) -> i32;
|
||||||
|
pub fn epoll_ctl(epfd: i32, op: i32, fd: i32, event: *mut Event) -> i32;
|
||||||
|
pub fn epoll_wait(epfd: i32, events: *mut Event, maxevents: i32, timeout: i32) -> i32;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Avoid padding by using repr(packed)
|
||||||
|
// Data struct is different in Rust compared to C
|
||||||
|
#[derive(Debug)]
|
||||||
|
#[repr(C)]
|
||||||
|
#[cfg_attr(target_arch = "x86_64", repr(packed))]
|
||||||
|
pub struct Event {
|
||||||
|
pub(crate) events: u32,
|
||||||
|
// Using `Token` a.k.a `epoll_data` to track which socket generated the event
|
||||||
|
pub(crate) epoll_data: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Event {
|
||||||
|
pub fn token(&self) -> usize {
|
||||||
|
self.epoll_data
|
||||||
|
}
|
||||||
|
}
|
186
src/main.rs
Normal file
186
src/main.rs
Normal file
@ -0,0 +1,186 @@
|
|||||||
|
use core::panic;
|
||||||
|
use std::{
|
||||||
|
collections::HashSet,
|
||||||
|
io::{self, Read, Result, Write},
|
||||||
|
net::TcpStream,
|
||||||
|
usize,
|
||||||
|
};
|
||||||
|
|
||||||
|
use bytes::{BufMut, BytesMut};
|
||||||
|
use ffi::Event;
|
||||||
|
use poll::Poll;
|
||||||
|
use rand::Rng;
|
||||||
|
|
||||||
|
mod ffi;
|
||||||
|
mod poll;
|
||||||
|
|
||||||
|
// build our request as a buffer of bytes (&[u8])
|
||||||
|
//
|
||||||
|
// NOTE: BytesMut does implement AsRef so that
|
||||||
|
// it can be easily converted into &[u8] for write_all()
|
||||||
|
//
|
||||||
|
fn get_req(path: &str) -> BytesMut {
|
||||||
|
let mut buffer = BytesMut::new();
|
||||||
|
buffer.put(&b"GET "[..]);
|
||||||
|
buffer.put(path.as_bytes());
|
||||||
|
buffer.put(&b" HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"[..]);
|
||||||
|
buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
// Function call happens when an event is ready to be processed
|
||||||
|
fn handle_event(
|
||||||
|
events: &[Event],
|
||||||
|
tcp_streams: &mut [TcpStream],
|
||||||
|
handled_tokens: &mut HashSet<usize>,
|
||||||
|
) -> Result<usize> {
|
||||||
|
let mut handled_events_curr = 0;
|
||||||
|
|
||||||
|
for event in events {
|
||||||
|
// The token helps differentiate each I/O resource
|
||||||
|
// in our case, there are
|
||||||
|
// 200 TcpStreams aka 200 TCP socket file descriptors
|
||||||
|
let resource_index = event.token();
|
||||||
|
|
||||||
|
// This buffer can hold 4096 characters/bytes
|
||||||
|
let mut data_buffer_read_from_tcp_stream = vec![0u8; 4096];
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match tcp_streams[resource_index].read(&mut data_buffer_read_from_tcp_stream) {
|
||||||
|
// data_buffer_read_from_tcp_stream is completely drained
|
||||||
|
// we can consider the event successfully handled
|
||||||
|
Ok(0) => {
|
||||||
|
if !handled_tokens.insert(resource_index) {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
handled_events_curr += 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// data_buffer_read_from_tcp_stream is not completely drained
|
||||||
|
// we still have some data left in the buffer
|
||||||
|
Ok(n) => {
|
||||||
|
let txt = String::from_utf8_lossy(&data_buffer_read_from_tcp_stream[..n]);
|
||||||
|
println!("RECEIVED: {:?}", event);
|
||||||
|
println!("{txt}\n------\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
// WouldBlock indicates that the data transfer is not complete,
|
||||||
|
// but there is no data ready right now.
|
||||||
|
// the transfer must be retried
|
||||||
|
Err(error) if error.kind() == io::ErrorKind::WouldBlock => break,
|
||||||
|
Err(error) if error.kind() == io::ErrorKind::Interrupted => break,
|
||||||
|
// return the error and break the loop
|
||||||
|
Err(error) => return Err(error),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(handled_events_curr)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() -> Result<()> {
|
||||||
|
// The Event "queue":
|
||||||
|
// not really,
|
||||||
|
// just the interface to Linux's epoll_queue
|
||||||
|
let mut epoll_interface_registry = Poll::new().expect("Can't run epoll_create.");
|
||||||
|
|
||||||
|
// aka how many requests do we want to send
|
||||||
|
// this is also how many TcpStreams we will create
|
||||||
|
// also the number of TCP socket file descriptors
|
||||||
|
let number_of_events: usize = 200;
|
||||||
|
|
||||||
|
let mut tcp_streams: Vec<TcpStream> = vec![];
|
||||||
|
|
||||||
|
let packet_addr: &str = "localhost:8080";
|
||||||
|
|
||||||
|
for request_id in 0..number_of_events {
|
||||||
|
let random_delay = rand::thread_rng().gen_range(1..=number_of_events / 5) * 1000;
|
||||||
|
|
||||||
|
// the TcpServer should return our GET request after random_delay seconds
|
||||||
|
let url_path = format!("/{random_delay}/request-{request_id}");
|
||||||
|
|
||||||
|
let request_buffer = get_req(&url_path);
|
||||||
|
|
||||||
|
let mut tcp_stream = TcpStream::connect(packet_addr).expect("Failed to create TcpStream.");
|
||||||
|
|
||||||
|
// nonblocking: Moves this TCP stream into or out of nonblocking mode.
|
||||||
|
// This will result in read, write, recv and send operations
|
||||||
|
// becoming nonblocking, i.e.,
|
||||||
|
// immediately returning from their calls.
|
||||||
|
// If the IO operation is successful,
|
||||||
|
// Ok is returned and no further action is required.
|
||||||
|
// If the IO operation could not be completed and needs to be retried,
|
||||||
|
// an error with kind io::ErrorKind::WouldBlock is returned.
|
||||||
|
//
|
||||||
|
// nodelay: If set, this option disables the Nagle algorithm.
|
||||||
|
// This means that segments are always sent as soon as possible,
|
||||||
|
// even if there is only a small amount of data.
|
||||||
|
// When not set, data is buffered until there is a sufficient amount to send out,
|
||||||
|
// thereby avoiding the frequent sending of small packets.
|
||||||
|
tcp_stream
|
||||||
|
.set_nonblocking(true)
|
||||||
|
.and_then(|_| tcp_stream.set_nodelay(true))
|
||||||
|
.expect("Failed to set TcpStream to nonblocking and nodelay");
|
||||||
|
|
||||||
|
tcp_stream
|
||||||
|
.write_all(request_buffer.as_ref())
|
||||||
|
.expect("Failed to write to TcpStream.");
|
||||||
|
|
||||||
|
// register interests
|
||||||
|
// for when data is ready to be READ
|
||||||
|
// from this TcpStream,
|
||||||
|
// edge-triggered by EPOLLET
|
||||||
|
epoll_interface_registry
|
||||||
|
.registry()
|
||||||
|
// the request_id is also the token for file descriptor ID purposes
|
||||||
|
.register(&tcp_stream, request_id, ffi::EPOLLET | ffi::EPOLLIN)
|
||||||
|
.unwrap_or_else(|_| {
|
||||||
|
panic!("Failed to register interests in the event queue for {request_id}")
|
||||||
|
});
|
||||||
|
|
||||||
|
tcp_streams.push(tcp_stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Finished sending requests");
|
||||||
|
|
||||||
|
let mut handled_events = 0;
|
||||||
|
|
||||||
|
// track which resources has been handled
|
||||||
|
//
|
||||||
|
// while a vector is simpler,
|
||||||
|
// a hashset is much more efficient
|
||||||
|
// as it only tracks the resources that are handled
|
||||||
|
// avoid empty allocations
|
||||||
|
let mut handled_tokens: HashSet<usize> = HashSet::new();
|
||||||
|
|
||||||
|
// this loop will run for a while
|
||||||
|
while handled_events < number_of_events {
|
||||||
|
// too low of a number would limit
|
||||||
|
// how many events the OS could notify us
|
||||||
|
// on each wake up (see: EPOLLET)
|
||||||
|
let mut events_buffer: Vec<Event> = Vec::with_capacity(20);
|
||||||
|
|
||||||
|
// when epoll_wait success, number of file descriptors
|
||||||
|
// ready for the requested I/O operation, or zero if no file
|
||||||
|
// descriptor became ready during the requested timeout
|
||||||
|
// milliseconds
|
||||||
|
//
|
||||||
|
// timeout equal to zero (aka None) causes epoll_wait() to return
|
||||||
|
// immediately, even if no events are available.
|
||||||
|
//
|
||||||
|
// this should speed up our loop a lil bit
|
||||||
|
epoll_interface_registry.poll(&mut events_buffer, None)?;
|
||||||
|
|
||||||
|
if events_buffer.is_empty() {
|
||||||
|
println!("Timed out");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// the loop only reaches here when an event is handled
|
||||||
|
handled_events += handle_event(&events_buffer, &mut tcp_streams, &mut handled_tokens)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Finished receiving all responses");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
89
src/poll.rs
Normal file
89
src/poll.rs
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
use crate::ffi;
|
||||||
|
use std::{
|
||||||
|
io::{self, Result},
|
||||||
|
net::TcpStream,
|
||||||
|
os::fd::AsRawFd,
|
||||||
|
};
|
||||||
|
|
||||||
|
// We can be interested in multiple events
|
||||||
|
type Events = Vec<ffi::Event>;
|
||||||
|
|
||||||
|
// The file descriptor of our target (could be a TCP socket or a TcpStream in our case)
|
||||||
|
pub struct Registry {
|
||||||
|
raw_fd: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Registry {
|
||||||
|
// Register interest by adding it
|
||||||
|
// TcpStream is a high level representation of a TCP socket file descriptor
|
||||||
|
// token is too differentiate from different file descriptor, as a label
|
||||||
|
pub fn register(&self, source: &TcpStream, token: usize, interests: i32) -> Result<()> {
|
||||||
|
match unsafe {
|
||||||
|
ffi::epoll_ctl(
|
||||||
|
self.raw_fd,
|
||||||
|
ffi::EPOLL_CTL_ADD,
|
||||||
|
source.as_raw_fd(),
|
||||||
|
&mut ffi::Event {
|
||||||
|
events: interests as u32,
|
||||||
|
epoll_data: token,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
} {
|
||||||
|
exit_code if exit_code < 0 => Err(io::Error::last_os_error()),
|
||||||
|
_ => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Registry {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let res = unsafe { ffi::close(self.raw_fd) };
|
||||||
|
|
||||||
|
if res < 0 {
|
||||||
|
let err = io::Error::last_os_error();
|
||||||
|
eprintln!("ERROR: {err:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Poll {
|
||||||
|
registry: Registry,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Poll {
|
||||||
|
pub fn new() -> Result<Self> {
|
||||||
|
let res = unsafe { ffi::epoll_create(1) };
|
||||||
|
if res < 0 {
|
||||||
|
return Err(io::Error::last_os_error());
|
||||||
|
}
|
||||||
|
Ok(Self {
|
||||||
|
registry: Registry { raw_fd: res },
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn registry(&self) -> &Registry {
|
||||||
|
&self.registry
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn poll(&mut self, events: &mut Events, timeout: Option<i32>) -> Result<()> {
|
||||||
|
let fd = self.registry.raw_fd;
|
||||||
|
|
||||||
|
let timeout = timeout.unwrap_or(-1);
|
||||||
|
|
||||||
|
let max_events = events.capacity() as i32;
|
||||||
|
|
||||||
|
let res = unsafe { ffi::epoll_wait(fd, events.as_mut_ptr(), max_events, timeout) };
|
||||||
|
|
||||||
|
if res < 0 {
|
||||||
|
return Err(io::Error::last_os_error());
|
||||||
|
};
|
||||||
|
|
||||||
|
// when epoll_wait success, number of file descriptors
|
||||||
|
// ready for the requested I/O operation, or zero if no file
|
||||||
|
// descriptor became ready during the requested timeout
|
||||||
|
// milliseconds
|
||||||
|
unsafe { events.set_len(res as usize) };
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user