Bridging between Tokio and Python
In the last few months, I have been working on a new project. It is a Rust library that provides both a Rust and a Python API. I chose to build the Rust library with Tokio: I'm using Postgres to store state, and my preferred database library for Rust is sqlx. I also like Tokio because I wanted smooth integration with the surrounding Tokio ecosystem for building networked applications, like Axum and Tonic.
I also wanted to use this library in Python applications. I frequently work with both Python and Rust, and I saw an opportunity to learn pyo3. The docs for pyo3 are solid and explain how to do most of what I needed, which was integrating Tokio and pyo3. The other tool that I needed was maturin. Like pyo3's, its documentation is excellent, and I was impressed at how thorough the project layout docs were: they take the time to cover both a pure Rust package and a mixed Python/Rust package.
The application and worker
My library contains an Application struct. The application takes configuration data and acts as a factory for activities. The most important of these activities is the worker, which consumes scheduled tasks and executes them. The Python library uses a Storage object from the core library that contains the core operations for creating and running tasks. What I initially wanted to do was:
```rust
// In the Python binding
use std::sync::Arc;

use pyo3::{exceptions::PyValueError, prelude::*};

use crate::config::Config;
use coreapp::storage::Storage;

#[pyclass]
struct Application {
    config: Config,
    storage: Arc<Storage>,
}

#[pymethods]
impl Application {
    #[new]
    fn new(config: Config) -> Self {
        let storage = Storage::new(config.clone().into());
        Application {
            config,
            storage: Arc::new(storage),
        }
    }

    fn create_worker(&self, worker_id: String) -> Worker {
        Worker::new(self.config.clone(), self.storage.clone(), worker_id)
    }
}

#[pyclass]
struct Worker {
    config: Config,
    storage: Arc<Storage>,
    worker_id: String,
}

#[pymethods]
impl Worker {
    #[new]
    fn new(config: Config, storage: Arc<Storage>, worker_id: String) -> Self {
        Worker {
            config,
            storage,
            worker_id,
        }
    }

    /// Mark a run as complete.
    fn complete_run(&self, run_id: String, run_result: Vec<u8>) -> PyResult<()> {
        self.storage
            .complete_run(run_id, &run_result)
            .map_err(|e| PyValueError::new_err(format!("Could not complete_run: {e:?}")))
    }

    // Other methods...
}
```
The first problem I encountered was with the Worker::new method. pyo3 enforces a few rules on values that pass between Rust and Python. The most relevant to my scenario is that argument and return types need to implement pyo3's conversion traits (like IntoPyObject), and I can't implement those for foreign types like Tokio's runtime or Arc because of the orphan rule. Instead, I needed to remove the Python-bound constructor and directly initialize the struct:
```rust
#[pymethods]
impl Application {
    fn create_worker(&self, worker_id: String) -> Worker {
        Worker {
            config: self.config.clone(),
            storage: self.storage.clone(),
            worker_id,
        }
    }
}
```
This approach let me use Rust's smart pointers (like Arc) to share state with other application components. The drawback is that a Worker can never be constructed from Python: building one requires Rust values that can only be created from within Rust code. At first I was concerned that this would be a problem, but in practice it hasn't mattered. In fact, it has helped constrain the design and encouraged simplicity at the boundary between Python and Rust.
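From the Python side, the effect can be pictured with a small pure-Python analogue. All names here are illustrative, not the library's actual API; the point is that a `#[pyclass]` without a `#[new]` behaves like a class that rejects direct construction, so instances only ever come from the factory:

```python
class Worker:
    """Stand-in for the Rust-backed Worker pyclass."""

    def __init__(self, *args, **kwargs):
        # Mirrors pyo3's behavior for a #[pyclass] with no #[new]:
        # direct construction from Python is rejected.
        raise TypeError("cannot create 'Worker' instances directly")

    @classmethod
    def _from_factory(cls, config, worker_id):
        # Internal path used only by Application.create_worker,
        # analogous to Rust code initializing the struct directly.
        self = object.__new__(cls)
        self.config = config
        self.worker_id = worker_id
        return self


class Application:
    """Stand-in for the Rust-backed Application pyclass."""

    def __init__(self, config):
        self.config = config

    def create_worker(self, worker_id):
        return Worker._from_factory(self.config, worker_id)


app = Application({"dsn": "postgres://localhost/example"})
worker = app.create_worker("worker-1")  # fine
# Worker() would raise TypeError
```

The factory is the only door into a Worker, which is exactly the constraint the Rust-only initialization imposes.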
Using Tokio in sync Python
Having an application class that acts as a factory for other components has worked out quite well, since it makes passing a Tokio runtime around straightforward. For a period of time I had an intermediary struct that encapsulated the runtime-plus-storage interaction, but it required a lot of boilerplate code. I found passing the runtime and storage around as a pair of parameters more ergonomic:
```rust
#[pyclass]
struct Application {
    config: Config,
    storage: Arc<Storage>,
    runtime: Arc<tokio::runtime::Runtime>,
}

#[pymethods]
impl Application {
    #[new]
    fn new(config: Config) -> Self {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let storage = runtime.block_on(async { Storage::new(config.clone().into()) });
        Application {
            config,
            storage: Arc::new(storage),
            runtime: Arc::new(runtime),
        }
    }
}
```
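The shape of this, a synchronous constructor that owns a private event loop and drives async setup to completion, has a close analogue in pure Python with asyncio, which may make the `block_on` calls easier to picture. Everything below is an illustrative sketch, not the library's API:

```python
import asyncio


class SyncFacade:
    """Synchronous wrapper that owns its own event loop,
    analogous to the Application owning a Tokio runtime."""

    def __init__(self):
        # Like Builder::new_current_thread().build(): a private,
        # single-threaded loop driven only from this object's methods.
        self._loop = asyncio.new_event_loop()

    def connect(self):
        # The block_on equivalent: run an async operation to
        # completion before returning to the synchronous caller.
        return self._loop.run_until_complete(self._connect_async())

    async def _connect_async(self):
        await asyncio.sleep(0)  # placeholder for real async I/O
        return "connected"


facade = SyncFacade()
print(facade.connect())  # → connected
```

The caller never sees the loop at all, just ordinary blocking methods, which is the same experience the Python bindings give over Tokio.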
Then later when a worker needs to be created, the runtime and storage can be shared:
```rust
#[pymethods]
impl Application {
    // Other methods...

    fn create_worker(&self, worker_id: String) -> Worker {
        Worker {
            config: self.config.clone(),
            storage: self.storage.clone(),
            runtime: self.runtime.clone(),
            worker_id,
        }
    }

    fn spawn_task(&self, task_name: &str, params: &[u8]) -> PyResult<SpawnResult> {
        let result = self
            .runtime
            .block_on(self.storage.spawn_task(task_name, params));
        result
            .map(|v| v.into())
            .map_err(|e| PyValueError::new_err(format!("Could not spawn task: {e:?}")))
    }
}
```
The spawn_task method shows how the runtime and storage get used from a synchronous Python interface. The core library doesn't return an error type that is compatible with Python, but that is solvable with map_err, and the Ok value is translated from the core API to a Python value object using the Into trait. I am happy that I ended up with an implementation that isn't too complicated, given that I'm building a Python extension in Rust.
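The map_err step has a direct Python analogue: catch the core library's error and re-raise it as a built-in exception. A minimal pure-Python sketch (the names here are hypothetical, not the library's):

```python
class SpawnError(Exception):
    """Stand-in for the core library's internal error type."""


def spawn_task_core(task_name, params):
    # Stand-in for the async core call that block_on drives.
    if not task_name:
        raise SpawnError("task name must not be empty")
    return {"task": task_name, "params": bytes(params)}


def spawn_task(task_name, params):
    # The map_err equivalent: translate the core error into a
    # ValueError, which is the exception PyValueError::new_err
    # raises on the Python side.
    try:
        return spawn_task_core(task_name, params)
    except SpawnError as e:
        raise ValueError(f"Could not spawn task: {e}") from e
```

Doing this translation once per bound method keeps Python callers working with familiar exception types instead of opaque Rust errors.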
I wanted to share this solution because, when I was initially stuck, I didn't easily find another example, and it took some time to solve. I likely could have solved it faster with an agent, but then I wouldn't have learned something new or been able to write about it. I hope to have this library ready as a development preview soon.