from __future__ import annotations
import datetime as dt
import sys
from enum import Enum
from functools import partial
from typing import (
TYPE_CHECKING, Callable, ClassVar, List, Mapping, Optional, Type,
)
import numpy as np
import param
from bokeh.models import ColumnDataSource, ImportedStyleSheet
from pyviz_comms import JupyterComm
from ..io.resources import CDN_DIST
from ..io.state import state
from ..reactive import ReactiveData
from ..util import datetime_types, lazy_load
from ..viewable import Viewable
from .base import ModelPane
if TYPE_CHECKING:
from bokeh.document import Document
from bokeh.model import Model
from pyviz_comms import Comm
from ..model.perspective import PerspectiveClickEvent
# Name of the theme treated as the default.
DEFAULT_THEME = "material"

# Theme names offered as the valid choices of the `theme` selector below.
THEMES = [
    "material",
    "material-dark",
    "monokai",
    "solarized",
    "solarized-dark",
    "vaporwave",
]
class Plugin(Enum):
    """
    The plugins (grids/charts) available in Perspective.

    Pass one of these values as the `plugin` parameter of the
    `Perspective` pane.

    `CANDLESTICK_D3` and `OHLC_D3` share their value with `CANDLESTICK`
    and `OHLC` respectively, which makes them enum aliases: they resolve
    to the same member and are skipped when iterating over the enum.
    """

    HYPERGRID = "hypergrid"  # hypergrid
    GRID = "datagrid"  # hypergrid
    YBAR_D3 = "d3_y_bar"  # d3fc
    XBAR_D3 = "d3_x_bar"  # d3fc
    XYLINE_D3 = "d3_xy_line"  # d3fc
    YLINE_D3 = "d3_y_line"  # d3fc
    YAREA_D3 = "d3_y_area"  # d3fc
    YSCATTER_D3 = "d3_y_scatter"  # d3fc
    XYSCATTER_D3 = "d3_xy_scatter"  # d3fc
    TREEMAP_D3 = "d3_treemap"  # d3fc
    SUNBURST_D3 = "d3_sunburst"  # d3fc
    HEATMAP_D3 = "d3_heatmap"  # d3fc
    CANDLESTICK = "d3_candlestick"  # d3fc
    CANDLESTICK_D3 = "d3_candlestick"  # d3fc
    OHLC = "d3_ohlc"  # d3fc
    OHLC_D3 = "d3_ohlc"  # d3fc

    @staticmethod
    def options():
        """
        Returns the list of options of the PerspectiveViewer, like Hypergrid, Grid etc.

        Returns
        -------
        options: list
          A list of the available plugin values (strings), in
          declaration order and without duplicates, since alias
          members are not visited during iteration.
        """
        return [plugin.value for plugin in Plugin]
def deconstruct_pandas(data, kwargs=None):
    """
    Given a dataframe, flatten it by resetting the index and memoizing
    the pivots that were applied.

    This code was copied from the Perspective repository and is
    reproduced under Apache 2.0 license. See the original at:

    https://github.com/finos/perspective/blob/master/python/perspective/perspective/core/data/pd.py

    The object passed in is never modified; all rewriting of the index,
    of categorical columns and of column labels happens on a shallow
    copy.

    Arguments
    ---------
    data: (pandas.DataFrame or pandas.Series)
      The pandas object to parse
    kwargs: (dict, optional)
      Accepted for signature compatibility only. Its contents are
      ignored; a fresh result dictionary is always built.

    Returns
    -------
    data: pandas.DataFrame
      A flattened version of the DataFrame
    kwargs: dict
      A dictionary containing optional members `columns`,
      `group_by`, and `split_by`.
    """
    import pandas as pd

    # The incoming ``kwargs`` was always overwritten by this fresh dict, so
    # the caller-supplied value has never had any effect.
    kwargs = {"columns": [], "group_by": [], "split_by": []}

    # Several steps below assign to ``data.index``, ``data[k]`` or
    # ``data.columns``. Work on a shallow copy (no data is duplicated) so
    # those assignments never leak into the caller's object.
    data = data.copy(deep=False)

    if isinstance(data.index, pd.PeriodIndex):
        data.index = data.index.to_timestamp()

    if isinstance(data, pd.DataFrame) and hasattr(pd, "CategoricalDtype"):
        # Categorical columns are converted to plain strings.
        for k, v in data.dtypes.items():
            if isinstance(v, pd.CategoricalDtype):
                data[k] = data[k].astype(str)

    if (
        isinstance(data, pd.DataFrame)
        and isinstance(data.columns, pd.MultiIndex)
        and isinstance(data.index, pd.MultiIndex)
    ):
        # Row and col pivots
        kwargs["group_by"].extend([str(c) for c in data.index.names])

        # Two strategies
        if None in data.columns.names:
            # In this case, we need to extract the column names from the row
            # e.g. pt = pd.pivot_table(df, values = ['Discount','Sales'], index=['Country','Region'], columns=["State","Quantity"])
            # Table will be
            #                       Discount            Sales
            # State             Alabama Alaska ...  Alabama Alaska ...
            # Quantity            150     350  ...    300     500
            # Country  Region
            # US       Region 0   ...
            # US       Region 1
            #
            # We need to transform this to:
            # group_by = ['Country', 'Region']
            # split_by = ['State', 'Quantity']
            # columns = ['Discount', 'Sales']
            existent = kwargs["group_by"] + data.columns.names
            for c in data.columns.names:
                if c is not None:
                    kwargs["split_by"].append(c)
                    # one stack per named column level moves it into the index
                    data = data.stack()
            data = pd.DataFrame(data).reset_index()

            for new_column in data.columns:
                if new_column not in existent:
                    kwargs["columns"].append(new_column)
        else:
            # In this case, we have no need as the values is just a single entry
            # e.g. pt = pd.pivot_table(df, values = 'Discount', index=['Country','Region'], columns = ['Category', 'Segment'])
            for _ in kwargs["group_by"]:
                # unstack row pivots
                data = data.unstack()
            data = pd.DataFrame(data)

        # this rather weird loop is to map existing None columns into
        # levels, e.g. in the `else` block above, to reconstruct
        # the "Discount" name. IDK if this is stored or if the name is
        # lost, so we'll just call it 'index', 'index-1', ...
        i = 0
        new_names = list(data.index.names)
        for j, val in enumerate(data.index.names):
            if val is None:
                new_names[j] = "index" if i == 0 else f"index-{i}"
                i += 1
            elif str(val) not in kwargs["group_by"]:
                kwargs["split_by"].append(str(val))

        # Finally, remap any values columns to have column name 'value'
        data.index.names = new_names
        data = data.reset_index()  # copy
        # ``kwargs`` is not modified while the two comprehensions below run,
        # so the list of recognised labels can be computed once.
        known = (
            ["index"]
            + kwargs["group_by"]
            + kwargs["split_by"]
            + kwargs["columns"]
        )
        data.columns = [str(c) if c in known else "value" for c in data.columns]
        kwargs["columns"].extend(
            ["value" for c in data.columns if c not in known]
        )
    elif isinstance(data, pd.DataFrame) and isinstance(data.columns, pd.MultiIndex):
        # Col pivots
        if data.index.name:
            kwargs["group_by"].append(str(data.index.name))
            push_row_pivot = False
        else:
            push_row_pivot = True

        data = pd.DataFrame(data.unstack())

        i = 0
        new_names = list(data.index.names)
        for j, val in enumerate(data.index.names):
            if val is None:
                new_names[j] = "index" if i == 0 else f"index-{i}"
                i += 1
                if push_row_pivot:
                    kwargs["group_by"].append(str(new_names[j]))
            elif str(val) not in kwargs["group_by"]:
                kwargs["split_by"].append(str(val))

        data.index.names = new_names
        known = ["index"] + kwargs["group_by"] + kwargs["split_by"]
        data.columns = [str(c) if c in known else "value" for c in data.columns]
        kwargs["columns"].extend(
            ["value" for c in data.columns if c not in known]
        )
    elif isinstance(data, pd.DataFrame) and isinstance(data.index, pd.MultiIndex):
        # Row pivots
        kwargs["group_by"].extend(list(data.index.names))
        data = data.reset_index()  # copy

    if isinstance(data, pd.DataFrame):
        # flat df
        if "index" not in [str(c).lower() for c in data.columns]:
            data = data.reset_index(col_fill="index")

        if not kwargs["columns"]:
            # might already be set in row+col pivot df
            kwargs["columns"].extend([str(c) for c in data.columns])
            data.columns = kwargs["columns"]

    if isinstance(data, pd.Series):
        # Series
        flattened = data.reset_index()  # copy
        # preserve name from series
        flattened.name = data.name
        # make sure all columns are strings
        flattened.columns = [str(c) for c in flattened.columns]
        data = flattened

    return data, kwargs
class Perspective(ModelPane, ReactiveData):
    """
    The `Perspective` pane provides an interactive visualization component for
    large, real-time datasets built on the Perspective project.

    Reference: https://panel.holoviz.org/reference/panes/Perspective.html

    :Example:

    >>> Perspective(df, plugin='hypergrid', theme='material-dark')
    """

    aggregates = param.Dict(default=None, nested_refs=True, doc="""
        How to aggregate. For example {"x": "distinct count"}""")

    columns = param.List(default=None, nested_refs=True, doc="""
        A list of source columns to show as columns. For example ["x", "y"]""")

    editable = param.Boolean(default=True, allow_None=True, doc="""
        Whether items are editable.""")

    expressions = param.List(default=None, nested_refs=True, doc="""
        A list of expressions computing new columns from existing columns.
        For example [""x"+"index""]""")

    split_by = param.List(default=None, nested_refs=True, doc="""
        A list of source columns to pivot by. For example ["x", "y"]""")

    filters = param.List(default=None, nested_refs=True, doc="""
        How to filter. For example [["x", "<", 3],["y", "contains", "abc"]]""")

    min_width = param.Integer(default=420, bounds=(0, None), doc="""
        Minimal width of the component (in pixels) if width is adjustable.""")

    object = param.Parameter(doc="""
        The plot data declared as a dictionary of arrays or a DataFrame.""")

    group_by = param.List(default=None, doc="""
        A list of source columns to group by. For example ["x", "y"]""")

    selectable = param.Boolean(default=True, allow_None=True, doc="""
        Whether items are selectable.""")

    sort = param.List(default=None, doc="""
        How to sort. For example[["x","desc"]]""")

    plugin = param.ObjectSelector(default=Plugin.GRID.value, objects=Plugin.options(), doc="""
        The name of a plugin to display the data. For example hypergrid or d3_xy_scatter.""")

    plugin_config = param.Dict(default={}, nested_refs=True, doc="""
        Configuration for the PerspectiveViewerPlugin.""")

    toggle_config = param.Boolean(default=True, doc="""
        Whether to show the config menu.""")

    # Uses the module-level constant rather than repeating the literal.
    theme = param.ObjectSelector(default=DEFAULT_THEME, objects=THEMES, doc="""
        The style of the PerspectiveViewer. For example material-dark""")

    priority: ClassVar[float | bool | None] = None

    # Populated lazily in `_get_model`.
    _bokeh_model: ClassVar[Type[Model] | None] = None

    _data_params: ClassVar[List[str]] = ['object']

    _rename: ClassVar[Mapping[str, str | None]] = {
        'selection': None
    }

    _updates: ClassVar[bool] = True

    _stylesheets: ClassVar[List[str]] = [
        f'{CDN_DIST}css/perspective-datatable.css'
    ]

    @classmethod
    def applies(cls, object):
        """
        Report whether this pane can render `object`.

        Returns 0 for a non-empty dictionary whose values are all lists
        or NumPy arrays and for a pandas DataFrame (pandas is only
        consulted if it has already been imported), None for an empty
        dictionary and False otherwise.
        """
        if isinstance(object, dict) and all(isinstance(v, (list, np.ndarray)) for v in object.values()):
            return 0 if object else None
        elif 'pandas' in sys.modules:
            import pandas as pd
            if isinstance(object, pd.DataFrame):
                return 0
        return False

    def __init__(self, object=None, **params):
        """
        Create the pane.

        An optional `on_click` keyword is removed from `params` and, if
        truthy, registered through `on_click` after initialisation.
        """
        click_handler = params.pop('on_click', None)
        self._on_click_callbacks = []
        super().__init__(object, **params)
        if click_handler:
            self.on_click(click_handler)

    def _get_data(self):
        """
        Convert `self.object` into a pair `(processed, data)`.

        `processed` is the dictionary itself or the flattened DataFrame;
        `data` maps the stringified column names to their values. For
        pandas input, pivot settings discovered while flattening are
        applied to the parameters that are still None.

        Raises
        ------
        ValueError
          If the number of distinct column names (after `_as_digit`)
          differs from the number of columns.
        """
        if self.object is None:
            return {}, {}
        if isinstance(self.object, dict):
            ncols = len(self.object)
            df = data = self.object
        else:
            df, kwargs = deconstruct_pandas(self.object)
            ncols = len(df.columns)
            data = {col: df[col].values for col in df.columns}
            if kwargs:
                # only fill in settings the user has not provided
                self.param.update(**{
                    k: v for k, v in kwargs.items()
                    if getattr(self, k) is None
                })
        cols = {self._as_digit(c) for c in df}
        if len(cols) != ncols:
            raise ValueError("Integer columns must be unique when "
                             "converted to strings.")
        return df, {str(k): v for k, v in data.items()}

    def _filter_properties(self, properties):
        """Drop every property that is also a parameter of `Viewable`."""
        ignored = list(Viewable.param)
        return [p for p in properties if p not in ignored]

    def _get_properties(self, doc, source=None):
        """
        Build the model properties, including the data source and a
        per-column `schema` of Perspective type names.

        If `source` is given its data is replaced in place, otherwise a
        new `ColumnDataSource` is created. When no height is set it
        defaults to 300 if `toggle_config` is enabled and 150 otherwise.
        """
        props = super()._get_properties(doc)
        del props['object']
        if props.get('toggle_config'):
            props['height'] = self.height or 300
        else:
            props['height'] = self.height or 150
        if source is None:
            source = ColumnDataSource(data=self._data)
        else:
            source.data = self._data
        props['source'] = source
        props['schema'] = schema = {}
        for col, array in source.data.items():
            if not isinstance(array, np.ndarray):
                array = np.asarray(array)
            kind = array.dtype.kind
            if kind == 'M':
                schema[col] = 'datetime'
            elif kind in 'ui':
                schema[col] = 'integer'
            elif kind == 'b':
                schema[col] = 'boolean'
            elif kind == 'f':
                schema[col] = 'float'
            elif kind in 'sU':
                schema[col] = 'string'
            elif len(array):
                # Any other dtype: classify by inspecting the first value.
                value = array[0]
                # `datetime.datetime` is a subclass of `datetime.date`, so it
                # has to be tested first; otherwise timestamps stored in
                # such arrays would be declared as plain dates.
                if isinstance(value, dt.datetime):
                    schema[col] = 'datetime'
                elif isinstance(value, dt.date):
                    schema[col] = 'date'
                elif isinstance(value, datetime_types):
                    schema[col] = 'datetime'
                elif isinstance(value, str):
                    schema[col] = 'string'
                elif isinstance(value, (float, np.floating)):
                    schema[col] = 'float'
                elif isinstance(value, (int, np.integer)):
                    schema[col] = 'integer'
                else:
                    schema[col] = 'string'
            else:
                # empty column of unknown dtype
                schema[col] = 'string'
        return props

    def _get_theme(self, theme, resources=None):
        """
        Return the stylesheet URL for `theme`.

        If the bokeh model has been loaded, its `__css_raw__` list is
        reset to its first three entries followed by the theme URL.
        `resources` is not used.
        """
        from ..models.perspective import THEME_URL
        theme_url = f'{THEME_URL}{theme}.css'
        if self._bokeh_model is not None:
            self._bokeh_model.__css_raw__ = self._bokeh_model.__css_raw__[:3] + [theme_url]
        return theme_url

    def _process_param_change(self, params):
        """
        Translate parameter changes into model properties.

        When the theme or stylesheets change, the theme is refreshed and
        the model's `__css__` entries are prepended as imported
        stylesheets. Column references in `columns`, `group_by`,
        `split_by`, `sort`, `filters` and `aggregates` are converted to
        strings.
        """
        if 'stylesheets' in params or 'theme' in params:
            self._get_theme(params.get('theme', self.theme))
            css = getattr(self._bokeh_model, '__css__', [])
            params['stylesheets'] = [
                ImportedStyleSheet(url=ss) for ss in css
            ] + params.get('stylesheets', self.stylesheets)
        props = super()._process_param_change(params)
        for p in ('columns', 'group_by', 'split_by'):
            if props.get(p):
                props[p] = [None if col is None else str(col) for col in props[p]]
        if props.get('sort'):
            props['sort'] = [[str(col), *args] for col, *args in props['sort']]
        if props.get('filters'):
            props['filters'] = [[str(col), *args] for col, *args in props['filters']]
        if props.get('aggregates'):
            props['aggregates'] = {str(col): agg for col, agg in props['aggregates'].items()}
        return props

    def _as_digit(self, col):
        """
        Map a column name back to the key used in the processed data.

        A digit-only string is converted to an int when that int (and
        not the string) is a key of `self._processed`; anything else is
        returned unchanged.
        """
        if self._processed is None or col in self._processed or col is None:
            return col
        # Only strings can be digit strings; non-string keys (e.g. integer
        # dictionary keys) are passed through instead of raising on .isdigit().
        elif isinstance(col, str) and col.isdigit() and int(col) in self._processed:
            return int(col)
        return col

    def _process_property_change(self, msg):
        """
        Translate model property changes into parameter values, mapping
        column references through `_as_digit`.
        """
        msg = super()._process_property_change(msg)
        for prop in ('columns', 'group_by', 'split_by'):
            if prop not in msg:
                continue
            msg[prop] = [self._as_digit(col) for col in msg[prop]]
        if msg.get('sort'):
            msg['sort'] = [[self._as_digit(col), *args] for col, *args in msg['sort']]
        if msg.get('filters'):
            msg['filters'] = [[self._as_digit(col), *args] for col, *args in msg['filters']]
        if msg.get('aggregates'):
            msg['aggregates'] = {self._as_digit(col): agg for col, agg in msg['aggregates'].items()}
        return msg

    def _get_model(
        self, doc: Document, root: Optional[Model] = None,
        parent: Optional[Model] = None, comm: Optional[Comm] = None
    ) -> Model:
        """
        Lazily load the Perspective bokeh model, create the model via the
        parent class and register the 'perspective-click' event on it.
        """
        self._bokeh_model = lazy_load(
            'panel.models.perspective', 'Perspective', isinstance(comm, JupyterComm), root
        )
        model = super()._get_model(doc, root, parent, comm)
        self._register_events('perspective-click', model=model, doc=doc, comm=comm)
        return model

    def _update(self, ref: str, model: Model) -> None:
        """Refresh `model` with freshly computed properties, reusing its source."""
        model.update(**self._get_properties(model.document, source=model.source))

    def _process_event(self, event):
        """Run every registered click callback for a 'perspective-click' event."""
        if event.event_name == 'perspective-click':
            for cb in self._on_click_callbacks:
                state.execute(partial(cb, event), schedule=False)

    def on_click(self, callback: Callable[[PerspectiveClickEvent], None]):
        """
        Register a callback to be executed when any row is clicked.
        The callback is given a PerspectiveClickEvent declaring the
        config, column names, and row values of the row that was
        clicked.

        Arguments
        ---------
        callback: (callable)
            The callback to run on click events.
        """
        self._on_click_callbacks.append(callback)