#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "lua")]
use wireplumber::lua::from_variant;
use {
futures::{channel::mpsc, future, FutureExt, StreamExt},
glib::{prelude::*, Error, SourceId, Variant},
once_cell::unsync::OnceCell,
std::{future::Future, iter, pin::Pin},
wireplumber::{
core::ObjectFeatures,
error,
log::{info, warning},
plugin::{self, AsyncPluginImpl, SimplePlugin, SimplePluginObject, SourceHandlesCell},
prelude::*,
pw::{self, Link, Node, Port, Properties, ProxyFeatures},
registry::{Constraint, ConstraintType, Interest, ObjectManager},
},
};
const LOG_DOMAIN: &'static str = "static-link";
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
pub struct PortMapping {
output: Vec<Constraint>,
input: Vec<Constraint>,
}
#[doc(hidden)]
#[cfg(feature = "serde")]
fn true_() -> bool {
true
}
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
pub struct StaticLinkArgs {
output: Vec<Constraint>,
input: Vec<Constraint>,
#[cfg_attr(feature = "serde", serde(default, rename = "mappings"))]
port_mappings: Vec<PortMapping>,
#[cfg_attr(feature = "serde", serde(default = "true_"))]
passive: bool,
#[cfg_attr(feature = "serde", serde(default = "true_"))]
linger: bool,
}
impl Default for StaticLinkArgs {
fn default() -> Self {
Self {
output: Default::default(),
input: Default::default(),
port_mappings: Default::default(),
passive: true,
linger: true,
}
}
}
fn link_ports<'a>(
mappings: &'a [PortMapping],
core: &'a Core,
output: &'a Node,
input: &'a Node,
link_props: &'a Properties,
) -> impl Iterator<Item = Result<Link, Error>> + 'a {
mappings.iter().flat_map(move |mapping| {
let port_input_interest: Interest<Port> = mapping
.input
.iter()
.chain(iter::once(&Constraint::compare(
ConstraintType::default(),
pw::PW_KEY_PORT_DIRECTION,
"in",
true,
)))
.collect();
let port_inputs = port_input_interest.filter(input).into_iter();
let port_output_interest: Interest<Port> = mapping
.output
.iter()
.chain(iter::once(&Constraint::compare(
ConstraintType::default(),
pw::PW_KEY_PORT_DIRECTION,
"out",
true,
)))
.collect();
let port_outputs = move || port_output_interest.clone().filter(output).into_iter();
port_inputs.flat_map(move |i| port_outputs().map(move |o| Link::new(&core, &o, &i, link_props)))
})
}
pub async fn main_loop(
om: ObjectManager,
core: Core,
arg: StaticLinkArgs,
input_interest: Interest<Node>,
output_interest: Interest<Node>,
mut rx: mpsc::Receiver<()>,
) {
let link_props = Properties::new();
link_props.insert(pw::PW_KEY_LINK_PASSIVE, arg.passive);
link_props.insert(pw::PW_KEY_OBJECT_LINGER, arg.linger);
while let Some(()) = rx.next().await {
let inputs = input_interest.clone().filter(&om).into_iter();
let outputs = || output_interest.clone().filter(&om).into_iter();
let pairs = inputs.flat_map(|i| outputs().map(move |o| (i.clone(), o)));
let mut links = Vec::new();
for (input, output) in pairs {
info!(domain: LOG_DOMAIN, "linking {input} to {output}");
if arg.port_mappings.is_empty() {
links.push(Link::new(&core, &output, &input, &link_props));
} else {
links.extend(link_ports(&arg.port_mappings, &core, &output, &input, &link_props));
}
}
let links = links.into_iter().filter_map(|l| match l {
Ok(link) => Some(link.activate_future(ProxyFeatures::MINIMAL).map(|res| match res {
Err(e) if Link::error_is_exists(&e) => info!(domain: LOG_DOMAIN, "{:?}", e),
Err(e) => warning!(domain: LOG_DOMAIN, "Failed to activate link: {:?}", e),
Ok(_) => (),
})),
Err(e) => {
warning!(domain: LOG_DOMAIN, "Failed to create link: {:?}", e);
None
},
});
future::join_all(links).await;
}
}
pub async fn main_async(
plugin: &SimplePluginObject<StaticLink>,
core: Core,
arg: StaticLinkArgs,
) -> Result<impl IntoIterator<Item = impl Future<Output = ()>>, Error> {
let om = ObjectManager::new();
let output_interest: Interest<Node> = arg.output.iter().collect();
om.add_interest(output_interest.clone());
let input_interest: Interest<Node> = arg.input.iter().collect();
om.add_interest(input_interest.clone());
let (link_nodes_signal, rx) = mpsc::channel(1);
let port_signals = {
let mut object_added = om.signal_stream(ObjectManager::SIGNAL_OBJECT_ADDED);
let link_nodes_signal = link_nodes_signal.clone();
let plugin = plugin.downgrade();
async move {
while let Some((obj,)) = object_added.next().await {
let node: Node = obj.dynamic_cast().unwrap();
let plugin = match plugin.upgrade() {
Some(plugin) => plugin,
None => break,
};
plugin.spawn_local(
node
.signal_stream(Node::SIGNAL_PORTS_CHANGED)
.map(|_| Ok(()))
.forward(link_nodes_signal.clone())
.map(drop),
);
}
}
};
let object_signals = om
.signal_stream(ObjectManager::SIGNAL_OBJECTS_CHANGED)
.map(|_| Ok(()))
.forward(link_nodes_signal)
.map(drop);
om.request_object_features(Node::static_type(), ObjectFeatures::ALL);
core.install_object_manager(&om);
om.installed_future().await?;
let main_loop = main_loop(om, core, arg, input_interest, output_interest, rx);
Ok([
port_signals.boxed_local(),
object_signals.boxed_local(),
main_loop.boxed_local(),
])
}
#[derive(Default)]
pub struct StaticLink {
args: OnceCell<Vec<StaticLinkArgs>>,
handles: SourceHandlesCell,
}
impl AsyncPluginImpl for StaticLink {
type EnableFuture = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
fn register_source(&self, source: SourceId) {
self.handles.push(source);
}
fn enable(&self, this: Self::Type) -> Self::EnableFuture {
let core = this.plugin_core();
let context = this.plugin_context();
let res = self
.handles
.try_init(context.clone())
.map_err(|_| error::invariant(format_args!("{LOG_DOMAIN} plugin has already been enabled")));
async move {
res?;
let loops = this
.args
.get()
.unwrap()
.iter()
.map(|arg| main_async(&this, core.clone(), arg.clone()));
for spawn in future::try_join_all(loops).await?.into_iter().flat_map(|l| l) {
this.spawn_local(spawn);
}
Ok(())
}
.boxed_local()
}
fn disable(&self) {
self.handles.clear();
}
}
impl SimplePlugin for StaticLink {
type Args = Vec<StaticLinkArgs>;
fn init_args(&self, args: Self::Args) {
self.args.set(args).unwrap();
}
#[cfg(all(feature = "serde", feature = "lua"))]
fn decode_args(args: Option<Variant>) -> Result<Self::Args, Error> {
args
.map(|args| from_variant(&args))
.unwrap_or(Ok(Default::default()))
.map_err(error::invalid_argument)
}
#[cfg(not(all(feature = "serde", feature = "lua")))]
fn decode_args(args: Option<Variant>) -> Result<Self::Args, Error> {
let args = args.map(|_args| {
warning!(domain: LOG_DOMAIN, "requires lua and serde build features");
Default::default()
});
Ok(args.unwrap_or_default())
}
}
plugin::simple_plugin_subclass! {
impl ObjectSubclass for LOG_DOMAIN as StaticLink { }
}
plugin::plugin_export!(StaticLink);