Source code for aas_timeseries.visualization

import os
import tempfile
from io import StringIO
from json import dump
from zipfile import ZipFile

import numpy as np

from matplotlib import pyplot as plt

from astropy.time import Time
from astropy.table import Table
from astropy import units as u

from astropy.visualization import quantity_support

from aas_timeseries.backports import time_support
from aas_timeseries.colors import auto_assign_colors
from aas_timeseries.views import BaseView, View
from aas_timeseries.matplotlib import (PhaseAsDegreesLocator,
                                       PhaseAsDegreesFormatter,
                                       PhaseAsRadiansLocator,
                                       PhaseAsRadiansFormatter)

__all__ = ['InteractiveTimeSeriesFigure']


class InteractiveTimeSeriesFigure(BaseView):
    """
    An interactive time series figure.

    Parameters
    ----------
    width : int, optional
        The preferred width of the figure, in pixels.
    height : int, optional
        The preferred height of the figure, in pixels.
    padding : float, optional
        The padding inside the axes, in pixels.
    resize : bool, optional
        Whether to resize the figure to the available space.
    title : str, optional
        If views are added to the figure, this title is used for the default
        view, otherwise 'Default' is used.
    """

    def __init__(self, width=600, height=400, padding=36, resize=False,
                 title=None, time_mode=None):
        super().__init__(time_mode=time_mode)
        self._width = width
        self._height = height
        self._resize = resize
        self._padding = padding
        self._yunit = 'auto'
        self._views = []
        self._title = title

    @property
    def yunit(self):
        """
        The unit to use for the y axis, or 'auto' to guess. If using data
        without quantities, this can be set to ``u.one``.
        """
        return self._yunit

    @yunit.setter
    def yunit(self, value):
        if isinstance(value, u.UnitBase) or value == 'auto':
            self._yunit = value
        else:
            self._yunit = u.Unit(value)

    def _guess_yunit(self):
        """
        Try and guess the unit to use on the y axis based on the limits set
        and the data used.
        """

        # First check if the limits were set explicitly and if so use those
        # units if they were Quantities since it seems likely this was
        # deliberate.
        if self.ylim is not None and isinstance(self.ylim[0], u.Quantity):
            return self.ylim[0].unit

        # Next, check through all the data required by the layers, first in
        # the figure, then in the views, and return the first unit or lack of
        # unit found.
        for layer in self._layers:
            for (data, colname) in layer._required_ydata:
                return data.unit(colname)

        for view in self._views:
            for layer in view['view']._layers:
                for (data, colname) in layer._required_ydata:
                    return data.unit(colname)

        # Since we didn't find anything, let's assume that the y axis should
        # be dimensionless. In theory we could also go through non-data layers
        # and check the units used there.
        return u.one
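
    # A short usage sketch for the constructor and the ``yunit`` property
    # (``fig`` is a hypothetical figure instance, reused in the sketches
    # further down):
    #
    #     from astropy import units as u
    #     from aas_timeseries.visualization import InteractiveTimeSeriesFigure
    #
    #     fig = InteractiveTimeSeriesFigure(width=800, height=500,
    #                                       title='Example light curve')
    #     fig.yunit = u.mJy     # a Unit is stored as-is
    #     fig.yunit = 'mJy'     # strings are converted with u.Unit
    #     fig.yunit = 'auto'    # guess from limits and layer data (default)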
    def add_view(self, title, description=None, include=None, exclude=None,
                 empty=False, time_mode=None):

        if empty:
            inherited_layers = {}
        elif include is not None:
            for layer in include:
                if layer not in self._layers:
                    raise ValueError(f'Layer {layer} does not exist in base figure')
            inherited_layers = {layer: self._layers[layer] for layer in include}
        elif exclude is not None:
            for layer in exclude:
                if layer not in self._layers:
                    raise ValueError(f'Layer {layer} does not exist in base figure')
            inherited_layers = {layer: self._layers[layer]
                                for layer in self._layers if layer not in exclude}
        else:
            inherited_layers = self._layers.copy()

        view = View(figure=self, inherited_layers=inherited_layers, time_mode=time_mode)
        self._views.append({'title': title, 'description': description, 'view': view})
        return view
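
    # A sketch of ``add_view`` usage; ``flare_layer`` and ``outlier_layer``
    # are hypothetical layer objects already added to the figure:
    #
    #     # Inherit only the named layers
    #     zoom = fig.add_view('Zoom on flare', description='Zoomed-in view',
    #                         include=[flare_layer])
    #     # Inherit everything except one layer
    #     clean = fig.add_view('Without outliers', exclude=[outlier_layer])
    #     # Start from an empty view and add layers to it separately
    #     detail = fig.add_view('Detail', empty=True)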
    def export_interactive_bundle(self, filename, embed_data=False,
                                  minimize_data=True, override_style=False):
        """
        Create a bundle for the interactive figure containing an HTML file and
        a JSON file along with any required CSV files.

        Parameters
        ----------
        filename : str
            The filename for the zip file.
        embed_data : bool, optional
            Whether to embed the data in the JSON file (`True`) or include it
            in separate CSV files (`False`). The default is `False`.
        minimize_data : bool, optional
            Whether to include only the data required for the visualization
            (`True`) or also other unused columns/fields in the time series
            (`False`). The default is `True`.
        override_style : bool, optional
            By default, any unspecified colors will be automatically chosen.
            If this parameter is set to `True`, all colors will be reassigned,
            even if already set.
        """

        start_dir = os.path.abspath('.')

        tmp_dir = tempfile.mkdtemp()

        os.chdir(tmp_dir)
        try:
            self.save_vega_json('figure.json', embed_data=embed_data,
                                minimize_data=minimize_data,
                                override_style=override_style)
        finally:
            os.chdir(start_dir)

        html_file = os.path.join(os.path.dirname(__file__), 'screenshot', 'template.html')

        with ZipFile(filename, 'w') as fzip:
            for data_file in os.listdir(tmp_dir):
                fzip.write(os.path.join(tmp_dir, data_file), os.path.basename(data_file))
            fzip.write(html_file, 'index.html')
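
    # Typical bundle export: the zip contains index.html, figure.json and,
    # when embed_data is `False`, one data_<uuid>.csv file per dataset:
    #
    #     fig.export_interactive_bundle('my_figure.zip')
    #     fig.export_interactive_bundle('my_figure_embedded.zip', embed_data=True)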
    def _check_colors(self, override_style=False):
        # Auto-assign colors if needed
        colors = auto_assign_colors(self._layers)
        for layer, color in zip(self._layers, colors):
            if override_style or layer.color is None:
                layer.color = color
    def save_static(self, prefix, format='png', override_style=False):
        """
        Export the figure to one or more static files using Matplotlib. If
        views are present then one plot is produced for each view.

        Parameters
        ----------
        prefix : str
            The name of the plot (without extension).
        format : str
            Any valid format supported by Matplotlib.
        override_style : bool, optional
            By default, any unspecified colors will be automatically chosen.
            If this parameter is set to `True`, all colors will be reassigned,
            even if already set.
        """

        # Start off by figuring out what units we are using on the y axis.
        # Note that we check the consistency of the units only here for
        # simplicity, otherwise any guessing while users add/remove layers is
        # tricky.
        yunit = self._guess_yunit() if self.yunit == 'auto' else self.yunit

        # Auto-assign colors if needed
        self._check_colors(override_style=override_style)

        # We now loop over the main figure and all the views, and produce a
        # static plot for each of them.

        def pad_limits(limits, padding):
            vrange = (limits[1] - limits[0]) * padding
            return limits[0] - vrange, limits[1] + vrange

        for iview, view in enumerate([self] + self._views):

            if view is not self:
                view = view['view']

            # Note that if we are dealing with non-absolute times, the
            # following settings don't matter since the time_support context
            # manager won't have any effect.
            if view.time_format == 'auto' or view.time_mode != 'absolute':
                time_format = 'iso'
                simplify = True
            else:
                time_format = view.time_format
                simplify = False

            with time_support(format=time_format, simplify=simplify, scale='utc'):

                with quantity_support():

                    fig = plt.figure(figsize=(self._width / 100, self._height / 100))
                    ax = fig.add_axes([0.15, 0.12, 0.8, 0.86])

                    for layer in view.layers:
                        layer.to_mpl(ax, yunit=yunit)

                    if view.time_mode == 'phase':
                        if view.time_format == 'degrees':
                            ax.xaxis.set_major_locator(PhaseAsDegreesLocator())
                            ax.xaxis.set_major_formatter(PhaseAsDegreesFormatter())
                        elif view.time_format == 'radians':
                            ax.xaxis.set_major_locator(PhaseAsRadiansLocator())
                            ax.xaxis.set_major_formatter(PhaseAsRadiansFormatter())

                    x_domain, y_domain = view._get_domains(yunit, as_vega=False)

                    ax.set_xlim(*x_domain)
                    ax.set_ylim(*y_domain)

                    # Apply padding - we just get the limits again because the
                    # x limits above may have been Time objects, so we get the
                    # limits again from Matplotlib.
                    ax.set_xlim(*pad_limits(ax.get_xlim(), self._padding / self._width))
                    ax.set_ylim(*pad_limits(ax.get_ylim(), self._padding / self._height))

                    if view is self:
                        filename = prefix + '.' + format
                    else:
                        filename = prefix + '_view' + str(iview) + '.' + format

                    ax.set_xlabel(view.xlabel)
                    ax.set_ylabel(view.ylabel)

                    fig.savefig(filename)
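
    # Static export writes ``lightcurve.png`` for the main figure and
    # ``lightcurve_view1.png``, ``lightcurve_view2.png``, ... for any views:
    #
    #     fig.save_static('lightcurve', format='png')
    #     fig.save_static('lightcurve', format='pdf', override_style=True)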
    def save_vega_json(self, filename, embed_data=False, minimize_data=True,
                       override_style=False):
        """
        Export the JSON file, and optionally CSV data files.

        Parameters
        ----------
        filename : str
            The name of the JSON file.
        embed_data : bool, optional
            Whether to embed the data in the JSON file (`True`) or include it
            in separate CSV files (`False`). The default is `False`.
        minimize_data : bool, optional
            Whether to include only the data required for the visualization
            (`True`) or also other unused columns/fields in the time series
            (`False`). The default is `True`.
        override_style : bool, optional
            By default, any unspecified colors will be automatically chosen.
            If this parameter is set to `True`, all colors will be reassigned,
            even if already set.
        """

        # Start off by figuring out what units we are using on the y axis.
        # Note that we check the consistency of the units only here for
        # simplicity, otherwise any guessing while users add/remove layers is
        # tricky.
        yunit = self._guess_yunit() if self.yunit == 'auto' else self.yunit

        # Auto-assign colors if needed
        self._check_colors(override_style=override_style)

        # Start off with empty JSON
        json = {}

        # Schema
        json['$schema'] = 'https://vega.github.io/schema/vega/v4.json'

        # Layout
        json['title'] = self._title or 'Default'
        json['width'] = self._width
        json['height'] = self._height
        json['padding'] = 0
        json['autosize'] = {'type': 'fit', 'resize': self._resize}
        json['_extend'] = {}

        # Data

        # We start off by checking which columns and data are going to be
        # required. We do this by iterating over the layers in the main
        # figure and the views and keeping track of the set of (data, column)
        # that are needed.

        required_xdata = []
        required_ydata = []
        required_tooltipdata = []

        for layer in self._layers:
            required_xdata.extend(layer._required_xdata)
            required_ydata.extend(layer._required_ydata)
            required_tooltipdata.extend(layer._required_tooltipdata)

        for view in self._views:
            for layer in view['view']._layers:
                required_xdata.extend(layer._required_xdata)
                required_ydata.extend(layer._required_ydata)
                required_tooltipdata.extend(layer._required_tooltipdata)

        required_xdata = set(required_xdata)
        required_ydata = set(required_ydata)
        required_tooltipdata = set(required_tooltipdata)

        json['data'] = []

        for data in self._data.values():

            # Start off by constructing a new table with only the subset of
            # columns required, and the time as an ISO string. Note that we
            # need to explicitly specify that we want UTC times, then add the
            # Z suffix since this isn't something that astropy does. For
            # relative times we always use seconds, and for phases we use
            # values in the range [0:1].

            table = Table()

            time_columns = []

            for colname in data.time_series.colnames:

                if (not minimize_data or
                        (data, colname) in (required_xdata | required_ydata |
                                            required_tooltipdata)):

                    column = data.time_series[colname]

                    if isinstance(column, Time):
                        table[colname] = np.char.add(column.utc.isot, 'Z')
                        time_columns.append(colname)
                    elif (data, colname) in required_xdata:
                        try:
                            table[colname] = data.column_to_values(colname, u.s)
                        except u.UnitsError:
                            table[colname] = data.column_to_values(colname, u.one)
                    elif (data, colname) in required_ydata:
                        table[colname] = data.column_to_values(colname, yunit)
                    else:
                        table[colname] = column

            # Next up, we create the information for the 'parse' Vega key
            # which indicates the format for each column.
            parse = {}
            for colname in table.colnames:
                column = table[colname]
                if colname in time_columns:
                    parse[colname] = 'date'
                elif column.dtype.kind in 'fi':
                    parse[colname] = 'number'
                elif column.dtype.kind in 'b':
                    parse[colname] = 'boolean'
                else:
                    parse[colname] = 'string'

            vega = {'name': data.uuid,
                    'format': {'type': 'csv', 'parse': parse}}

            # We now either embed the CSV inside the JSON or create CSV files.
            # For now we use UUIDs for the data file names in the latter case
            # but in future we could find a way to preserve information about
            # the original filenames the data came from.
            if embed_data:
                s = StringIO()
                table.write(s, format='ascii.basic', delimiter=',')
                s.seek(0)
                # NOTE: when embedding the data inside the JSON file, we
                # should just use simple Unix line endings inside the
                # serialized table.
                csv_string = s.read().replace('\r\n', '\n')
                vega['values'] = csv_string
            else:
                data_filename = 'data_' + data.uuid + '.csv'
                data_path = os.path.join(os.path.dirname(filename), data_filename)
                table.write(data_path, format='ascii.basic', delimiter=',')
                vega['url'] = data_filename

            json['data'].append(vega)

        # At this point, we loop over all the views (including the main view
        # given by self) and output these to the JSON.

        for view in [self] + self._views:

            if view is self:
                view_json = json
            else:
                view_json = {'name': self.uuid,
                             'title': view['title'],
                             'description': view['description']}
                if '_views' not in json:
                    json['_views'] = []
                json['_views'].append(view_json)
                view = view['view']

            if view._time_mode == 'absolute':
                x_type = 'time'
                x_input = 'iso'
            elif view._time_mode == 'relative':
                x_type = 'number'
                x_input = 'seconds'
            elif view._time_mode == 'phase':
                x_type = 'number'
                x_input = 'unity'

            view_json['_extend'] = {'scales': [{'name': 'xscale',
                                                'input': x_input,
                                                'output': view.time_format}]}

            view_json['axes'] = [{'orient': 'bottom', 'scale': 'xscale', 'title': view.xlabel},
                                 {'orient': 'left', 'scale': 'yscale', 'title': view.ylabel}]

            view_json['scales'] = [{'name': 'xscale',
                                    'type': x_type,
                                    'range': 'width',
                                    'zero': False,
                                    'padding': self._padding},
                                   {'name': 'yscale',
                                    'type': 'log' if view.ylog else 'linear',
                                    'range': 'height',
                                    'zero': False,
                                    'padding': self._padding}]

            # Limits, if specified
            x_domain, y_domain = view._get_domains(yunit)

            if x_domain is not None:
                view_json['scales'][0]['domain'] = x_domain

            if y_domain is not None:
                view_json['scales'][1]['domain'] = y_domain

            if view is self:
                view_json['marks'] = []
                for layer, settings in self._layers.items():
                    view_json['marks'].extend(layer.to_vega(yunit=yunit))
            else:
                view_json['markers'] = []
                for layer, settings in view._inherited_layers.items():
                    for uuid in layer.uuids:
                        view_json['markers'].append({'name': uuid,
                                                     'visible': settings['visible']})
                for layer, settings in view._layers.items():
                    if 'marks' not in json['_extend']:
                        json['_extend']['marks'] = []
                    json['_extend']['marks'].extend(layer.to_vega(yunit=yunit))
                    for uuid in layer.uuids:
                        view_json['markers'].append({'name': uuid,
                                                     'visible': settings['visible']})

        with open(filename, 'w') as f:
            dump(json, f, indent=' ', sort_keys=True)
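
    # The JSON file can also be written directly, for example to inspect or
    # post-process the Vega specification produced above:
    #
    #     fig.save_vega_json('figure.json', embed_data=True)
    #
    #     import json
    #     with open('figure.json') as f:
    #         spec = json.load(f)
    #     print(spec['$schema'], spec['width'], spec['height'])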
    def preview_interactive(self):
        """
        Show an interactive version of the figure (only works in Jupyter
        notebook or lab).
        """
        # FIXME: should be able to do without a file
        from jupyter_aas_timeseries import TimeSeriesWidget
        tmpfile = tempfile.mktemp()
        self.save_vega_json(tmpfile, embed_data=True, minimize_data=True)
        widget = TimeSeriesWidget(tmpfile)
        return widget
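
    # In a Jupyter notebook or JupyterLab cell (with the jupyter_aas_timeseries
    # package installed), the returned widget is displayed when it is the last
    # expression in the cell:
    #
    #     widget = fig.preview_interactive()
    #     widget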
    def remove(self, layer):
        """
        Remove a layer from the figure.
        """
        if layer in self._layers:
            self._layers.pop(layer)
            for view in self._views:
                if layer in view['view'].layers:
                    view['view'].remove(layer)
        else:
            raise ValueError(f"Layer '{layer.label}' is not in figure")
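

# A minimal end-to-end sketch. The ``add_markers`` call is an assumption
# (layers are added via methods defined on BaseView, not in this module), and
# the astropy TimeSeries with a 'flux' column is only an illustrative dataset:
#
#     from astropy import units as u
#     from astropy.timeseries import TimeSeries
#     from aas_timeseries.visualization import InteractiveTimeSeriesFigure
#
#     ts = TimeSeries(time=['2016-03-22T12:30:31', '2016-03-22T12:30:34',
#                           '2016-03-22T12:30:37'],
#                     data={'flux': [1.2, 1.4, 1.1] * u.mJy})
#
#     fig = InteractiveTimeSeriesFigure(title='Example light curve')
#     fig.add_markers(time_series=ts, column='flux', label='Flux')  # assumed API
#     fig.save_static('example')                     # writes example.png
#     fig.export_interactive_bundle('example.zip')   # HTML + JSON + CSV bundle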