Source code for panel.io.document

from __future__ import annotations

import asyncio
import dataclasses
import datetime as dt
import inspect
import logging
import threading

from contextlib import contextmanager
from functools import partial, wraps
from typing import (
    Callable, Iterator, List, Optional,
)

from bokeh.application.application import SessionContext
from bokeh.document.document import Document
from bokeh.document.events import DocumentChangedEvent, ModelChangedEvent

from .model import monkeypatch_events
from .state import curdoc_locked, state

logger = logging.getLogger(__name__)

#---------------------------------------------------------------------
# Private API
#---------------------------------------------------------------------

[docs]@dataclasses.dataclass class Request: headers : dict cookies : dict arguments : dict
[docs]class MockSessionContext(SessionContext): def __init__(self, *args, document=None, **kwargs): self._document = document super().__init__(*args, server_context=None, session_id=None, **kwargs)
[docs] def with_locked_document(self, *args): return
@property def destroyed(self) -> bool: return False @property def request(self): return Request(headers={}, cookies={}, arguments={})
def _dispatch_events(doc: Document, events: List[DocumentChangedEvent]) -> None: """ Handles dispatch of events which could not be processed in unlocked decorator. """ for event in events: doc.callbacks.trigger_on_change(event) def _cleanup_doc(doc): for callback in doc.session_destroyed_callbacks: try: callback(None) except Exception: pass if hasattr(doc.callbacks, '_change_callbacks'): doc.callbacks._change_callbacks[None] = {} # Remove views from ..viewable import Viewable views = {} for ref, (pane, root, vdoc, comm) in list(state._views.items()): if vdoc is doc: pane._cleanup(root) if isinstance(pane, Viewable): pane._hooks = [] for p in pane.select(): p._hooks = [] p._param_watchers = {} p._documents = {} p._callbacks = {} pane._param_watchers = {} pane._documents = {} pane._callbacks = {} else: views[ref] = (pane, root, doc, comm) state._views = views # Clean up templates if doc in state._templates: tmpl = state._templates[doc] tmpl._documents = {} del state._templates[doc] # Destroy doc doc.destroy(None) #--------------------------------------------------------------------- # Public API #--------------------------------------------------------------------- def init_doc(doc: Optional[Document]) -> Document: curdoc = doc or curdoc_locked() if not isinstance(curdoc, Document): curdoc = curdoc._doc if not curdoc.session_context: return curdoc thread = threading.current_thread() if thread: state._thread_id_[curdoc] = thread.ident session_id = curdoc.session_context.id sessions = state.session_info['sessions'] if session_id not in sessions: return curdoc sessions[session_id].update({ 'started': dt.datetime.now().timestamp() }) curdoc.on_event('document_ready', state._init_session) return curdoc
[docs]def with_lock(func: Callable) -> Callable: """ Wrap a callback function to execute with a lock allowing the function to modify bokeh models directly. Arguments --------- func: callable The callable to wrap Returns ------- wrapper: callable Function wrapped to execute without a Document lock. """ if inspect.iscoroutinefunction(func): @wraps(func) async def wrapper(*args, **kw): return await func(*args, **kw) else: @wraps(func) def wrapper(*args, **kw): return func(*args, **kw) wrapper.lock = True # type: ignore return wrapper
def dispatch_tornado(conn, event): from tornado.websocket import WebSocketHandler socket = conn._socket ws_conn = socket.ws_connection if not ws_conn or ws_conn.is_closing(): # type: ignore return [] msg = conn.protocol.create('PATCH-DOC', [event]) futures = [ WebSocketHandler.write_message(socket, msg.header_json), WebSocketHandler.write_message(socket, msg.metadata_json), WebSocketHandler.write_message(socket, msg.content_json) ] for header, payload in msg._buffers: futures.extend([ WebSocketHandler.write_message(socket, header), WebSocketHandler.write_message(socket, payload, binary=True) ]) return futures def dispatch_django(conn, event): socket = conn._socket msg = conn.protocol.create('PATCH-DOC', [event]) futures = [ socket.send(text_data=msg.header_json), socket.send(text_data=msg.metadata_json), socket.send(text_data=msg.content_json) ] for header, payload in msg._buffers: futures.extend([ socket.send(text_data=header), socket.send(binary_data=payload) ]) return futures
[docs]@contextmanager def unlocked() -> Iterator: """ Context manager which unlocks a Document and dispatches ModelChangedEvents triggered in the context body to all sockets on current sessions. """ curdoc = state.curdoc session_context = getattr(curdoc, 'session_context', None) session = getattr(session_context, 'session', None) if curdoc is None or session_context is None or session is None or state._jupyter_kernel_context: yield return elif curdoc.callbacks.hold_value: yield monkeypatch_events(curdoc.callbacks._held_events) return from tornado.websocket import WebSocketClosedError, WebSocketHandler connections = session._subscribed_connections curdoc.hold() try: yield locked = False for conn in connections: socket = conn._socket if hasattr(socket, 'write_lock') and socket.write_lock._block._value == 0: locked = True break events = curdoc.callbacks._held_events monkeypatch_events(events) remaining_events, futures = [], [] for event in events: if not isinstance(event, ModelChangedEvent) or locked: remaining_events.append(event) continue for conn in connections: if isinstance(conn._socket, WebSocketHandler): futures += dispatch_tornado(conn, event) else: futures += dispatch_django(conn, event) # Ensure that all write_message calls are awaited and handled async def handle_write_errors(): for future in futures: try: await future except WebSocketClosedError: logger.warning("Failed sending message as connection was closed") except Exception as e: logger.warning(f"Failed sending message due to following error: {e}") if state._unblocked(curdoc): try: asyncio.ensure_future(handle_write_errors()) except RuntimeError: curdoc.add_next_tick_callback(handle_write_errors) else: curdoc.add_next_tick_callback(handle_write_errors) curdoc.callbacks._held_events = remaining_events finally: try: curdoc.unhold() except RuntimeError: curdoc.add_next_tick_callback(partial(_dispatch_events, curdoc, remaining_events))