initial commit
This commit is contained in:
commit
38fe43541e
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
/target
|
7
Cargo.lock
generated
Normal file
7
Cargo.lock
generated
Normal file
@ -0,0 +1,7 @@
|
||||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
version = 3
|
||||
|
||||
[[package]]
|
||||
name = "epore"
|
||||
version = "0.1.0"
|
6
Cargo.toml
Normal file
6
Cargo.toml
Normal file
@ -0,0 +1,6 @@
|
||||
[package]
|
||||
name = "epore"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
48
README.md
Normal file
48
README.md
Normal file
@ -0,0 +1,48 @@
|
||||
# Epore - Learning how to use epoll to Event Queue for non-blocking I/O
|
||||
|
||||
## Files Structure
|
||||
|
||||
- `ffi.rs`: This module will contain the code related to the syscalls we need to communicate with the host operating system.
|
||||
- `main.rs`: This is the example program itself
|
||||
- `poll.rs`: This module contains the main abstraction, which is a thin layer over epoll
|
||||
|
||||
## Overview
|
||||
|
||||
- `Poll`: Struct to interface with the OS's event notification system aka event queue (`io_uring`, `epoll`, `kqueue`, `IOCP`).
|
||||
|
||||
- `new()`: To create a new interface to OS's event queue.
|
||||
Similar to [`epoll_create`](https://man7.org/linux/man-pages/man2/epoll_create.2.html)
|
||||
- `registry()`: Returns a reference to the registry that we can use to register interest to be notified about new events.
|
||||
Similar to [`int epoll_ctl(int epfd, int op, int fd, struct epoll_event *_Nullable event);`](https://man7.org/linux/man-pages/man2/epoll_ctl.2.html)
|
||||
- `poll()`: blocks the thread it's called on until an event is ready or its times out, whichever occurs first.
|
||||
|
||||
- `Registry`: Struct to register interest in a certain `Event`.
|
||||
|
||||
- `Token`: Using `Token` to track which `TcpStream` socket generated the event.
|
||||
|
||||
### Sample Usage
|
||||
|
||||
```rust
|
||||
let queue = Poll::new().unwrap();
|
||||
let id = 1;
|
||||
|
||||
// register interest in events on a TcpStream
|
||||
queue.registry().register(&stream, id, ...).unwrap();
|
||||
|
||||
// store the to be tracked events
|
||||
let mut events = Vec::with_capacity(1);
|
||||
|
||||
// This will block the curren thread
|
||||
queue.poll(&mut events, None).unwrap();
|
||||
//...data is ready on one of the tracked streams
|
||||
```
|
||||
|
||||
## Notes
|
||||
|
||||
### `Registry` and `Poll` Relationship
|
||||
|
||||
We can see that the struct `Poll` has an internal struct `Registry` inside of it. By moving the struct `Registry` inside of the `Poll` struct, we can call `Registry::try_clone()` to get an owned Registry instance.
|
||||
|
||||
Therefore, we can pass the `Registry` to other threads with `Arc`, allowing multiple threads to register their interest to the same `Poll` instance even when `Poll` is blocking another thread while waiting for new events to happen in `Poll::poll`
|
||||
|
||||
`Poll::poll()` requires exclusive access since it takes a `&mut self`, so when we're waiting for events in `Poll::poll()`, there is no way to register interest from a different thread at the same time if we rely on using `Poll` to register.
|
30
src/ffi.rs
Normal file
30
src/ffi.rs
Normal file
@ -0,0 +1,30 @@
|
||||
pub const EPOLL_CTL_ADD: i32 = 1;
|
||||
pub const EPOLLIN: i32 = 0x1;
|
||||
pub const EPOLLET: i32 = 1 << 31;
|
||||
|
||||
// Here we have the syscalls
|
||||
// Unsafe !!!
|
||||
#[link(name = "c")]
|
||||
extern "C" {
|
||||
pub fn epoll_create(size: i32) -> i32;
|
||||
pub fn close(fd: i32) -> i32;
|
||||
pub fn epoll_ctl(epfd: i32, op: i32, fd: i32, event: *mut Event) -> i32;
|
||||
pub fn epoll_wait(epfd: i32, events: *mut Event, maxevents: i32, timeout: i32) -> i32;
|
||||
}
|
||||
|
||||
// Avoid padding by using repr(packed)
|
||||
#[derive(Debug)]
|
||||
#[repr(C)]
|
||||
// FIX #5
|
||||
#[cfg_attr(target_arch = "x86_64", repr(packed))]
|
||||
pub struct Event {
|
||||
pub(crate) events: u32,
|
||||
// Using `Token` a.k.a `epoll_data` to track which socket generated the event
|
||||
pub(crate) epoll_data: usize,
|
||||
}
|
||||
|
||||
impl Event {
|
||||
pub fn token(&self) -> usize {
|
||||
self.epoll_data
|
||||
}
|
||||
}
|
104
src/main.rs
Normal file
104
src/main.rs
Normal file
@ -0,0 +1,104 @@
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
io::{self, Read, Result, Write},
|
||||
net::TcpStream,
|
||||
};
|
||||
|
||||
use ffi::Event;
|
||||
use poll::Poll;
|
||||
|
||||
mod ffi;
|
||||
mod poll;
|
||||
|
||||
fn get_req(path: &str) -> String {
|
||||
format!(
|
||||
"GET {path} HTTP/1.1\r\n\
|
||||
Host: localhost\r\n\
|
||||
Connection: close\r\n\
|
||||
\r\n"
|
||||
)
|
||||
}
|
||||
|
||||
fn handle_events(
|
||||
events: &[Event],
|
||||
streams: &mut [TcpStream],
|
||||
// FIX #4: accepts a set of handled events as argument
|
||||
handled: &mut HashSet<usize>,
|
||||
) -> Result<usize> {
|
||||
let mut handled_events = 0;
|
||||
for event in events {
|
||||
let index = event.token();
|
||||
let mut data = vec![0u8; 4096];
|
||||
|
||||
loop {
|
||||
match streams[index].read(&mut data) {
|
||||
Ok(n) if n == 0 => {
|
||||
// FIX #4
|
||||
// `insert` returns false if the value already existed in the set.
|
||||
if !handled.insert(index) {
|
||||
break;
|
||||
}
|
||||
handled_events += 1;
|
||||
break;
|
||||
}
|
||||
Ok(n) => {
|
||||
let txt = String::from_utf8_lossy(&data[..n]);
|
||||
|
||||
println!("RECEIVED: {:?}", event);
|
||||
println!("{txt}\n------\n");
|
||||
}
|
||||
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
|
||||
// this was not in the book example, but it's a error condition
|
||||
// you probably want to handle in some way (either by breaking
|
||||
// out of the loop or trying a new read call immediately)
|
||||
Err(e) if e.kind() == io::ErrorKind::Interrupted => break,
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(handled_events)
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let mut poll = Poll::new()?;
|
||||
let n_events = 5;
|
||||
|
||||
let mut streams = vec![];
|
||||
let addr = "localhost:8080";
|
||||
|
||||
for i in 0..n_events {
|
||||
let delay = (n_events - i) * 1000;
|
||||
let url_path = format!("/{delay}/request-{i}");
|
||||
let request = get_req(&url_path);
|
||||
let mut stream = std::net::TcpStream::connect(addr)?;
|
||||
stream.set_nonblocking(true)?;
|
||||
|
||||
stream.write_all(request.as_bytes())?;
|
||||
// NB! Token is equal to index in Vec
|
||||
poll.registry()
|
||||
.register(&stream, i, ffi::EPOLLIN | ffi::EPOLLET)?;
|
||||
|
||||
streams.push(stream);
|
||||
}
|
||||
|
||||
// FIX #4: store the handled IDs
|
||||
let mut handled_ids = HashSet::new();
|
||||
|
||||
let mut handled_events = 0;
|
||||
while handled_events < n_events {
|
||||
let mut events = Vec::with_capacity(10);
|
||||
poll.poll(&mut events, None)?;
|
||||
|
||||
if events.is_empty() {
|
||||
println!("TIMEOUT (OR SPURIOUS EVENT NOTIFICATION)");
|
||||
continue;
|
||||
}
|
||||
|
||||
// ------------------------------------------------------⌄ FIX #4 (new signature)
|
||||
handled_events += handle_events(&events, &mut streams, &mut handled_ids)?;
|
||||
}
|
||||
|
||||
println!("FINISHED");
|
||||
Ok(())
|
||||
}
|
87
src/poll.rs
Normal file
87
src/poll.rs
Normal file
@ -0,0 +1,87 @@
|
||||
use crate::ffi;
|
||||
use std::{
|
||||
io::{self, Result},
|
||||
net::TcpStream,
|
||||
os::fd::AsRawFd,
|
||||
};
|
||||
|
||||
// We can be interested in multiple events
|
||||
type Events = Vec<ffi::Event>;
|
||||
|
||||
// The file descriptor of our target (could be a TCP socket or a TcpStream in our case)
|
||||
pub struct Registry {
|
||||
raw_fd: i32,
|
||||
}
|
||||
|
||||
impl Registry {
|
||||
// Register interest
|
||||
pub fn register(&self, source: &TcpStream, token: usize, interests: i32) -> Result<()> {
|
||||
let mut event = ffi::Event {
|
||||
events: interests as u32,
|
||||
epoll_data: token,
|
||||
};
|
||||
|
||||
let op = ffi::EPOLL_CTL_ADD;
|
||||
|
||||
let res = unsafe { ffi::epoll_ctl(self.raw_fd, op, source.as_raw_fd(), &mut event) };
|
||||
|
||||
if res < 0 {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Registry {
|
||||
fn drop(&mut self) {
|
||||
let res = unsafe { ffi::close(self.raw_fd) };
|
||||
|
||||
if res < 0 {
|
||||
let err = io::Error::last_os_error();
|
||||
eprintln!("ERROR: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Poll {
|
||||
registry: Registry,
|
||||
}
|
||||
|
||||
impl Poll {
|
||||
pub fn new() -> Result<Self> {
|
||||
let res = unsafe { ffi::epoll_create(1) };
|
||||
if res < 0 {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
Ok(Self {
|
||||
registry: Registry { raw_fd: res },
|
||||
})
|
||||
}
|
||||
|
||||
pub fn registry(&self) -> &Registry {
|
||||
&self.registry
|
||||
}
|
||||
|
||||
pub fn poll(&mut self, events: &mut Events, timeout: Option<i32>) -> Result<()> {
|
||||
let fd = self.registry.raw_fd;
|
||||
|
||||
let timeout = timeout.unwrap_or(-1);
|
||||
|
||||
let max_events = events.capacity() as i32;
|
||||
|
||||
let res = unsafe { ffi::epoll_wait(fd, events.as_mut_ptr(), max_events, timeout) };
|
||||
|
||||
if res < 0 {
|
||||
return Err(io::Error::last_os_error());
|
||||
};
|
||||
|
||||
// when epoll_wait success, number of file descriptors
|
||||
// ready for the requested I/O operation, or zero if no file
|
||||
// descriptor became ready during the requested timeout
|
||||
// milliseconds
|
||||
unsafe { events.set_len(res as usize) };
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user