from __future__ import annotations
import re
import sys
from typing import (
TYPE_CHECKING, Any, ClassVar, Mapping, Optional,
)
import numpy as np
import param
from bokeh.models import ColumnDataSource
from pyviz_comms import JupyterComm
from ..util import lazy_load
from .base import ModelPane
if TYPE_CHECKING:
from bokeh.document import Document
from bokeh.model import Model
from pyviz_comms import Comm
[docs]def ds_as_cds(dataset):
"""
Converts Vega dataset into Bokeh ColumnDataSource data
"""
if len(dataset) == 0:
return {}
# create a list of unique keys from all items as some items may not include optional fields
keys = sorted(set(k for d in dataset for k in d.keys()))
data = {k: [] for k in keys}
for item in dataset:
for k in keys:
data[k].append(item.get(k))
data = {k: np.asarray(v) for k, v in data.items()}
return data
_containers = ['hconcat', 'vconcat', 'layer']
SCHEMA_REGEX = re.compile('^v(\d+)\.\d+\.\d+.json')
def _isin(obj, attr):
if isinstance(obj, dict):
return attr in obj
else:
return hasattr(obj, attr)
def _get_type(spec, version):
if version >= 5:
if isinstance(spec, dict):
return spec.get('select', {}).get('type', 'interval')
elif isinstance(spec.select, dict):
return spec.select.get('type', 'interval')
else:
return getattr(spec.select, 'type', 'interval')
else:
if isinstance(spec, dict):
return spec.get('type', 'interval')
else:
return getattr(spec, 'type', 'interval')
def _get_dimensions(spec, props):
dimensions = {}
responsive_height = spec.get('height') == 'container' and props.get('height') is None
responsive_width = spec.get('width') == 'container' and props.get('width') is None
if responsive_height and responsive_width:
dimensions['sizing_mode'] = 'stretch_both'
elif responsive_width:
dimensions['sizing_mode'] = 'stretch_width'
elif responsive_height:
dimensions['sizing_mode'] = 'stretch_height'
return dimensions
def _get_schema_version(obj, default_version: int = 5) -> int:
if Vega.is_altair(obj):
schema = obj.to_dict().get('$schema', '')
else:
schema = obj.get('$schema', '')
version = schema.split('/')[-1]
match = SCHEMA_REGEX.fullmatch(version)
if match is None or not match.groups():
return default_version
return int(match.groups()[0])
def _get_selections(obj, version=None):
if obj is None:
return {}
elif version is None:
version = _get_schema_version(obj)
key = 'params' if version >= 5 else 'selection'
selections = {}
if _isin(obj, key):
params = obj[key]
if version >= 5 and isinstance(params, list):
params = {
p.name if hasattr(p, 'name') else p['name']: p for p in params
if getattr(p, 'param_type', None) == 'selection' or _isin(p, 'select')
}
try:
selections.update({
name: _get_type(spec, version) for name, spec in params.items()
})
except (AttributeError, TypeError):
pass
for c in _containers:
if _isin(obj, c):
for subobj in obj[c]:
selections.update(_get_selections(subobj, version=version))
return selections
def _to_json(obj):
if isinstance(obj, dict):
json = dict(obj)
if 'data' in json:
data = json['data']
if isinstance(data, dict):
json['data'] = dict(data)
elif isinstance(data, list):
json['data'] = [dict(d) for d in data]
return json
return obj.to_dict()
[docs]class Vega(ModelPane):
"""
The Vega pane renders Vega-lite based plots (including those from Altair)
inside a panel.
Note
- to use the `Vega` pane, the Panel `extension` has to be
loaded with 'vega' as an argument to ensure that vega.js is initialized.
- it supports selection events
- it optimizes the plot rendering by using binary serialization for any
array data found on the Vega/Altair object, providing huge speedups over
the standard JSON serialization employed by Vega natively.
Reference: https://panel.holoviz.org/reference/panes/Vega.html
:Example:
>>> pn.extension('vega')
>>> Vega(some_vegalite_dict_or_altair_object, height=240)
"""
debounce = param.ClassSelector(default=20, class_=(int, dict), doc="""
Declares the debounce time in milliseconds either for all
events or if a dictionary is provided for individual events.""")
selection = param.ClassSelector(class_=param.Parameterized, doc="""
The Selection object reflects any selections available on the
supplied vega plot into Python.""")
show_actions = param.Boolean(default=False, doc="""
Whether to show Vega actions.""")
theme = param.ObjectSelector(default=None, allow_None=True, objects=[
'excel', 'ggplot2', 'quartz', 'vox', 'fivethirtyeight', 'dark',
'latimes', 'urbaninstitute', 'googlecharts'])
priority: ClassVar[float | bool | None] = 0.8
_rename: ClassVar[Mapping[str, str | None]] = {
'selection': None, 'debounce': None, 'object': 'data'}
_updates: ClassVar[bool] = True
def __init__(self, object=None, **params):
super().__init__(object, **params)
self.param.watch(self._update_selections, ['object'])
self._update_selections()
@property
def _selections(self):
return _get_selections(self.object)
@property
def _throttle(self):
default = self.param.debounce.default
if isinstance(self.debounce, dict):
throttle = {
sel: self.debounce.get(sel, default)
for sel in self._selections
}
else:
throttle = {sel: self.debounce or default for sel in self._selections}
return throttle
def _update_selections(self, *args):
params = {
e: param.Dict(allow_refs=False) if stype == 'interval' else param.List(allow_refs=False)
for e, stype in self._selections.items()
}
if self.selection and (set(self.selection.param) - {'name'}) == set(params):
self.selection.param.update({p: None for p in params})
return
self.selection = type('Selection', (param.Parameterized,), params)()
@classmethod
def is_altair(cls, obj):
if 'altair' in sys.modules:
import altair as alt
return isinstance(obj, alt.api.TopLevelMixin)
return False
[docs] @classmethod
def applies(cls, obj: Any) -> float | bool | None:
if isinstance(obj, dict) and 'vega' in obj.get('$schema', '').lower():
return True
return cls.is_altair(obj)
def _get_sources(self, json, sources=None):
sources = {} if sources is None else dict(sources)
datasets = json.get('datasets', {})
for name in list(datasets):
if name in sources or isinstance(datasets[name], dict):
continue
data = datasets.pop(name)
if isinstance(data, list) and any(isinstance(d, dict) and 'geometry' in d for d in data):
# Handle geometry records types
datasets[name] = data
continue
columns = set(data[0]) if data else []
if self.is_altair(self.object):
import altair as alt
if (not isinstance(self.object.data, (alt.Data, alt.UrlData, type(alt.Undefined))) and
columns == set(self.object.data)):
data = ColumnDataSource.from_df(self.object.data)
else:
data = ds_as_cds(data)
sources[name] = ColumnDataSource(data=data)
else:
sources[name] = ColumnDataSource(data=ds_as_cds(data))
data = json.get('data', {})
if isinstance(data, dict):
data = data.pop('values', {})
if data:
sources['data'] = ColumnDataSource(data=ds_as_cds(data))
elif isinstance(data, list):
for d in data:
if 'values' in d:
sources[d['name']] = ColumnDataSource(data=ds_as_cds(d.pop('values')))
return sources
def _process_event(self, event):
name = event.data['type']
stype = self._selections.get(name)
value = event.data['value']
if stype != 'interval':
value = list(value)
self.selection.param.update(**{name: value})
def _process_param_change(self, params):
props = super()._process_param_change(params)
if 'data' in props and props['data'] is not None:
props['data'] = _to_json(props['data'])
return props
def _get_properties(self, doc, sources={}):
props = super()._get_properties(doc)
data = props['data']
if data is not None:
sources = self._get_sources(data, sources)
dimensions = _get_dimensions(data, props) if data else {}
props['data'] = data
props['data_sources'] = sources
props['events'] = list(self._selections)
props['throttle'] = self._throttle
props.update(dimensions)
return props
def _get_model(
self, doc: Document, root: Optional[Model] = None,
parent: Optional[Model] = None, comm: Optional[Comm] = None
) -> Model:
self._bokeh_model = lazy_load(
'panel.models.vega', 'VegaPlot', isinstance(comm, JupyterComm), root
)
model = super()._get_model(doc, root, parent, comm)
self._register_events('vega_event', model=model, doc=doc, comm=comm)
return model
def _update(self, ref: str, model: Model) -> None:
props = self._get_properties(model.document, sources=dict(model.data_sources))
model.update(**props)