#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "lua")]
use wireplumber::lua::from_variant;
use {
	futures::{channel::mpsc, future, FutureExt, StreamExt},
	glib::{prelude::*, Error, SourceId, Variant},
	once_cell::unsync::OnceCell,
	std::{future::Future, iter, pin::Pin},
	wireplumber::{
		core::ObjectFeatures,
		error,
		log::{info, warning},
		plugin::{self, AsyncPluginImpl, SimplePlugin, SimplePluginObject, SourceHandlesCell},
		prelude::*,
		pw::{self, Link, Node, Port, Properties, ProxyFeatures},
		registry::{Constraint, ConstraintType, Interest, ObjectManager},
	},
};
const LOG_DOMAIN: &'static str = "static-link";
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
pub struct PortMapping {
	output: Vec<Constraint>,
	input: Vec<Constraint>,
}
#[doc(hidden)]
#[cfg(feature = "serde")]
fn true_() -> bool {
	true
}
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
pub struct StaticLinkArgs {
	output: Vec<Constraint>,
	input: Vec<Constraint>,
	#[cfg_attr(feature = "serde", serde(default, rename = "mappings"))]
	port_mappings: Vec<PortMapping>,
	#[cfg_attr(feature = "serde", serde(default = "true_"))]
	passive: bool,
	#[cfg_attr(feature = "serde", serde(default = "true_"))]
	linger: bool,
}
impl Default for StaticLinkArgs {
	fn default() -> Self {
		Self {
			output: Default::default(),
			input: Default::default(),
			port_mappings: Default::default(),
			passive: true,
			linger: true,
		}
	}
}
fn link_ports<'a>(
	mappings: &'a [PortMapping],
	core: &'a Core,
	output: &'a Node,
	input: &'a Node,
	link_props: &'a Properties,
) -> impl Iterator<Item = Result<Link, Error>> + 'a {
	mappings.iter().flat_map(move |mapping| {
		let port_input_interest: Interest<Port> = mapping
			.input
			.iter()
			.chain(iter::once(&Constraint::compare(
				ConstraintType::default(),
				pw::PW_KEY_PORT_DIRECTION,
				"in",
				true,
			)))
			.collect();
		let port_inputs = port_input_interest.filter(input).into_iter();
		let port_output_interest: Interest<Port> = mapping
			.output
			.iter()
			.chain(iter::once(&Constraint::compare(
				ConstraintType::default(),
				pw::PW_KEY_PORT_DIRECTION,
				"out",
				true,
			)))
			.collect();
		let port_outputs = move || port_output_interest.clone().filter(output).into_iter();
		port_inputs.flat_map(move |i| port_outputs().map(move |o| Link::new(&core, &o, &i, link_props)))
	})
}
pub async fn main_loop(
	om: ObjectManager,
	core: Core,
	arg: StaticLinkArgs,
	input_interest: Interest<Node>,
	output_interest: Interest<Node>,
	mut rx: mpsc::Receiver<()>,
) {
	let link_props = Properties::new();
	link_props.insert(pw::PW_KEY_LINK_PASSIVE, arg.passive);
	link_props.insert(pw::PW_KEY_OBJECT_LINGER, arg.linger);
	while let Some(()) = rx.next().await {
		let inputs = input_interest.clone().filter(&om).into_iter();
		let outputs = || output_interest.clone().filter(&om).into_iter();
		let pairs = inputs.flat_map(|i| outputs().map(move |o| (i.clone(), o)));
		let mut links = Vec::new();
		for (input, output) in pairs {
			info!(domain: LOG_DOMAIN, "linking {input} to {output}");
			if arg.port_mappings.is_empty() {
				links.push(Link::new(&core, &output, &input, &link_props));
			} else {
				links.extend(link_ports(&arg.port_mappings, &core, &output, &input, &link_props));
			}
		}
		let links = links.into_iter().filter_map(|l| match l {
			Ok(link) => Some(link.activate_future(ProxyFeatures::MINIMAL).map(|res| match res {
				Err(e) if Link::error_is_exists(&e) => info!(domain: LOG_DOMAIN, "{:?}", e),
				Err(e) => warning!(domain: LOG_DOMAIN, "Failed to activate link: {:?}", e),
				Ok(_) => (),
			})),
			Err(e) => {
				warning!(domain: LOG_DOMAIN, "Failed to create link: {:?}", e);
				None
			},
		});
		future::join_all(links).await;
	}
}
pub async fn main_async(
	plugin: &SimplePluginObject<StaticLink>,
	core: Core,
	arg: StaticLinkArgs,
) -> Result<impl IntoIterator<Item = impl Future<Output = ()>>, Error> {
	let om = ObjectManager::new();
	let output_interest: Interest<Node> = arg.output.iter().collect();
	om.add_interest(output_interest.clone());
	let input_interest: Interest<Node> = arg.input.iter().collect();
	om.add_interest(input_interest.clone());
	let (link_nodes_signal, rx) = mpsc::channel(1);
	let port_signals = {
		let mut object_added = om.signal_stream(ObjectManager::SIGNAL_OBJECT_ADDED);
		let link_nodes_signal = link_nodes_signal.clone();
		let plugin = plugin.downgrade();
		async move {
			while let Some((obj,)) = object_added.next().await {
				let node: Node = obj.dynamic_cast().unwrap();
				let plugin = match plugin.upgrade() {
					Some(plugin) => plugin,
					None => break,
				};
				plugin.spawn_local(
					node
						.signal_stream(Node::SIGNAL_PORTS_CHANGED)
						.map(|_| Ok(()))
						.forward(link_nodes_signal.clone())
						.map(drop),
				);
			}
		}
	};
	let object_signals = om
		.signal_stream(ObjectManager::SIGNAL_OBJECTS_CHANGED)
		.map(|_| Ok(()))
		.forward(link_nodes_signal)
		.map(drop);
	om.request_object_features(Node::static_type(), ObjectFeatures::ALL);
	core.install_object_manager(&om);
	om.installed_future().await?;
	let main_loop = main_loop(om, core, arg, input_interest, output_interest, rx);
	Ok([
		port_signals.boxed_local(),
		object_signals.boxed_local(),
		main_loop.boxed_local(),
	])
}
#[derive(Default)]
pub struct StaticLink {
	args: OnceCell<Vec<StaticLinkArgs>>,
	handles: SourceHandlesCell,
}
impl AsyncPluginImpl for StaticLink {
	type EnableFuture = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
	fn register_source(&self, source: SourceId) {
		self.handles.push(source);
	}
	fn enable(&self, this: Self::Type) -> Self::EnableFuture {
		let core = this.plugin_core();
		let context = this.plugin_context();
		let res = self
			.handles
			.try_init(context.clone())
			.map_err(|_| error::invariant(format_args!("{LOG_DOMAIN} plugin has already been enabled")));
		async move {
			res?;
			let loops = this
				.args
				.get()
				.unwrap()
				.iter()
				.map(|arg| main_async(&this, core.clone(), arg.clone()));
			for spawn in future::try_join_all(loops).await?.into_iter().flat_map(|l| l) {
				this.spawn_local(spawn);
			}
			Ok(())
		}
		.boxed_local()
	}
	fn disable(&self) {
		self.handles.clear();
	}
}
impl SimplePlugin for StaticLink {
	type Args = Vec<StaticLinkArgs>;
	fn init_args(&self, args: Self::Args) {
		self.args.set(args).unwrap();
	}
	#[cfg(all(feature = "serde", feature = "lua"))]
	fn decode_args(args: Option<Variant>) -> Result<Self::Args, Error> {
		args
			.map(|args| from_variant(&args))
			.unwrap_or(Ok(Default::default()))
			.map_err(error::invalid_argument)
	}
	#[cfg(not(all(feature = "serde", feature = "lua")))]
	fn decode_args(args: Option<Variant>) -> Result<Self::Args, Error> {
		let args = args.map(|_args| {
			warning!(domain: LOG_DOMAIN, "requires lua and serde build features");
			Default::default()
		});
		Ok(args.unwrap_or_default())
	}
}
plugin::simple_plugin_subclass! {
	impl ObjectSubclass for LOG_DOMAIN as StaticLink { }
}
plugin::plugin_export!(StaticLink);