use std::{
    fmt,
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};
use bitflags::bitflags;
use futures_core::ready;
use smallvec::SmallVec;
use crate::{
    actor::{Actor, ActorContext, ActorState, AsyncContext, Running, SpawnHandle, Supervised},
    address::{Addr, AddressSenderProducer},
    context_items::ActorWaitItem,
    fut::ActorFuture,
    mailbox::Mailbox,
};
bitflags! {
    #[derive(Debug)]
    struct ContextFlags: u8 {
        const STARTED =  0b0000_0001;
        const RUNNING =  0b0000_0010;
        const STOPPING = 0b0000_0100;
        const STOPPED =  0b0001_0000;
        const MB_CAP_CHANGED = 0b0010_0000;
    }
}
type Item<A> = (SpawnHandle, Pin<Box<dyn ActorFuture<A, Output = ()>>>);
pub trait AsyncContextParts<A>: ActorContext + AsyncContext<A>
where
    A: Actor<Context = Self>,
{
    fn parts(&mut self) -> &mut ContextParts<A>;
}
pub struct ContextParts<A>
where
    A: Actor,
    A::Context: AsyncContext<A>,
{
    addr: AddressSenderProducer<A>,
    flags: ContextFlags,
    wait: SmallVec<[ActorWaitItem<A>; 2]>,
    items: SmallVec<[Item<A>; 3]>,
    handles: SmallVec<[SpawnHandle; 2]>,
}
impl<A> fmt::Debug for ContextParts<A>
where
    A: Actor,
    A::Context: AsyncContext<A>,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("ContextParts")
            .field("flags", &self.flags)
            .finish()
    }
}
impl<A> ContextParts<A>
where
    A: Actor,
    A::Context: AsyncContext<A>,
{
    #[inline]
    pub fn new(addr: AddressSenderProducer<A>) -> Self {
        ContextParts {
            addr,
            flags: ContextFlags::RUNNING,
            wait: SmallVec::new(),
            items: SmallVec::new(),
            handles: SmallVec::from_slice(&[SpawnHandle::default(), SpawnHandle::default()]),
        }
    }
    #[inline]
    pub fn stop(&mut self) {
        if self.flags.contains(ContextFlags::RUNNING) {
            self.flags.remove(ContextFlags::RUNNING);
            self.flags.insert(ContextFlags::STOPPING);
        }
    }
    #[inline]
    pub fn terminate(&mut self) {
        self.flags = ContextFlags::STOPPED;
    }
    #[inline]
    pub fn state(&self) -> ActorState {
        if self.flags.contains(ContextFlags::RUNNING) {
            ActorState::Running
        } else if self.flags.contains(ContextFlags::STOPPED) {
            ActorState::Stopped
        } else if self.flags.contains(ContextFlags::STOPPING) {
            ActorState::Stopping
        } else {
            ActorState::Started
        }
    }
    #[inline]
    pub fn waiting(&self) -> bool {
        !self.wait.is_empty()
            || self
                .flags
                .intersects(ContextFlags::STOPPING | ContextFlags::STOPPED)
    }
    #[inline]
    pub fn curr_handle(&self) -> SpawnHandle {
        self.handles[1]
    }
    #[inline]
    pub fn spawn<F>(&mut self, fut: F) -> SpawnHandle
    where
        F: ActorFuture<A, Output = ()> + 'static,
    {
        let handle = self.handles[0].next();
        self.handles[0] = handle;
        let fut: Box<dyn ActorFuture<A, Output = ()>> = Box::new(fut);
        self.items.push((handle, Pin::from(fut)));
        handle
    }
    #[inline]
    pub fn wait<F>(&mut self, f: F)
    where
        F: ActorFuture<A, Output = ()> + 'static,
    {
        self.wait.push(ActorWaitItem::new(f));
    }
    #[inline]
    pub fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
        self.handles.push(handle);
        true
    }
    #[inline]
    pub fn capacity(&mut self) -> usize {
        self.addr.capacity()
    }
    #[inline]
    pub fn set_mailbox_capacity(&mut self, cap: usize) {
        self.flags.insert(ContextFlags::MB_CAP_CHANGED);
        self.addr.set_capacity(cap);
    }
    #[inline]
    pub fn address(&self) -> Addr<A> {
        Addr::new(self.addr.sender())
    }
    #[inline]
    pub(crate) fn restart(&mut self) {
        self.flags = ContextFlags::RUNNING;
        self.wait = SmallVec::new();
        self.items = SmallVec::new();
        self.handles[0] = SpawnHandle::default();
    }
    #[inline]
    pub fn started(&mut self) -> bool {
        self.flags.contains(ContextFlags::STARTED)
    }
    #[inline]
    pub fn connected(&self) -> bool {
        self.addr.connected()
    }
}
pub struct ContextFut<A, C>
where
    C: AsyncContextParts<A> + Unpin,
    A: Actor<Context = C>,
{
    ctx: C,
    act: A,
    mailbox: Mailbox<A>,
    wait: SmallVec<[ActorWaitItem<A>; 2]>,
    items: SmallVec<[Item<A>; 3]>,
}
impl<A, C> fmt::Debug for ContextFut<A, C>
where
    C: AsyncContextParts<A> + Unpin,
    A: Actor<Context = C>,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "ContextFut {{ /* omitted */ }}")
    }
}
impl<A, C> Drop for ContextFut<A, C>
where
    C: AsyncContextParts<A> + Unpin,
    A: Actor<Context = C>,
{
    fn drop(&mut self) {
        if self.alive() {
            self.ctx.parts().stop();
            let waker = futures_task::noop_waker();
            let mut cx = std::task::Context::from_waker(&waker);
            let _ = Pin::new(self).poll(&mut cx);
        }
    }
}
impl<A, C> ContextFut<A, C>
where
    C: AsyncContextParts<A> + Unpin,
    A: Actor<Context = C>,
{
    pub fn new(ctx: C, act: A, mailbox: Mailbox<A>) -> Self {
        ContextFut {
            ctx,
            act,
            mailbox,
            wait: SmallVec::new(),
            items: SmallVec::new(),
        }
    }
    #[inline]
    pub fn ctx(&mut self) -> &mut C {
        &mut self.ctx
    }
    #[inline]
    pub fn address(&self) -> Addr<A> {
        self.mailbox.address()
    }
    #[inline]
    fn stopping(&mut self) -> bool {
        self.ctx
            .parts()
            .flags
            .intersects(ContextFlags::STOPPING | ContextFlags::STOPPED)
    }
    #[inline]
    pub fn alive(&mut self) -> bool {
        if self.ctx.parts().flags.contains(ContextFlags::STOPPED) {
            false
        } else {
            !self.ctx.parts().flags.contains(ContextFlags::STARTED)
                || self.mailbox.connected()
                || !self.items.is_empty()
                || !self.wait.is_empty()
        }
    }
    #[inline]
    pub fn restart(&mut self) -> bool
    where
        A: Supervised,
    {
        if self.mailbox.connected() {
            self.wait = SmallVec::new();
            self.items = SmallVec::new();
            self.ctx.parts().restart();
            self.act.restarting(&mut self.ctx);
            true
        } else {
            false
        }
    }
    fn merge(&mut self) -> bool {
        let mut modified = false;
        let parts = self.ctx.parts();
        if !parts.wait.is_empty() {
            modified = true;
            self.wait.extend(parts.wait.drain(0..));
        }
        if !parts.items.is_empty() {
            modified = true;
            self.items.extend(parts.items.drain(0..));
        }
        if parts.flags.contains(ContextFlags::MB_CAP_CHANGED) {
            modified = true;
            parts.flags.remove(ContextFlags::MB_CAP_CHANGED);
        }
        if parts.handles.len() > 2 {
            modified = true;
        }
        modified
    }
    fn clean_canceled_handle(&mut self) {
        fn remove_item_by_handle<C>(
            items: &mut SmallVec<[Item<C>; 3]>,
            handle: &SpawnHandle,
        ) -> bool {
            let mut idx = 0;
            let mut removed = false;
            while idx < items.len() {
                if &items[idx].0 == handle {
                    items.swap_remove(idx);
                    removed = true;
                } else {
                    idx += 1;
                }
            }
            removed
        }
        while self.ctx.parts().handles.len() > 2 {
            let handle = self.ctx.parts().handles.pop().unwrap();
            if !remove_item_by_handle(&mut self.items, &handle) {
                remove_item_by_handle(&mut self.ctx.parts().items, &handle);
            }
        }
    }
}
#[doc(hidden)]
impl<A, C> Future for ContextFut<A, C>
where
    C: AsyncContextParts<A> + Unpin,
    A: Actor<Context = C>,
{
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        if !this.ctx.parts().flags.contains(ContextFlags::STARTED) {
            this.ctx.parts().flags.insert(ContextFlags::STARTED);
            Actor::started(&mut this.act, &mut this.ctx);
            if this.merge() {
                this.clean_canceled_handle();
            }
        }
        'outer: loop {
            while !this.wait.is_empty() && !this.stopping() {
                let idx = this.wait.len() - 1;
                let item = this.wait.last_mut().unwrap();
                ready!(Pin::new(item).poll(&mut this.act, &mut this.ctx, cx));
                this.wait.remove(idx);
                this.merge();
            }
            this.mailbox.poll(&mut this.act, &mut this.ctx, cx);
            if !this.wait.is_empty() && !this.stopping() {
                continue;
            }
            let mut idx = 0;
            while idx < this.items.len() && !this.stopping() {
                this.ctx.parts().handles[1] = this.items[idx].0;
                match Pin::new(&mut this.items[idx].1).poll(&mut this.act, &mut this.ctx, cx) {
                    Poll::Pending => {
                        if this.ctx.waiting() {
                            this.merge();
                        }
                        if this.ctx.parts().handles.len() > 2 {
                            this.clean_canceled_handle();
                            continue 'outer;
                        }
                        if !this.wait.is_empty() && !this.stopping() {
                            let next = this.items.len() - 1;
                            if idx != next {
                                this.items.swap(idx, next);
                            }
                            continue 'outer;
                        } else {
                            idx += 1;
                        }
                    }
                    Poll::Ready(()) => {
                        this.items.swap_remove(idx);
                        if this.ctx.waiting() {
                            this.merge();
                        }
                        if !this.wait.is_empty() && !this.stopping() {
                            continue 'outer;
                        }
                    }
                }
            }
            this.ctx.parts().handles[1] = SpawnHandle::default();
            if this.merge() && !this.ctx.parts().flags.contains(ContextFlags::STOPPING) {
                if this.items.is_empty() {
                    this.ctx.parts().handles.truncate(2);
                }
                continue;
            }
            if this.ctx.parts().flags.contains(ContextFlags::RUNNING) {
                if !this.alive() && Actor::stopping(&mut this.act, &mut this.ctx) == Running::Stop {
                    this.ctx.parts().flags = ContextFlags::STOPPED | ContextFlags::STARTED;
                    Actor::stopped(&mut this.act, &mut this.ctx);
                    return Poll::Ready(());
                }
            } else if this.ctx.parts().flags.contains(ContextFlags::STOPPING) {
                if Actor::stopping(&mut this.act, &mut this.ctx) == Running::Stop {
                    this.ctx.parts().flags = ContextFlags::STOPPED | ContextFlags::STARTED;
                    Actor::stopped(&mut this.act, &mut this.ctx);
                    return Poll::Ready(());
                } else {
                    this.ctx.parts().flags.remove(ContextFlags::STOPPING);
                    this.ctx.parts().flags.insert(ContextFlags::RUNNING);
                    continue;
                }
            } else if this.ctx.parts().flags.contains(ContextFlags::STOPPED) {
                Actor::stopped(&mut this.act, &mut this.ctx);
                return Poll::Ready(());
            }
            return Poll::Pending;
        }
    }
}