I'm trying to move some data around b/w different threads but am getting the ole Copy trait-not-implemented error. Here's some code:
use std::future::Future;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
/// Start external crates mocked here
#[derive(Clone, PartialEq, Eq)]
pub struct DecodeError {
inner: Box<Inner>,
}
#[derive(Clone, PartialEq, Eq)]
struct Inner {}
#[derive(Clone)]
pub struct Connection {}
pub trait Message: core::fmt::Debug + Send + Sync {
fn decode<B>(mut buf: B) -> Result<Self, DecodeError>
where
B: bytes::Buf,
Self: Default,
{
// do stuff
let mut message = Self::default();
Ok(message)
}
}
#[derive(Clone, Debug, Default)]
pub struct Request {}
impl Message for Request {}
#[derive(Clone, Debug, Default)]
pub struct Response {}
impl Message for Response {}
pub struct OtherResponse {}
pub enum ReplyError {
InvalidData,
}
pub struct EventMessage {
data: Vec<u8>,
}
pub struct Subscription {}
impl Subscription {
pub async fn next(&self) -> Option<EventMessage> {
Some(EventMessage { data: vec![] })
}
}
/// End external crates mocked here
#[derive(Clone)]
pub struct Publisher<T> {
connection: Connection,
subject: String,
resource_type: PhantomData<*const T>,
}
#[derive(Debug)]
pub enum PublishError {
SerializeError(String),
PublishError(String),
}
pub type PublishResult<T> = std::result::Result<T, PublishError>;
impl<T: Message> Publisher<T> {
pub fn new(connection: Connection, subject: String) -> Self {
let resource_type = PhantomData;
Publisher {
connection: connection,
subject,
resource_type,
}
}
pub async fn publish(&self, msg: T) -> PublishResult<()>
where
T: Message,
{
// do stuff to msg
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let node = Node::new("127.0.0.1", "node".into())
.await
.expect("connecting to NATS");
let p: Publisher<Request> = node.get_publisher("TOPIC".into());
let _submission_replyer: AsynkReplyer<Request, Response> = node
.get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
let mut req = req.clone().lock().unwrap();
p.clone().publish(*req);
Ok(Response {})
})
.await;
Ok(())
}
pub struct Node {
name: String,
connection: Connection,
}
pub type ReplyResult<T> = std::result::Result<T, ReplyError>;
impl Node {
pub async fn new(_nats_url: &str, name: String) -> std::io::Result<Self> {
env_logger::init();
let connection = Connection {};
Ok(Node { name, connection })
}
pub fn get_publisher<T>(&self, subject: String) -> Publisher<T>
where
T: Message + Default,
{
Publisher::new(self.connection.clone(), subject)
}
pub async fn get_replyer<Req, Resp, Fut>(
&self,
subject: String,
callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
) -> AsynkReplyer<Req, Resp>
where
Req: Message + Default + 'static,
Resp: Message + Default,
Fut: Future<Output = ReplyResult<Resp>> + Send,
{
AsynkReplyer::new(&self.connection, subject, callback).await
}
}
pub struct AsynkReplyer<Req, Resp> {
request_type: PhantomData<Req>,
response_type: PhantomData<Resp>,
}
impl<Req: Message + Default + 'static, Resp: Message + Default> AsynkReplyer<Req, Resp> {
pub async fn new<Fut>(
connection: &Connection,
subject: String,
callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
) -> AsynkReplyer<Req, Resp>
where
Fut: Future<Output = ReplyResult<Resp>> + Send,
{
Self::start_subscription_handler(Subscription {}, callback).await;
AsynkReplyer {
request_type: PhantomData,
response_type: PhantomData,
}
}
pub async fn start_subscription_handler<Fut>(
subscription: Subscription,
callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
) where
Fut: Future<Output = ReplyResult<Resp>> + Send,
{
tokio::spawn(async move {
loop {
match subscription.next().await {
Some(msg) => {
Self::handle_request(msg, callback).await;
}
None => {
break;
}
}
}
});
}
/// Decodes + spins up another task to handle the request
pub async fn handle_request<Fut>(
msg: EventMessage,
callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
) -> ReplyResult<()>
where
Fut: Future<Output = ReplyResult<Resp>> + Send,
{
let decoded = Req::decode(msg.data.as_slice()).map_err(|_| ReplyError::InvalidData)?;
tokio::spawn(async move {
match callback(Arc::new(Mutex::new(decoded))).await {
Ok(response) => {
// do stuff
}
Err(e) => {}
}
});
Ok(())
}
}
error:
error[E0277]: the trait bound `Publisher<Request>: std::marker::Copy` is not satisfied in `[closure@src/main.rs:93:40: 97:10]`
--> src/main.rs:93:10
|
93 | .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
| __________^^^^^^^^^^^___________________-
| | |
| | within `[closure@src/main.rs:93:40: 97:10]`, the trait `std::marker::Copy` is not implemented for `Publisher<Request>`
94 | | let mut req = req.clone().lock().unwrap();
95 | | p.clone().publish(*req);
96 | | Ok(Response {})
97 | | })
| |_________- within this `[closure@src/main.rs:93:40: 97:10]`
|
= note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`
error[E0277]: `*const Request` cannot be sent between threads safely
--> src/main.rs:93:10
|
93 | .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
| __________^^^^^^^^^^^___________________-
| | |
| | `*const Request` cannot be sent between threads safely
94 | | let mut req = req.clone().lock().unwrap();
95 | | p.clone().publish(*req);
96 | | Ok(Response {})
97 | | })
| |_________- within this `[closure@src/main.rs:93:40: 97:10]`
|
= help: within `[closure@src/main.rs:93:40: 97:10]`, the trait `Send` is not implemented for `*const Request`
= note: required because it appears within the type `PhantomData<*const Request>`
note: required because it appears within the type `Publisher<Request>`
--> src/main.rs:53:12
|
53 | pub struct Publisher<T> {
| ^^^^^^^^^
= note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`
error[E0277]: `*const Request` cannot be shared between threads safely
--> src/main.rs:93:10
|
93 | .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
| __________^^^^^^^^^^^___________________-
| | |
| | `*const Request` cannot be shared between threads safely
94 | | let mut req = req.clone().lock().unwrap();
95 | | p.clone().publish(*req);
96 | | Ok(Response {})
97 | | })
| |_________- within this `[closure@src/main.rs:93:40: 97:10]`
|
= help: within `[closure@src/main.rs:93:40: 97:10]`, the trait `Sync` is not implemented for `*const Request`
= note: required because it appears within the type `PhantomData<*const Request>`
note: required because it appears within the type `Publisher<Request>`
--> src/main.rs:53:12
|
53 | pub struct Publisher<T> {
| ^^^^^^^^^
= note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`
I can't (or can I) add the Copy attribute on the Publisher struct but that wont work since not all of its fields implement Copy. Despite this I've commented out the fields in Publisher that don't impl Copy and added the attribute to it just to see, and with that approach I get:
the trait `std::marker::Copy` is not implemented for `Request`
Request is a protobuf based struct compiled using the prost lib. I'm not able to add the Copy attribute to that because of some of its fields not implementing Copy such as String and Timestamp.
I'm wondering if the design here is just inherently bad or if there's a simple fix.
It seems to me, you've constrained that the
FnisCopybecause you are passing it to multipletokio::spawncalls. You've found thatCopyis very restrictive, howeverCloneis not. You should use it instead and simply call.clone()when you handle the new request:Then the only errors are
'*const Request' cannot be sent between threads safely. The compiler does not automatically implementSendorSyncfor pointers because it doesn't know if that's safe, but yourFnneeds to be called from different threads. Fortunately, you don't need to worry about that. Whether yourPhantomData<*const T>is there simply to satisfy the compiler or to enforce specific variance, you can get the same result like this:Then, now that we've fixed the type constraint errors, the compiler now produces errors about lifetimes:
req.clone().lock().unwrap()doesn't work because the result of.lock()is tied to the value fromreq.clone(), but that gets dropped immediately. The fix is that the.clone()is unnecessary and can be removed.p.clone().publish(*req)doesn't work since dereferencing aMutexLockGuardcannot provide a owned value, only a reference. You can fix this by adding a.clone()instead. If instead you think theArcparameter is exclusive, you can get ownership by following the advice here: How to take ownership of T from Arc<Mutex<T>>?The last lifetime error is a bit fuzzy because it has to do with the lifetime of the returned
Futurebeing tied to thereqparameter. This can be fixed by usingasync move { }but thenpis moved out of the closure into the future, meaning it is no longerFn. What you want is to movereqbut move a clone ofp. You can do this like so:"now I'm trying to resolve errs related to
await-ingp.publish" - the error has to do with the lock now persisting across anawaitpoint, but since the mutex guard doesn't implementSend, theFuturecannot beSend. You can fix this by locking and cloning in one step, so the lock isn't held:See this compiling on the playground. There are still a number of warnings that should be addressed (unused
Results), but I hope this gets you on the right path.