# major-tom-explorer / components.py
# MarcSkovMadsen's picture
# Add core-1 dataset
# 57681d6
import holoviews as hv
import numpy as np
import pandas as pd
import panel as pn
import param
from holoviews.operation.datashader import dynspread, rasterize
from utils import (
DATASET_COLUMNS,
DATASETS,
DATASHADER_LOGO,
DATASHADER_URL,
DEFAULT_DATASET,
DESCRIPTION,
ESA_EASTING,
ESA_NORTHING,
MAJOR_TOM_LOGO,
MAJOR_TOM_LYRICS,
MAJOR_TOM_PICTURE,
MAJOR_TOM_REF_URL,
PANEL_LOGO,
PANEL_URL,
get_closest_rows,
get_image,
get_meta_data,
)
class DatasetInput(pn.viewable.Viewer):
    """Dataset picker that also publishes the chosen dataset's metadata.

    Selecting a ``value`` refreshes two derived parameters: ``columns``
    (looked up in ``DATASET_COLUMNS``) and ``data`` (loaded through
    ``get_meta_data``).
    """

    value = param.Selector(
        default=DEFAULT_DATASET,
        objects=DATASETS,
        allow_None=False,
        label="Dataset",
        doc="""The name of the dataset""",
    )
    data = param.DataFrame(allow_None=False, doc="""The metadata dataset""")
    columns = param.Dict(allow_None=False, doc="""The columns of the dataset""")

    def __panel__(self):
        """Render the dataset choice as an outlined radio button group."""
        button_group = pn.widgets.RadioButtonGroup
        return button_group.from_param(self.param.value, button_style="outline")

    @pn.depends("value", watch=True, on_init=True)
    def _update_data(self):
        """Reload ``columns`` and ``data`` for the currently selected dataset."""
        dataset_name = self.value
        dataset_columns = DATASET_COLUMNS[dataset_name]
        # The loader is wrapped in pn.cache before being called.
        load_meta_data = pn.cache(get_meta_data)
        meta_data = load_meta_data(dataset=dataset_name)
        # Assign both parameters in one update call.
        self.param.update(columns=dataset_columns, data=meta_data)
class MapInput(pn.viewable.Viewer):
    """Rasterized map of the rows in ``data`` with view and tap tracking.

    The points are drawn from the ``centre_easting`` / ``centre_northing``
    columns on top of OpenStreetMap tiles. Two outputs are maintained:

    - ``data_in_view``: the rows whose centre lies inside the current
      x/y range of the plot.
    - ``data_selected``: whatever ``get_closest_rows`` returns for the
      last tapped position.
    """

    data = param.DataFrame(allow_refs=True, allow_None=False)
    data_in_view = param.DataFrame(allow_None=False)
    data_selected = param.DataFrame(allow_None=False)

    # Plot and streams built by ``_handle_data_dask_change``.
    _plot = param.Parameter(allow_None=False)
    _pointer_x = param.Parameter(allow_None=False)
    _pointer_y = param.Parameter(allow_None=False)
    _range_xy = param.Parameter(allow_None=False)
    _tap = param.Parameter(allow_None=False)

    updating = param.Boolean()

    def __panel__(self):
        """Return the map pane stacked above the row-count description."""
        return pn.Column(
            pn.pane.HoloViews(
                # Pass the parameter reference, not its current value:
                # ``_plot`` is rebuilt whenever ``data`` changes and the pane
                # must follow the new plot instead of showing the first one.
                self.param._plot,
                height=550,
                width=800,
                loading=self.param.updating,
            ),
            self._description,
        )

    @param.depends("data", watch=True, on_init=True)
    def _handle_data_dask_change(self):
        """Rebuild the plot and its streams from the current ``data``.

        ``updating`` is set to True for the duration of the rebuild.
        """
        with self.param.update(updating=True):
            # Only the two coordinate columns are needed for the map.
            data = self.data[["centre_easting", "centre_northing"]].copy()
            points = hv.Points(
                data, kdims=["centre_easting", "centre_northing"], vdims=[]
            )
            rangexy = hv.streams.RangeXY(source=points)
            tap = hv.streams.Tap(source=points, x=ESA_EASTING, y=ESA_NORTHING)

            agg = rasterize(
                points, link_inputs=True, x_sampling=0.0001, y_sampling=0.0001
            )
            dyn = dynspread(agg)
            dyn.opts(cmap="kr_r", colorbar=True)

            # Crosshair lines that follow the pointer position.
            pointerx = hv.streams.PointerX(x=ESA_EASTING, source=points)
            pointery = hv.streams.PointerY(y=ESA_NORTHING, source=points)
            vline = hv.DynamicMap(lambda x: hv.VLine(x), streams=[pointerx])
            hline = hv.DynamicMap(lambda y: hv.HLine(y), streams=[pointery])

            tiles = hv.Tiles(
                "https://tile.openstreetmap.org/{Z}/{X}/{Y}.png", name="OSM"
            ).opts(xlabel="Longitude", ylabel="Latitude")

            self.param.update(
                _plot=tiles * agg * dyn * hline * vline,
                _pointer_x=pointerx,
                _pointer_y=pointery,
                _range_xy=rangexy,
                _tap=tap,
            )

            # Keep ``data_in_view`` in sync with the plot ranges; call once
            # right away so it is populated before the first range event.
            update_viewed = pn.bind(
                self._update_data_in_view,
                rangexy.param.x_range,
                rangexy.param.y_range,
                watch=True,
            )
            update_viewed()

            # Same pattern for the tapped position and ``data_selected``.
            update_selected = pn.bind(
                self._update_data_selected, tap.param.x, tap.param.y, watch=True
            )
            update_selected()

    def _update_data_in_view(self, x_range, y_range):
        """Set ``data_in_view`` to the rows inside ``x_range`` x ``y_range``.

        When either range is missing (falsy) all of ``data`` is used.
        """
        if not x_range or not y_range:
            self.data_in_view = self.data
            return

        data = self.data
        data = data[
            (data.centre_easting.between(*x_range))
            & (data.centre_northing.between(*y_range))
        ]
        self.data_in_view = data.reset_index(drop=True)

    def _update_data_selected(self, tap_x, tap_y):
        """Set ``data_selected`` from ``get_closest_rows`` at the tapped point."""
        self.data_selected = get_closest_rows(self.data, tap_x, tap_y)

    @pn.depends("data_in_view")
    def _description(self):
        """Return the number of rows currently in view as display text."""
        return f"Rows: {len(self.data_in_view):,}"
class ImageInput(pn.viewable.Viewer):
    """Viewer for the image referenced by one row of the selected metadata.

    The user picks a timestamp (from the ``timestamp`` column of ``data``)
    and an image type (a key of ``columns``). The matching row's image is
    fetched with ``get_image`` and exposed as ``image`` (static), ``plot``
    (HoloViews element) and ``meta_data`` (the row as a DataFrame).
    """

    data = param.DataFrame(
        allow_refs=True, allow_None=False, doc="""The metadata selected"""
    )
    columns = param.Dict(
        allow_refs=True, allow_None=False, doc="""The list of columns of the dataset"""
    )
    column_name = param.Selector(
        label="Image Type",
        allow_None=False,
        doc="""The name of the image type to view""",
    )

    updating = param.Boolean()

    meta_data = param.DataFrame()
    image = param.Parameter()
    plot = param.Parameter()

    _timestamp = param.Selector(
        label="Timestamp", objects=[None], doc="""The timestamp of the sample to view"""
    )

    def __panel__(self):
        """Return the selection widgets above the tabbed image views."""
        return pn.Column(
            pn.Row(
                pn.widgets.RadioButtonGroup.from_param(
                    self.param._timestamp,
                    button_style="outline",
                    align="end",
                    disabled=self.param.updating,
                ),
                pn.widgets.Select.from_param(
                    self.param.column_name, disabled=self.param.updating
                ),
            ),
            pn.Tabs(
                pn.pane.HoloViews(
                    self.param.plot,
                    height=800,
                    width=800,
                    name="Interactive Image",
                ),
                pn.pane.Image(
                    self.param.image,
                    name="Static Image",
                    width=800,
                ),
                pn.widgets.Tabulator(
                    self.param.meta_data,
                    name="Meta Data",
                    disabled=True,
                ),
                pn.pane.Markdown(self.code, name="Code"),
                dynamic=True,
                loading=self.param.updating,
            ),
        )

    @pn.depends("data", watch=True, on_init=True)
    def _update_timestamp(self):
        """Refresh the timestamp options, and the plot, for the new ``data``."""
        if self.data.empty:
            default_value = None
            options = [None]
        else:
            options = sorted(self.data["timestamp"].unique())
            default_value = options[0]

        self.param._timestamp.objects = options
        if self._timestamp not in options:
            # Changing ``_timestamp`` triggers ``_update_plot`` via its watcher.
            self._timestamp = default_value
        else:
            # ``_update_plot`` only watches ``_timestamp`` and ``column_name``.
            # If the new data happens to contain the already selected
            # timestamp, no event fires and the previous row's image would
            # stay on screen, so refresh explicitly.
            self._update_plot()

    @pn.depends("columns", watch=True, on_init=True)
    def _update_column_names(self):
        """Refresh the image-type options from the keys of ``columns``."""
        options = sorted(self.columns)
        default_value = "Thumbnail"

        self.param.column_name.objects = options
        if self.column_name not in options:
            self.column_name = default_value

    @property
    def column(self):
        """The ``columns`` entry for the selected ``column_name``."""
        return self.columns[self.column_name]

    @pn.depends("_timestamp", "column_name", watch=True, on_init=True)
    def _update_plot(self):
        """Load the selected row's image and update the outputs.

        With empty ``data`` or no timestamp / image type selected, the outputs
        are reset to empty values instead.
        """
        if self.data.empty or not self._timestamp or not self.column_name:
            self.meta_data = self.data.T
            self.image = None
            self.plot = hv.RGB(np.array([]))
        else:
            with self.param.update(updating=True):
                # First row matching the selected timestamp.
                row = self.data[self.data.timestamp == self._timestamp].iloc[0]
                self.meta_data = pd.DataFrame(row)
                self.image = image = pn.cache(get_image)(row, self.column)
                image_array = np.array(image)
                # 2-D arrays are shown as single-band images with a colorbar,
                # anything else as RGB.
                if image_array.ndim == 2:
                    self.plot = hv.Image(image_array).opts(
                        cmap="gray_r", xaxis=None, yaxis=None, colorbar=True
                    )
                else:
                    self.plot = hv.RGB(image_array).opts(xaxis=None, yaxis=None)

    @pn.depends("meta_data", "column_name")
    def code(self):
        """Return Markdown with a script that reproduces the current image.

        Returns an empty string when ``meta_data`` is empty. The embedded
        Python snippet is indented so it can be copied and run as-is.
        """
        if self.meta_data.empty:
            return ""

        # ``meta_data`` holds the row as a single column; transpose it back so
        # the fields can be addressed by name.
        parquet_url = self.meta_data.T["parquet_url"].iloc[0]
        parquet_row = self.meta_data.T["parquet_row"].iloc[0]
        return f"""\
```bash
pip install aiohttp fsspec holoviews numpy panel pyarrow requests
```
```python
from io import BytesIO
import holoviews as hv
import numpy as np
import panel as pn
import pyarrow.parquet as pq
from fsspec.parquet import open_parquet_file
from PIL import Image
pn.extension()
parquet_url = "{parquet_url}"
parquet_row = {parquet_row}
column = "{self.column}"
with open_parquet_file(parquet_url, columns=[column]) as f:
    with pq.ParquetFile(f) as pf:
        first_row_group = pf.read_row_group(parquet_row, columns=[column])
        stream = BytesIO(first_row_group[column][0].as_py())
image = Image.open(stream)
image_array = np.array(image)
if image_array.ndim==2:
    plot = hv.Image(image_array).opts(cmap="gray", colorbar=True)
else:
    plot = hv.RGB(image_array)
plot.opts(xaxis=None, yaxis=None)
pn.panel(plot).servable()
```
```bash
panel serve app.py --autoreload
```
"""
class App(param.Parameterized):
    """Holds the two layout areas of the application.

    ``sidebar`` is built immediately. ``main`` starts as a loading
    placeholder and is replaced by the real content through a callback
    registered with ``pn.state.onload``.
    """

    sidebar = param.Parameter()
    main = param.Parameter()

    def __init__(self, **params):
        super().__init__(**params)

        self.sidebar = self._create_sidebar()

        # Placeholder shown until ``_update_main`` swaps in the real content.
        spinner = pn.indicators.LoadingSpinner(value=True, size=50)
        loading_row = pn.Row(spinner, "**Loading data...**")
        placeholder = pn.Column(loading_row, MAJOR_TOM_LYRICS)
        self.main = pn.FlexBox(placeholder)

        pn.state.onload(self._update_main)

    def _create_sidebar(self):
        """Build the sidebar: project logo and picture, description, tool logos."""
        major_tom_logo = pn.pane.Image(
            MAJOR_TOM_LOGO,
            link_url=MAJOR_TOM_REF_URL,
            height=60,
            sizing_mode="stretch_width",
        )
        major_tom_picture = pn.pane.Image(
            MAJOR_TOM_PICTURE,
            link_url=MAJOR_TOM_REF_URL,
            sizing_mode="stretch_width",
        )
        panel_logo = pn.pane.Image(
            PANEL_LOGO, link_url=PANEL_URL, width=200, margin=(10, 20)
        )
        datashader_logo = pn.pane.Image(
            DATASHADER_LOGO, link_url=DATASHADER_URL, width=200, margin=(10, 20)
        )
        return pn.Column(
            major_tom_logo,
            major_tom_picture,
            DESCRIPTION,
            panel_logo,
            datashader_logo,
        )

    def _create_main_content(self):
        """Create and wire the dataset, map and image components.

        Returns a 2-tuple: a column with the dataset selector and the map,
        and the image viewer.
        """
        dataset_input = DatasetInput()
        # Each component is fed parameter references from the one before it.
        map_view = MapInput(data=dataset_input.param.data)
        image_view = ImageInput(
            data=map_view.param.data_selected,
            columns=dataset_input.param.columns,
        )
        left_column = pn.Column(dataset_input, map_view)
        return left_column, image_view

    def _update_main(self):
        """Replace the contents of ``main`` with the real content."""
        content = self._create_main_content()
        self.main[:] = list(content)