I'm writing a WebSocket server where a web client connects to play chess against a multithreaded computer AI. The WebSocket server wants to pass a Logger object into the AI code. The Logger object is going to pipe log lines from the AI down to the web client, so it must contain a reference to the client connection.

I'm confused about how lifetimes interact with threads. I've reproduced the problem with a Wrapper struct parameterized by a type. The run_thread function tries to unwrap the value and log it.
use std::fmt::Debug;
use std::thread;

struct Wrapper<T: Debug> {
    val: T,
}

fn run_thread<T: Debug>(wrapper: Wrapper<T>) {
    let thr = thread::spawn(move || {
        println!("{:?}", wrapper.val);
    });
    thr.join();
}

fn main() {
    run_thread(Wrapper::<i32> { val: -1 });
}
The wrapper argument lives on the stack, and its lifetime doesn't extend past run_thread's stack frame, even though the thread will be joined before the stack frame ends. I could copy the value off the stack:
use std::fmt::Debug;
use std::thread;

struct Wrapper<T: Debug + Send> {
    val: T,
}

fn run_thread<T: Debug + Send + 'static>(wrapper: Wrapper<T>) {
    let thr = thread::spawn(move || {
        println!("{:?}", wrapper.val);
    });
    thr.join();
}

fn main() {
    run_thread(Wrapper::<i32> { val: -1 });
}
This will not work if T is a reference to a big object I don't want copied:
use std::fmt::Debug;
use std::thread;

struct Wrapper<T: Debug + Send> {
    val: T,
}

fn run_thread<T: Debug + Send + 'static>(wrapper: Wrapper<T>) {
    let thr = thread::spawn(move || {
        println!("{:?}", wrapper.val);
    });
    thr.join();
}

fn main() {
    let mut v = Vec::new();
    for i in 0..1000 {
        v.push(i);
    }
    run_thread(Wrapper { val: &v });
}
Which results in:
error: `v` does not live long enough
--> src/main.rs:22:32
|
22 | run_thread(Wrapper { val: &v });
| ^ does not live long enough
23 | }
| - borrowed value only lives until here
|
= note: borrowed value must be valid for the static lifetime...
The only solution I can think of is to use an Arc.
use std::fmt::Debug;
use std::sync::Arc;
use std::thread;

struct Wrapper<T: Debug + Send + Sync + 'static> {
    arc_val: Arc<T>,
}

fn run_thread<T: Debug + Send + Sync + 'static>(wrapper: &Wrapper<T>) {
    let arc_val = wrapper.arc_val.clone();
    let thr = thread::spawn(move || {
        println!("{:?}", *arc_val);
    });
    thr.join();
}

fn main() {
    let mut v = Vec::new();
    for i in 0..1000 {
        v.push(i);
    }
    let w = Wrapper { arc_val: Arc::new(v) };
    run_thread(&w);
    println!("{}", (*w.arc_val)[0]);
}
In my real program, it appears that both the Logger and the connection object must be placed in Arc wrappers. It seems annoying that the client is required to box the connection in an Arc when the fact that the code is parallelized is internal to the library. This is especially annoying because the lifetime of the connection is guaranteed to be greater than the lifetime of the worker threads.

Have I missed something?
The thread support in the standard library allows the created threads to outlive the thread that created them; that's a good thing! However, if you were to pass a reference to a stack-allocated variable to one of these threads, there's no guarantee that the variable will still be valid by the time the thread executes. In other languages, this would allow the thread to access invalid memory, creating a pile of memory safety issues.
Fortunately, we aren't limited to the standard library. At least two crates provide scoped threads: threads that are guaranteed to exit before a certain scope ends, which ensures that stack variables will be available for the entire duration of the thread. The examples below use scoped_threadpool and crossbeam.

There is also a crate, rayon, that abstracts away the low-level details of "threads" entirely but still lets you accomplish your goals.
Here are examples of each. Each example spawns a number of threads and mutates a local vector in place with no locking, no Arc, and no cloning. Note that the mutation has a sleep call to help verify that the calls are happening in parallel.

You can extend the examples to share a reference to any type which implements Sync, such as a Mutex or an Atomic*. Using these would introduce locking, however.
use scoped_threadpool::Pool; // 0.1.9
use std::{thread, time::Duration};

fn main() {
    let mut vec = vec![1, 2, 3, 4, 5];
    let mut pool = Pool::new(vec.len() as u32);

    pool.scoped(|scoped| {
        for e in &mut vec {
            scoped.execute(move || {
                thread::sleep(Duration::from_secs(1));
                *e += 1;
            });
        }
    });

    println!("{:?}", vec);
}
use crossbeam; // 0.6.0
use std::{thread, time::Duration};

fn main() {
    let mut vec = vec![1, 2, 3, 4, 5];

    crossbeam::scope(|scope| {
        for e in &mut vec {
            scope.spawn(move |_| {
                thread::sleep(Duration::from_secs(1));
                *e += 1;
            });
        }
    })
    .expect("A child thread panicked");

    println!("{:?}", vec);
}
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator}; // 1.0.3
use std::{thread, time::Duration};

fn main() {
    let mut vec = vec![1, 2, 3, 4, 5];

    vec.par_iter_mut().for_each(|e| {
        thread::sleep(Duration::from_secs(1));
        *e += 1;
    });

    println!("{:?}", vec);
}
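As mentioned above, the examples can be extended to share a reference to any Sync type, such as an atomic. Here is a minimal sketch of that pattern; it uses std::thread::scope, the scoped-thread API that was later stabilized in the standard library in Rust 1.63, so no external crate is needed:

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;

// Increment a shared atomic counter from `n` scoped threads, with no Arc:
// the scope guarantees every thread joins before `counter` is dropped.
fn parallel_count(n: i32) -> i32 {
    let counter = AtomicI32::new(0);
    thread::scope(|scope| {
        for _ in 0..n {
            // Each closure borrows `counter` straight off this stack frame.
            scope.spawn(|| {
                counter.fetch_add(1, Ordering::SeqCst);
            });
        }
    });
    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("{}", parallel_count(5));
}
```

Because the atomic handles synchronization itself, the threads only need shared references, unlike the mutable-borrow-per-element trick in the examples above.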
the client is required to box the connection in an Arc when the fact that the code is parallelized is internal to the library

Perhaps you can hide your parallelism better, then? Could you accept the logger and then wrap it in an Arc / Mutex yourself before handing it off to your threads?