Source code for panel.models.comm_manager

from bokeh.core.properties import (
    Int, Nullable, Required, String,
)
from bokeh.models import Model
from bokeh.protocol import Protocol


[docs]class CommManager(Model): plot_id = Required(Nullable(String)) comm_id = Required(Nullable(String)) client_comm_id = Required(Nullable(String)) debounce = Int(50) timeout = Int(5000) def __init__(self, **properties): super().__init__(**properties) self._protocol = Protocol() def assemble(self, msg): header = msg['header'] buffers = msg.pop('_buffers') or {} header['num_buffers'] = len(buffers) cls = self._protocol._messages[header['msgtype']] msg_obj = cls(header, msg['metadata'], msg['content']) for (bid, buff) in buffers.items(): msg_obj.assemble_buffer({'id': bid}, buff.tobytes()) return msg_obj