Source code for jupyter_ui_poll._poll

import asyncio
import contextlib
import sys
import time
from collections import abc
from functools import singledispatch
from inspect import isawaitable, iscoroutinefunction
from typing import (
    Any,
    AsyncIterable,
    AsyncIterator,
    Callable,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    Tuple,
    TypeVar,
)

from IPython import get_ipython

from ._async_thread import AsyncThread

T = TypeVar("T")

ZMQ_POLLOUT = 2  # zmq.POLLOUT without zmq dependency


class KernelWrapper:
    _current: Optional["KernelWrapper"] = None

    def __init__(self, shell, loop) -> None:
        kernel = shell.kernel

        self._shell = shell
        self._kernel = kernel
        self._loop = loop
        self._original_parent = (
            kernel._parent_ident,
            (
                kernel.get_parent()  # ipykernel 6+
                if hasattr(kernel, "get_parent")
                else kernel._parent_header
            ),  # ipykernel < 6
        )
        self._events: List[Tuple[Any, Any, Any]] = []
        self._backup_execute_request = kernel.shell_handlers["execute_request"]
        self._backup_main_asyncio_lock = None
        if hasattr(kernel, "_main_asyncio_lock"):  # ipykernel 7+
            # Introduced in https://github.com/ipython/ipykernel/pull/1430
            # Does not seem to have a very good reason to be introduced, only to reduce flakiness
            self._backup_main_asyncio_lock = kernel._main_asyncio_lock
            kernel._main_asyncio_lock = contextlib.nullcontext()
        self._aproc = None

        if iscoroutinefunction(self._backup_execute_request):  # ipykernel 6+
            kernel.shell_handlers["execute_request"] = self._execute_request_async
        else:
            # ipykernel < 6
            kernel.shell_handlers["execute_request"] = self._execute_request

        # Previously, we used "post_execute", but a comm message handler can also trigger this event:
        #  https://github.com/ipython/comm/blob/73e28fc4adaca9b05dd437d70717480be19ce25b/comm/base_comm.py#L151
        # Instead, we now use the "post_run_cell" event, which is also triggered after the cell is executed.
        # However, "post_run_cell" is not triggered, it's unclear when this happens, but it should not happen
        # for normal cell execution.
        shell.events.register("post_run_cell", self._post_run_cell)

    def restore(self):
        if self._backup_execute_request is not None:
            self._kernel.shell_handlers["execute_request"] = (
                self._backup_execute_request
            )
            self._backup_execute_request = None
            if self._backup_main_asyncio_lock is not None:
                self._kernel._main_asyncio_lock = self._backup_main_asyncio_lock
                self._backup_main_asyncio_lock = None

    def _reset_output(self):
        self._kernel.set_parent(*self._original_parent)

    def _execute_request(self, stream, ident, parent):
        # store away execute request for later and reset io back to the original cell
        self._events.append((stream, ident, parent))
        self._reset_output()

    async def _execute_request_async(self, stream, ident, parent):
        self._execute_request(stream, ident, parent)

    async def replay(self):
        kernel = self._kernel
        self.restore()

        sys.stdout.flush()
        sys.stderr.flush()
        shell_stream = getattr(
            kernel, "shell_stream", None
        )  # ipykernel 6 vs 5 differences

        for stream, ident, parent in self._events:
            kernel.set_parent(ident, parent)
            if kernel._aborting:
                kernel._send_abort_reply(stream, parent, ident)
            else:
                rr = kernel.execute_request(stream, ident, parent)
                if isawaitable(rr):
                    await rr

                # replicate shell_dispatch behaviour
                sys.stdout.flush()
                sys.stderr.flush()
                if shell_stream is not None:  # 6+
                    kernel._publish_status("idle", "shell")
                    shell_stream.flush(ZMQ_POLLOUT)
                else:
                    kernel._publish_status("idle")

    async def do_one_iteration(self):
        try:
            rr = self._kernel.do_one_iteration()
            if isawaitable(rr):
                await rr
        except Exception:  # pylint: disable=broad-except
            # it's probably a bug in ipykernel,
            # .do_one_iteration() should not throw
            return
        finally:
            # reset stdio back to original cell
            self._reset_output()

    def _post_run_cell(self, *args, **kw):
        self._shell.events.unregister("post_run_cell", self._post_run_cell)
        self.restore()
        KernelWrapper._current = None
        asyncio.ensure_future(self.replay(), loop=self._loop)

    async def _poll_async(self, n=1):
        for _ in range(n):
            await self.do_one_iteration()

    async def __aenter__(self):
        return self._poll_async

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    def __enter__(self):
        if self._aproc is not None:
            raise ValueError("Nesting not supported")
        self._aproc = AsyncThread()
        return self._aproc.wrap(self._poll_async)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._aproc.terminate()
        self._aproc = None

    @staticmethod
    def get() -> "KernelWrapper":
        if KernelWrapper._current is None:
            KernelWrapper._current = KernelWrapper(
                get_ipython(), asyncio.get_event_loop()
            )
        return KernelWrapper._current


class IteratorWrapperAsync(abc.AsyncIterable, Generic[T]):
    def __init__(
        self,
        its: AsyncIterable[T],
        n: int = 1,
    ):
        self._its = its
        self._n = n

    def __aiter__(self) -> AsyncIterator[T]:
        async def _loop(
            kernel: KernelWrapper, its: AsyncIterable[T], n: int
        ) -> AsyncIterator[T]:
            async with kernel as poll:
                async for x in its:
                    await poll(n)
                    yield x

        return _loop(KernelWrapper.get(), self._its, self._n)


class IteratorWrapper(abc.Iterable, Generic[T]):
    def __init__(
        self,
        its: Iterable[T],
        n: int = 1,
    ):
        self._its = its
        self._n = n

    def __iter__(self) -> Iterator[T]:
        def _loop(kernel: KernelWrapper, its: Iterable[T], n: int) -> Iterator[T]:
            with kernel as poll:
                try:
                    for x in its:
                        poll(n)
                        yield x
                except GeneratorExit:
                    pass
                except Exception as e:
                    raise e

        return _loop(KernelWrapper.get(), self._its, self._n)

    def __aiter__(self) -> AsyncIterator[T]:
        async def _loop(
            kernel: KernelWrapper, its: Iterable[T], n: int
        ) -> AsyncIterator[T]:
            async with kernel as poll:
                for x in its:
                    await poll(n)
                    yield x

        return _loop(KernelWrapper.get(), self._its, self._n)


[docs] def ui_events(): """ Gives you a function you can call to process UI events while running a long task inside a Jupyter cell. .. code-block:: python with ui_events() as ui_poll: while some_condition: ui_poll(10) # Process upto 10 UI events if any happened do_some_more_compute() Async mode is also supported: .. code-block:: python async with ui_events() as ui_poll: while some_condition: await ui_poll(10) # Process upto 10 UI events if any happened do_some_more_compute() #. Call ``kernel.do_one_iteration()`` taking care of IO redirects #. Intercept ``execute_request`` IPython kernel events and delay their execution #. Schedule replay of any blocked ``execute_request`` events when cell execution is finished. """ return KernelWrapper.get()
[docs] @singledispatch def with_ui_events(its, n: int = 1): """ Deal with kernel ui events while processing a long sequence Iterable returned from this can be used in both async and sync contexts. .. code-block:: python for x in with_ui_events(some_data_stream, n=10): do_things_with(x) async for x in with_ui_events(some_data_stream, n=10): await do_things_with(x) This is basically equivalent to: .. code-block:: python with ui_events() as poll: for x in some_data_stream: poll(10) do_things_with(x) :param its: Iterator to pass through, this should be either :class:`~collections.abc.Iterable` or :class:`~collections.abc.AsyncIterable` :param n: Number of events to process in between items :returns: :class:`~collections.abc.AsyncIterable` when input is :class:`~collections.abc.AsyncIterable` :returns: Object that implements both :class:`~collections.abc.Iterable` and :class:`~collections.abc.AsyncIterable` interfaces when input is normal :class:`~collections.abc.Iterable` """ raise TypeError("Expect Iterable[T]|AsyncIterable[T]")
@with_ui_events.register(abc.Iterable) def with_ui_events_sync(its: Iterable[T], n: int = 1) -> IteratorWrapper[T]: return IteratorWrapper(its, n=n) @with_ui_events.register(abc.AsyncIterable) def with_ui_events_async(its: AsyncIterable[T], n: int = 1) -> AsyncIterable[T]: return IteratorWrapperAsync(its, n=n)
[docs] def run_ui_poll_loop( f: Callable[[], Optional[T]], sleep: float = 0.02, n: int = 1 ) -> T: """ Repeatedly call ``f()`` until it returns something other than ``None`` while also responding to widget events. This blocks execution of cells below in the notebook while still preserving interactivity of jupyter widgets. :param f: Function to periodically call (``f()`` should not block for long) :param sleep: Amount of time to sleep in between polling (in seconds, 1/50 is the default) :param n: Number of events to process per iteration :returns: First non-``None`` value returned from ``f()`` """ def as_iterator( f: Callable[[], Optional[T]], sleep: float ) -> Iterator[Optional[T]]: x = None while x is None: if sleep is not None: time.sleep(sleep) x = f() yield x for x in with_ui_events(as_iterator(f, sleep), n): if x is not None: return x raise RuntimeError("hm...") # for mypy sake