use {
crate::{BorrowedObject, ConnectDetails, DetailedSignal, FromValues, ObjectSignalExt, ToValueOption},
futures_channel::mpsc,
futures_core::{ready, FusedFuture, FusedStream, Stream},
glib::{
g_warning,
object::{ObjectExt, ObjectType},
value::FromValue,
Closure, SignalHandlerId, Value, WeakRef,
},
std::{
error::Error, fmt, future::Future, hint::unreachable_unchecked, io, mem::ManuallyDrop, pin::Pin, ptr, task::Poll,
},
};
/// A stream of the arguments carried by each emission of a signal connected on an object.
///
/// Created by [`SignalStream::connect`]. The signal handler is disconnected when
/// [`SignalStream::disconnect`] is called or when the stream is dropped.
#[must_use]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
#[derive(Debug)]
pub struct SignalStream<O: ObjectType, T> {
/// Receiving half of the unbounded channel that the signal handler closure sends arguments into.
rx: mpsc::UnboundedReceiver<T>,
/// Weak reference to the object the handler was connected on.
target: WeakRef<O>,
/// Id of the connected handler; `None` once `disconnect` has taken it.
handle: Option<SignalHandlerId>,
}
impl<O: ObjectType, T> SignalStream<O, T> {
    /// Connects a handler for `signal` on `target` and returns a stream of the emitted arguments.
    ///
    /// On every emission the handler converts the signal values into `T`, calls `res` with the
    /// emitting object and the arguments to compute the signal's return value, and then queues
    /// the arguments on the stream. If queueing fails a warning is logged and the emission still
    /// returns the value produced by `res`.
    ///
    /// # Panics
    ///
    /// Panics if connecting the closure to `target` fails. The handler itself panics if an
    /// emission carries no values, or if the instance or the arguments cannot be converted to
    /// the expected types.
    pub fn connect<F, S>(target: &O, signal: ConnectDetails<S>, res: F) -> Self
    where
        S: DetailedSignal<Arguments = T>,
        T: for<'a> FromValues<'a> + 'static,
        F: Fn(&O, &T) -> <<S as DetailedSignal>::Return as ToValueOption>::Type + 'static,
        for<'a> BorrowedObject<'a, O>: FromValue<'a>,
    {
        let (tx, rx) = futures_channel::mpsc::unbounded();
        let callback = move |values: &[Value]| {
            // The first value is the emitting instance, the remainder are the signal arguments.
            let (this, args) = values.split_first().unwrap();
            let this: BorrowedObject<O> = this.get().unwrap();
            let args = FromValues::from_values(args).unwrap();
            // Compute the return value first: `args` is moved into the channel right after.
            let res = res(&this, &args);
            if let Err(e) = tx.unbounded_send(args) {
                g_warning!("glib-signal", "Failed to signal {:?}: {:?}", signal, e);
            }
            res.into().to_value_option()
        };
        // NOTE(review): `Closure::new_unsafe` presumably skips the thread-safety bounds of the
        // safe constructor; this assumes the handler only runs on the thread owning `target` --
        // confirm against the glib documentation.
        let handle = unsafe { target.handle_closure(&signal.normalize(), &Closure::new_unsafe(callback)) }.unwrap();
        SignalStream {
            rx,
            target: target.downgrade(),
            handle: Some(handle),
        }
    }

    /// Converts this stream into a future that resolves with its next item.
    pub fn once(self) -> OnceFuture<O, T> {
        OnceFuture::new(self)
    }

    /// Disconnects the signal handler from the target, if the target is still alive.
    ///
    /// The handler id is taken on the first call, so calling this again does nothing.
    pub fn disconnect(&mut self) {
        if let Some(handle) = self.handle.take() {
            if let Some(target) = self.target.upgrade() {
                target.disconnect(handle);
            }
        }
    }

    /// Disconnects the handler and returns the weak reference to the target object.
    pub fn into_target(self) -> WeakRef<O> {
        // `SignalStream` implements `Drop`, so its fields cannot be moved out directly; suppress
        // the destructor and take the struct apart by hand instead.
        let mut this = ManuallyDrop::new(self);
        this.disconnect();
        // SAFETY: `this` is wrapped in `ManuallyDrop`, so its destructor never runs and no field
        // is touched again after this block: `target` is moved out exactly once and `rx` is
        // dropped in place exactly once. `handle` was emptied by `disconnect` above. Dropping
        // `rx` here releases the channel (and any queued items) instead of leaking it.
        unsafe {
            let target = ptr::read(&this.target);
            ptr::drop_in_place(&mut this.rx);
            target
        }
    }

    /// Returns the weak reference to the object this stream is connected to.
    pub fn target(&self) -> &WeakRef<O> {
        &self.target
    }

    /// Wraps this stream so that each item is paired with the target object, when it can still
    /// be upgraded.
    pub fn attach_target(self) -> SignalStreamSelf<O, T> {
        SignalStreamSelf::from(self)
    }
}
impl<O: ObjectType, T> Stream for SignalStream<O, T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
let rx = unsafe { self.map_unchecked_mut(|s| &mut s.rx) };
rx.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.rx.size_hint()
}
}
impl<O: ObjectType, T> FusedStream for SignalStream<O, T> {
fn is_terminated(&self) -> bool {
self.rx.is_terminated()
}
}
impl<O: ObjectType, T> Drop for SignalStream<O, T> {
    /// Disconnects the signal handler when the stream goes out of scope.
    fn drop(&mut self) {
        Self::disconnect(self);
    }
}
/// Error returned when a signal stream ends before yielding the awaited item.
#[derive(Debug, Copy, Clone)]
pub struct ConnectEof;

impl fmt::Display for ConnectEof {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("unexpected connect handle EOF")
    }
}

impl Error for ConnectEof {}

impl From<ConnectEof> for io::Error {
    /// Wraps the error in an [`io::Error`] of kind [`io::ErrorKind::UnexpectedEof`].
    fn from(eof: ConnectEof) -> Self {
        let kind = io::ErrorKind::UnexpectedEof;
        Self::new(kind, eof)
    }
}
impl From<ConnectEof> for glib::Error {
fn from(eof: ConnectEof) -> Self {
glib::Error::new(glib::FileError::Pipe, &format!("{:?}", eof))
}
}
/// A future that resolves with the next item of a [`SignalStream`].
///
/// On completion the stream is consumed: the future yields the item together with the weak
/// reference to the target, or [`ConnectEof`] if the stream ended without producing an item.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct OnceFuture<O: ObjectType, T> {
    /// The stream being awaited; `None` once the future has completed.
    stream: Option<SignalStream<O, T>>,
}
impl<O: ObjectType, T> OnceFuture<O, T> {
    /// Creates a future that resolves with the next item yielded by `stream`.
    pub fn new(stream: SignalStream<O, T>) -> Self {
        let stream = Some(stream);
        Self { stream }
    }

    /// Recovers the wrapped stream without waiting for an item.
    ///
    /// # Panics
    ///
    /// Panics if the future has already completed, because the stream was consumed then.
    pub fn into_inner(self) -> SignalStream<O, T> {
        let Self { stream } = self;
        stream.unwrap()
    }
}
impl<O: ObjectType, T> Future for OnceFuture<O, T> {
    type Output = Result<(T, WeakRef<O>), ConnectEof>;

    /// Polls the wrapped stream once; on the first item (or end of stream) the stream is taken
    /// out, disconnected, and its target returned alongside the result.
    ///
    /// If the future has already completed this returns `Poll::Pending` without polling anything.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: plain field projection onto the optional stream.
        // NOTE(review): the stream is moved out of this pinned slot below, which assumes
        // `SignalStream` does not rely on its address staying stable -- confirm.
        let mut slot = unsafe { self.map_unchecked_mut(|this| &mut this.stream) };

        let item = match slot.as_mut().as_pin_mut() {
            None => return Poll::Pending,
            Some(stream) => ready!(stream.poll_next(cx)),
        };

        // SAFETY: the match above returned early unless the slot held a stream, and nothing has
        // emptied the slot since, so `take` cannot yield `None` here.
        let stream = unsafe {
            match slot.get_unchecked_mut().take() {
                Some(stream) => stream,
                None => unreachable_unchecked(),
            }
        };

        // Disconnect before building the output so the handler is gone in both outcomes.
        let target = stream.into_target();
        Poll::Ready(item.ok_or(ConnectEof).map(|item| (item, target)))
    }
}
impl<O: ObjectType, T> FusedFuture for OnceFuture<O, T> {
    /// Returns `true` once the stream has been consumed, or while the held stream reports that
    /// it has terminated.
    fn is_terminated(&self) -> bool {
        match self.stream.as_ref() {
            Some(stream) => stream.is_terminated(),
            None => true,
        }
    }
}
/// A [`SignalStream`] adapter whose items also carry the target object.
///
/// Each item is paired with the result of upgrading the stream's weak target reference, which is
/// `None` when the object can no longer be upgraded.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct SignalStreamSelf<O: ObjectType, T> {
    /// The wrapped signal stream.
    inner: SignalStream<O, T>,
}
impl<O: ObjectType, T> From<SignalStream<O, T>> for SignalStreamSelf<O, T> {
fn from(inner: SignalStream<O, T>) -> Self {
Self { inner }
}
}
impl<O: ObjectType, T> Stream for SignalStreamSelf<O, T> {
    type Item = (Option<O>, T);

    /// Polls the wrapped stream and pairs each item with the upgraded target, if any.
    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
        // SAFETY: plain field projection; the wrapped stream is only polled in place through the
        // returned pin.
        let mut inner = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
        match ready!(inner.as_mut().poll_next(cx)) {
            // The target is upgraded only when an item was actually produced.
            Some(args) => Poll::Ready(Some((inner.target().upgrade(), args))),
            None => Poll::Ready(None),
        }
    }

    /// Forwards the size hint of the wrapped stream.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let inner = &self.inner;
        inner.size_hint()
    }
}
impl<O: ObjectType, T> FusedStream for SignalStreamSelf<O, T> {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}