Source code for panel.io.jupyter_server_extension

"""
The Panel jupyter_server_extension implements Jupyter RequestHandlers
that allow a Panel application to run inside Jupyter kernels. This
allows Panel applications to be served in the kernel (and therefore
the environment) they were written for.

The `PanelJupyterHandler` may be given the path to a notebook, .py
file or Lumen .yaml file. It will then attempt to resolve the
appropriate kernel based on the kernelspec or a query parameter. Once
the kernel has been provisioned the handler will execute a code snippet
on the kernel which creates a `PanelExecutor`. The `PanelExecutor`
creates a `bokeh.server.session.ServerSession` and connects it to a
Jupyter Comm. The `PanelJupyterHandler` will then serve the result of
executor.render().

Once the frontend has rendered the HTML it will send a request to open
a WebSocket will be sent to the `PanelWSProxy`. This in turns forwards
any messages it receives via the kernel.shell_channel and the Comm to
the PanelExecutor. This way we proxy any WS messages being sent to and
from the server to the kernel.
"""

from __future__ import annotations

import asyncio
import calendar
import datetime as dt
import inspect
import json
import logging
import os
import pathlib
import textwrap
import time

from queue import Empty
from typing import Any, Awaitable
from urllib.parse import urljoin

import tornado

from bokeh.embed.bundle import extension_dirs
from bokeh.protocol import Protocol
from bokeh.protocol.exceptions import ProtocolError
from bokeh.protocol.receiver import Receiver
from bokeh.server.tornado import DEFAULT_KEEP_ALIVE_MS
from bokeh.server.views.multi_root_static_handler import MultiRootStaticHandler
from bokeh.server.views.static_handler import StaticHandler
from bokeh.server.views.ws import WSHandler
from bokeh.util.token import (
    generate_jwt_token, generate_session_id, get_session_id, get_token_payload,
)
from jupyter_server.base.handlers import JupyterHandler
from tornado.ioloop import PeriodicCallback
from tornado.web import StaticFileHandler

from ..config import config
from .resources import DIST_DIR, ERROR_TEMPLATE, _env
from .state import state

logger = logging.getLogger(__name__)

KERNEL_TIMEOUT = 60 # Timeout for kernel startup (including app startup)
CONNECTION_TIMEOUT = 30 # Timeout for WS connection to open

KERNEL_ERROR_TEMPLATE = _env.get_template('kernel_error.html')

[docs]async def ensure_async(obj: Awaitable | Any) -> Any: """Convert a non-awaitable object to a coroutine if needed, and await it if it was not already awaited. """ if inspect.isawaitable(obj): try: result = await obj except RuntimeError as e: if str(e) == 'cannot reuse already awaited coroutine': # obj is already the coroutine's result return obj raise return result # obj doesn't need to be awaited return obj
[docs]def url_path_join(*pieces): """Join components of url into a relative url Use to prevent double slash when joining subpath. This will leave the initial and final / in place """ initial = pieces[0].startswith('/') final = pieces[-1].endswith('/') stripped = [s.strip('/') for s in pieces] result = '/'.join(s for s in stripped if s) if initial: result = '/' + result if final: result = result + '/' if result == '//': result = '/' return result
def get_server_root_dir(settings): if 'server_root_dir' in settings: # notebook >= 5.0.0 has this in the settings root_dir = settings['server_root_dir'] else: # This copies the logic added in the notebook in # https://github.com/jupyter/notebook/pull/2234 contents_manager = settings['contents_manager'] root_dir = contents_manager.root_dir return os.path.expanduser(root_dir) EXECUTION_TEMPLATE = """ import os import pathlib import sys app = r'{{ path }}' os.chdir(str(pathlib.Path(app).parent)) sys.path = [os.getcwd()] + sys.path[1:] from panel.io.jupyter_executor import PanelExecutor executor = PanelExecutor(app, '{{ token }}', '{{ root_url }}') executor.render() """
[docs]def generate_executor(path: str, token: str, root_url: str) -> str: """ Generates the code to instantiate a PanelExecutor that is to be be run on the kernel to start a server session. Arguments --------- path: str The path of the Panel application to execute. token: str The Bokeh JWT token containing the session_id, request arguments, headers and cookies. root_url: str The root_url the server is running on. Returns ------- The code to be executed inside the kernel. """ execute_template = _env.from_string(EXECUTION_TEMPLATE) return textwrap.dedent( execute_template.render(path=path, token=token, root_url=root_url) )
[docs]class PanelJupyterHandler(JupyterHandler): """ The PanelJupyterHandler expects to be given a path to a notebook, .py file or Lumen .yaml file. Based on the kernelspec in the notebook or the kernel query parameter it will then provision a Jupyter kernel to run the Panel application in. Once the kernel is launched it will instantiate a PanelExecutor inside the kernel and serve the HTML returned by it. If successful it will store the kernel and comm_id on `panel.state`. """
[docs] def initialize(self, **kwargs): super().initialize(**kwargs) self.notebook_path = kwargs.pop('notebook_path', []) self.kernel_started = False
async def _get_info(self, msg_id, timeout=KERNEL_TIMEOUT): deadline = time.monotonic() + timeout result, comm_id, extension_dirs = None, None, None while result is None or comm_id is None or extension_dirs is None: if time.monotonic() > deadline: raise TimeoutError('Timed out while waiting for kernel to open Comm channel to Panel application.') try: msg = await ensure_async(self.kernel.iopub_channel.get_msg(timeout=None)) except Empty: if not await ensure_async(self.kernel.is_alive()): raise RuntimeError("Kernel died before establishing Comm connection to Panel application.") continue if msg['parent_header'].get('msg_id') != msg_id: continue msg_type = msg['header']['msg_type'] if msg_type == 'execute_result': data = msg['content']['data'] if 'text/error' in data: raise RuntimeError(data['text/error']) extension_dirs = data['application/bokeh-extensions'] result = data['text/html'] elif msg_type == 'comm_open' and msg['content']['target_name'] == self.session_id: comm_id = msg['content']['comm_id'] elif msg_type == 'stream' and msg['content']['name'] == 'stderr': logger.error(msg['content']['text']) elif msg_type == "error": logger.error(msg['content']['traceback']) return result, comm_id, extension_dirs @tornado.web.authenticated async def get(self, path=None): root_dir = get_server_root_dir(self.application.settings) rel_path = pathlib.Path(self.notebook_path or path) if rel_path.is_absolute(): notebook_path = str(rel_path) else: notebook_path = str((root_dir / rel_path).absolute()) if ( self.notebook_path and path ): # when we are in single notebook mode but have a path self.redirect_to_file(path) return cwd = os.path.dirname(notebook_path) root_url = url_path_join(self.base_url, 'panel-preview') # Compose reply self.set_header('Content-Type', 'text/html') self.set_header('Cache-Control', 'no-cache, no-store, must-revalidate') self.set_header('Pragma', 'no-cache') self.set_header('Expires', '0') # Provision a kernel with the desired kernelspec if self.request.arguments.get('kernel'): requested_kernel = self.request.arguments.pop('kernel')[0].decode('utf-8') elif notebook_path.endswith('.ipynb'): with open(notebook_path) as f: nb = json.load(f) requested_kernel = nb.get('metadata', {}).get('kernelspec', {}).get('name') else: requested_kernel = None if requested_kernel: available_kernels = list(self.kernel_manager.kernel_spec_manager.find_kernel_specs()) if requested_kernel not in available_kernels: logger.error('Could not start server session, no such kernel %r.', requested_kernel) html = KERNEL_ERROR_TEMPLATE.render( base_url=f'{root_url}/', kernels=available_kernels, error_type='Kernel error', error=f"No such kernel '{requested_kernel}'", title='Panel: Kernel not found' ) self.finish(html) return kernel_env = {**os.environ} kernel_id = await ensure_async( ( self.kernel_manager.start_kernel( kernel_name=requested_kernel, path=cwd, env=kernel_env, ) ) ) kernel_future = self.kernel_manager.get_kernel(kernel_id) km = await ensure_async(kernel_future) self.kernel = km.client() await ensure_async(self.kernel.start_channels()) await ensure_async(self.kernel.wait_for_ready(timeout=None)) # Run PanelExecutor inside the kernel self.session_id = generate_session_id() args = { k: [v.decode('utf-8') for v in vs] for k, vs in self.request.arguments.items() } payload = { 'arguments': args, 'headers': dict(self.request.headers.items()), 'cookies': dict(self.request.cookies.items()) } token = generate_jwt_token(self.session_id, extra_payload=payload) source = generate_executor(notebook_path, token, root_url) msg_id = self.kernel.execute(source) # Wait for comm to open and rendered HTML to be returned by the kernel try: html, comm_id, ext_dirs = await self._get_info(msg_id) except (TimeoutError, RuntimeError) as e: await self.kernel_manager.shutdown_kernel(kernel_id, now=True) html = ERROR_TEMPLATE.render( npm_cdn=config.npm_cdn, base_url=f'{root_url}/', error_type="Kernel error", error="Failed to start kernel", error_msg=str(e), title="Panel: Kernel Error" ) self.finish(html) else: # Sync extension_dirs from kernel to ensure dynamically loaded extensions are served extension_dirs.update(ext_dirs) state._kernels[self.session_id] = (self.kernel, comm_id, kernel_id, False) loop = tornado.ioloop.IOLoop.current() loop.call_later(CONNECTION_TIMEOUT, self._check_connected) self.finish(html) async def _check_connected(self): if self.session_id not in state._kernels: return _, _, kernel_id, connected = state._kernels[self.session_id] if not connected: await self.kernel_manager.shutdown_kernel(kernel_id, now=True)
[docs]class PanelWSProxy(WSHandler, JupyterHandler): """ The PanelWSProxy serves as a proxy between the frontend and the Jupyter kernel that is running the Panel application. It send and receives Bokeh protocol messages via a Jupyter Comm. """ def __init__(self, tornado_app, *args, **kw) -> None: # Note: tornado_app is stored as self.application kw['application_context'] = None super().__init__(tornado_app, *args, **kw)
[docs] def initialize(self, *args, **kwargs): self._ping_count = 0 self._ping_job = PeriodicCallback(self._keep_alive, DEFAULT_KEEP_ALIVE_MS)
def _keep_alive(self): self.ping(str(self._ping_count).encode("utf-8")) self._ping_count += 1
[docs] async def prepare(self): pass
[docs] def get_current_user(self): return "default_user"
[docs] def check_origin(self, origin: str) -> bool: return True
[docs] @tornado.web.authenticated async def open(self, path, *args, **kwargs) -> None: ''' Initialize a connection to a client. Returns: None ''' token = self._token if self.selected_subprotocol != 'bokeh': self.close() raise ProtocolError("Subprotocol header is not 'bokeh'") elif token is None: self.close() raise ProtocolError("No token received in subprotocol header") now = calendar.timegm(dt.datetime.utcnow().utctimetuple()) payload = get_token_payload(token) if 'session_expiry' not in payload: self.close() raise ProtocolError("Session expiry has not been provided") elif now >= payload['session_expiry']: self.close() raise ProtocolError("Token is expired.") try: protocol = Protocol() self.receiver = Receiver(protocol) except ProtocolError as e: logger.error("Could not create new server session, reason: %s", e) self.close() raise e self.session_id = get_session_id(token) if self.session_id not in state._kernels: self.close() kernel_info = state._kernels[self.session_id] self.kernel, self.comm_id, self.kernel_id, _ = kernel_info state._kernels[self.session_id] = kernel_info[:-1] + (True,) msg = protocol.create('ACK') await self.send_message(msg) self._ping_job.start() asyncio.create_task(self._check_for_message())
async def _check_for_message(self): while True: if self.kernel is None: break try: msg = await ensure_async(self.kernel.iopub_channel.get_msg(timeout=None)) except Empty: if not await ensure_async(self.kernel.is_alive()): raise RuntimeError("Kernel died before expected shutdown of Panel app.") continue msg_type = msg['header']['msg_type'] if msg_type == 'stream' and msg['content']['name'] == 'stderr': logger.error(msg['content']['text']) continue elif not (msg_type == 'comm_msg' and msg['content']['comm_id'] == self.comm_id): continue content, metadata = msg['content'], msg['metadata'] status = metadata.get('status') if status == 'protocol_error': return self._protocol_error(content['data']) elif status == 'internal_error': return self._internal_error(content['data']) binary = metadata.get('binary') if binary: fragment = msg['buffers'][0].tobytes() else: fragment = content['data'] if isinstance(fragment, dict): fragment = json.dumps(fragment) message = await self._receive(fragment) if message: await self.send_message(message)
[docs] async def on_message(self, fragment: str | bytes) -> None: content = dict(data=fragment, comm_id=self.comm_id, target_name=self.session_id) msg = self.kernel.session.msg("comm_msg", content) self.kernel.shell_channel.send(msg)
[docs] def on_close(self) -> None: """ Clean up when the connection is closed. """ logger.info('WebSocket connection closed: code=%s, reason=%r', self.close_code, self.close_reason) if self.session_id in state._kernels: del state._kernels[self.session_id] self._ping_job.stop() reply = self.kernel.shutdown(reply=True) future = self.kernel_manager.shutdown_kernel(self.kernel_id, now=True) asyncio.ensure_future(reply) asyncio.ensure_future(future) self.kernel = None
def _load_jupyter_server_extension(notebook_app): base_url = notebook_app.web_app.settings["base_url"] # Set up handlers notebook_app.web_app.add_handlers( host_pattern=r".*$", host_handlers=[ (urljoin(base_url, r"panel-preview/static/extensions/(.*)"), MultiRootStaticHandler, dict(root=extension_dirs)), (urljoin(base_url, r"panel-preview/static/(.*)"), StaticHandler), (urljoin(base_url, r"panel-preview/render/(.*)/ws"), PanelWSProxy), (urljoin(base_url, r"panel-preview/render/(.*)"), PanelJupyterHandler, {}), (urljoin(base_url, r"panel_dist/(.*)"), StaticFileHandler, dict(path=DIST_DIR)) ] ) # compat for older versions of Jupyter load_jupyter_server_extension = _load_jupyter_server_extension