mirror of
https://github.com/PiBrewing/craftbeerpi4.git
synced 2025-01-10 14:43:19 +01:00
687 lines
20 KiB
Python
687 lines
20 KiB
Python
from abc import ABC, abstractmethod
|
|
import sys
|
|
from typing import (
|
|
IO,
|
|
TYPE_CHECKING,
|
|
Iterable,
|
|
Iterator,
|
|
List,
|
|
Mapping,
|
|
Optional,
|
|
Sequence,
|
|
Union,
|
|
)
|
|
|
|
from pandas._config import get_option
|
|
|
|
from pandas._typing import Dtype, FrameOrSeriesUnion
|
|
|
|
from pandas.core.indexes.api import Index
|
|
|
|
from pandas.io.formats import format as fmt
|
|
from pandas.io.formats.printing import pprint_thing
|
|
|
|
if TYPE_CHECKING:
|
|
from pandas.core.frame import DataFrame
|
|
|
|
|
|
def _put_str(s: Union[str, Dtype], space: int) -> str:
|
|
"""
|
|
Make string of specified length, padding to the right if necessary.
|
|
|
|
Parameters
|
|
----------
|
|
s : Union[str, Dtype]
|
|
String to be formatted.
|
|
space : int
|
|
Length to force string to be of.
|
|
|
|
Returns
|
|
-------
|
|
str
|
|
String coerced to given length.
|
|
|
|
Examples
|
|
--------
|
|
>>> pd.io.formats.info._put_str("panda", 6)
|
|
'panda '
|
|
>>> pd.io.formats.info._put_str("panda", 4)
|
|
'pand'
|
|
"""
|
|
return str(s)[:space].ljust(space)
|
|
|
|
|
|
def _sizeof_fmt(num: Union[int, float], size_qualifier: str) -> str:
|
|
"""
|
|
Return size in human readable format.
|
|
|
|
Parameters
|
|
----------
|
|
num : int
|
|
Size in bytes.
|
|
size_qualifier : str
|
|
Either empty, or '+' (if lower bound).
|
|
|
|
Returns
|
|
-------
|
|
str
|
|
Size in human readable format.
|
|
|
|
Examples
|
|
--------
|
|
>>> _sizeof_fmt(23028, '')
|
|
'22.5 KB'
|
|
|
|
>>> _sizeof_fmt(23028, '+')
|
|
'22.5+ KB'
|
|
"""
|
|
for x in ["bytes", "KB", "MB", "GB", "TB"]:
|
|
if num < 1024.0:
|
|
return f"{num:3.1f}{size_qualifier} {x}"
|
|
num /= 1024.0
|
|
return f"{num:3.1f}{size_qualifier} PB"
|
|
|
|
|
|
def _initialize_memory_usage(
|
|
memory_usage: Optional[Union[bool, str]] = None,
|
|
) -> Union[bool, str]:
|
|
"""Get memory usage based on inputs and display options."""
|
|
if memory_usage is None:
|
|
memory_usage = get_option("display.memory_usage")
|
|
return memory_usage
|
|
|
|
|
|
class BaseInfo(ABC):
|
|
"""
|
|
Base class for DataFrameInfo and SeriesInfo.
|
|
|
|
Parameters
|
|
----------
|
|
data : DataFrame or Series
|
|
Either dataframe or series.
|
|
memory_usage : bool or str, optional
|
|
If "deep", introspect the data deeply by interrogating object dtypes
|
|
for system-level memory consumption, and include it in the returned
|
|
values.
|
|
"""
|
|
|
|
data: FrameOrSeriesUnion
|
|
memory_usage: Union[bool, str]
|
|
|
|
@property
|
|
@abstractmethod
|
|
def dtypes(self) -> Iterable[Dtype]:
|
|
"""
|
|
Dtypes.
|
|
|
|
Returns
|
|
-------
|
|
dtypes : sequence
|
|
Dtype of each of the DataFrame's columns (or one series column).
|
|
"""
|
|
|
|
@property
|
|
@abstractmethod
|
|
def dtype_counts(self) -> Mapping[str, int]:
|
|
"""Mapping dtype - number of counts."""
|
|
|
|
@property
|
|
@abstractmethod
|
|
def non_null_counts(self) -> Sequence[int]:
|
|
"""Sequence of non-null counts for all columns or column (if series)."""
|
|
|
|
@property
|
|
@abstractmethod
|
|
def memory_usage_bytes(self) -> int:
|
|
"""
|
|
Memory usage in bytes.
|
|
|
|
Returns
|
|
-------
|
|
memory_usage_bytes : int
|
|
Object's total memory usage in bytes.
|
|
"""
|
|
|
|
@property
|
|
def memory_usage_string(self) -> str:
|
|
"""Memory usage in a form of human readable string."""
|
|
return f"{_sizeof_fmt(self.memory_usage_bytes, self.size_qualifier)}\n"
|
|
|
|
@property
|
|
def size_qualifier(self) -> str:
|
|
size_qualifier = ""
|
|
if self.memory_usage:
|
|
if self.memory_usage != "deep":
|
|
# size_qualifier is just a best effort; not guaranteed to catch
|
|
# all cases (e.g., it misses categorical data even with object
|
|
# categories)
|
|
if (
|
|
"object" in self.dtype_counts
|
|
or self.data.index._is_memory_usage_qualified()
|
|
):
|
|
size_qualifier = "+"
|
|
return size_qualifier
|
|
|
|
@abstractmethod
|
|
def render(
|
|
self,
|
|
*,
|
|
buf: Optional[IO[str]],
|
|
max_cols: Optional[int],
|
|
verbose: Optional[bool],
|
|
show_counts: Optional[bool],
|
|
) -> None:
|
|
"""
|
|
Print a concise summary of a %(klass)s.
|
|
|
|
This method prints information about a %(klass)s including
|
|
the index dtype%(type_sub)s, non-null values and memory usage.
|
|
%(version_added_sub)s\
|
|
|
|
Parameters
|
|
----------
|
|
data : %(klass)s
|
|
%(klass)s to print information about.
|
|
verbose : bool, optional
|
|
Whether to print the full summary. By default, the setting in
|
|
``pandas.options.display.max_info_columns`` is followed.
|
|
buf : writable buffer, defaults to sys.stdout
|
|
Where to send the output. By default, the output is printed to
|
|
sys.stdout. Pass a writable buffer if you need to further process
|
|
the output.
|
|
%(max_cols_sub)s
|
|
memory_usage : bool, str, optional
|
|
Specifies whether total memory usage of the %(klass)s
|
|
elements (including the index) should be displayed. By default,
|
|
this follows the ``pandas.options.display.memory_usage`` setting.
|
|
|
|
True always show memory usage. False never shows memory usage.
|
|
A value of 'deep' is equivalent to "True with deep introspection".
|
|
Memory usage is shown in human-readable units (base-2
|
|
representation). Without deep introspection a memory estimation is
|
|
made based in column dtype and number of rows assuming values
|
|
consume the same memory amount for corresponding dtypes. With deep
|
|
memory introspection, a real memory usage calculation is performed
|
|
at the cost of computational resources.
|
|
%(show_counts_sub)s
|
|
|
|
Returns
|
|
-------
|
|
None
|
|
This method prints a summary of a %(klass)s and returns None.
|
|
|
|
See Also
|
|
--------
|
|
%(see_also_sub)s
|
|
|
|
Examples
|
|
--------
|
|
%(examples_sub)s
|
|
"""
|
|
|
|
|
|
class DataFrameInfo(BaseInfo):
|
|
"""
|
|
Class storing dataframe-specific info.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
data: "DataFrame",
|
|
memory_usage: Optional[Union[bool, str]] = None,
|
|
):
|
|
self.data: "DataFrame" = data
|
|
self.memory_usage = _initialize_memory_usage(memory_usage)
|
|
|
|
@property
|
|
def dtype_counts(self) -> Mapping[str, int]:
|
|
return _get_dataframe_dtype_counts(self.data)
|
|
|
|
@property
|
|
def dtypes(self) -> Iterable[Dtype]:
|
|
"""
|
|
Dtypes.
|
|
|
|
Returns
|
|
-------
|
|
dtypes
|
|
Dtype of each of the DataFrame's columns.
|
|
"""
|
|
return self.data.dtypes
|
|
|
|
@property
|
|
def ids(self) -> Index:
|
|
"""
|
|
Column names.
|
|
|
|
Returns
|
|
-------
|
|
ids : Index
|
|
DataFrame's column names.
|
|
"""
|
|
return self.data.columns
|
|
|
|
@property
|
|
def col_count(self) -> int:
|
|
"""Number of columns to be summarized."""
|
|
return len(self.ids)
|
|
|
|
@property
|
|
def non_null_counts(self) -> Sequence[int]:
|
|
"""Sequence of non-null counts for all columns or column (if series)."""
|
|
return self.data.count()
|
|
|
|
@property
|
|
def memory_usage_bytes(self) -> int:
|
|
if self.memory_usage == "deep":
|
|
deep = True
|
|
else:
|
|
deep = False
|
|
return self.data.memory_usage(index=True, deep=deep).sum()
|
|
|
|
def render(
|
|
self,
|
|
*,
|
|
buf: Optional[IO[str]],
|
|
max_cols: Optional[int],
|
|
verbose: Optional[bool],
|
|
show_counts: Optional[bool],
|
|
) -> None:
|
|
printer = DataFrameInfoPrinter(
|
|
info=self,
|
|
max_cols=max_cols,
|
|
verbose=verbose,
|
|
show_counts=show_counts,
|
|
)
|
|
printer.to_buffer(buf)
|
|
|
|
|
|
class InfoPrinterAbstract:
|
|
"""
|
|
Class for printing dataframe or series info.
|
|
"""
|
|
|
|
def to_buffer(self, buf: Optional[IO[str]] = None) -> None:
|
|
"""Save dataframe info into buffer."""
|
|
table_builder = self._create_table_builder()
|
|
lines = table_builder.get_lines()
|
|
if buf is None: # pragma: no cover
|
|
buf = sys.stdout
|
|
fmt.buffer_put_lines(buf, lines)
|
|
|
|
@abstractmethod
|
|
def _create_table_builder(self) -> "TableBuilderAbstract":
|
|
"""Create instance of table builder."""
|
|
|
|
|
|
class DataFrameInfoPrinter(InfoPrinterAbstract):
|
|
"""
|
|
Class for printing dataframe info.
|
|
|
|
Parameters
|
|
----------
|
|
info : DataFrameInfo
|
|
Instance of DataFrameInfo.
|
|
max_cols : int, optional
|
|
When to switch from the verbose to the truncated output.
|
|
verbose : bool, optional
|
|
Whether to print the full summary.
|
|
show_counts : bool, optional
|
|
Whether to show the non-null counts.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
info: DataFrameInfo,
|
|
max_cols: Optional[int] = None,
|
|
verbose: Optional[bool] = None,
|
|
show_counts: Optional[bool] = None,
|
|
):
|
|
self.info = info
|
|
self.data = info.data
|
|
self.verbose = verbose
|
|
self.max_cols = self._initialize_max_cols(max_cols)
|
|
self.show_counts = self._initialize_show_counts(show_counts)
|
|
|
|
@property
|
|
def max_rows(self) -> int:
|
|
"""Maximum info rows to be displayed."""
|
|
return get_option("display.max_info_rows", len(self.data) + 1)
|
|
|
|
@property
|
|
def exceeds_info_cols(self) -> bool:
|
|
"""Check if number of columns to be summarized does not exceed maximum."""
|
|
return bool(self.col_count > self.max_cols)
|
|
|
|
@property
|
|
def exceeds_info_rows(self) -> bool:
|
|
"""Check if number of rows to be summarized does not exceed maximum."""
|
|
return bool(len(self.data) > self.max_rows)
|
|
|
|
@property
|
|
def col_count(self) -> int:
|
|
"""Number of columns to be summarized."""
|
|
return self.info.col_count
|
|
|
|
def _initialize_max_cols(self, max_cols: Optional[int]) -> int:
|
|
if max_cols is None:
|
|
return get_option("display.max_info_columns", self.col_count + 1)
|
|
return max_cols
|
|
|
|
def _initialize_show_counts(self, show_counts: Optional[bool]) -> bool:
|
|
if show_counts is None:
|
|
return bool(not self.exceeds_info_cols and not self.exceeds_info_rows)
|
|
else:
|
|
return show_counts
|
|
|
|
def _create_table_builder(self) -> "DataFrameTableBuilder":
|
|
"""
|
|
Create instance of table builder based on verbosity and display settings.
|
|
"""
|
|
if self.verbose:
|
|
return DataFrameTableBuilderVerbose(
|
|
info=self.info,
|
|
with_counts=self.show_counts,
|
|
)
|
|
elif self.verbose is False: # specifically set to False, not necessarily None
|
|
return DataFrameTableBuilderNonVerbose(info=self.info)
|
|
else:
|
|
if self.exceeds_info_cols:
|
|
return DataFrameTableBuilderNonVerbose(info=self.info)
|
|
else:
|
|
return DataFrameTableBuilderVerbose(
|
|
info=self.info,
|
|
with_counts=self.show_counts,
|
|
)
|
|
|
|
|
|
class TableBuilderAbstract(ABC):
|
|
"""
|
|
Abstract builder for info table.
|
|
"""
|
|
|
|
_lines: List[str]
|
|
info: BaseInfo
|
|
|
|
@abstractmethod
|
|
def get_lines(self) -> List[str]:
|
|
"""Product in a form of list of lines (strings)."""
|
|
|
|
@property
|
|
def data(self) -> FrameOrSeriesUnion:
|
|
return self.info.data
|
|
|
|
@property
|
|
def dtypes(self) -> Iterable[Dtype]:
|
|
"""Dtypes of each of the DataFrame's columns."""
|
|
return self.info.dtypes
|
|
|
|
@property
|
|
def dtype_counts(self) -> Mapping[str, int]:
|
|
"""Mapping dtype - number of counts."""
|
|
return self.info.dtype_counts
|
|
|
|
@property
|
|
def display_memory_usage(self) -> bool:
|
|
"""Whether to display memory usage."""
|
|
return bool(self.info.memory_usage)
|
|
|
|
@property
|
|
def memory_usage_string(self) -> str:
|
|
"""Memory usage string with proper size qualifier."""
|
|
return self.info.memory_usage_string
|
|
|
|
@property
|
|
def non_null_counts(self) -> Sequence[int]:
|
|
return self.info.non_null_counts
|
|
|
|
def add_object_type_line(self) -> None:
|
|
"""Add line with string representation of dataframe to the table."""
|
|
self._lines.append(str(type(self.data)))
|
|
|
|
def add_index_range_line(self) -> None:
|
|
"""Add line with range of indices to the table."""
|
|
self._lines.append(self.data.index._summary())
|
|
|
|
def add_dtypes_line(self) -> None:
|
|
"""Add summary line with dtypes present in dataframe."""
|
|
collected_dtypes = [
|
|
f"{key}({val:d})" for key, val in sorted(self.dtype_counts.items())
|
|
]
|
|
self._lines.append(f"dtypes: {', '.join(collected_dtypes)}")
|
|
|
|
|
|
class DataFrameTableBuilder(TableBuilderAbstract):
|
|
"""
|
|
Abstract builder for dataframe info table.
|
|
|
|
Parameters
|
|
----------
|
|
info : DataFrameInfo.
|
|
Instance of DataFrameInfo.
|
|
"""
|
|
|
|
def __init__(self, *, info: DataFrameInfo):
|
|
self.info: DataFrameInfo = info
|
|
|
|
def get_lines(self) -> List[str]:
|
|
self._lines = []
|
|
if self.col_count == 0:
|
|
self._fill_empty_info()
|
|
else:
|
|
self._fill_non_empty_info()
|
|
return self._lines
|
|
|
|
def _fill_empty_info(self) -> None:
|
|
"""Add lines to the info table, pertaining to empty dataframe."""
|
|
self.add_object_type_line()
|
|
self.add_index_range_line()
|
|
self._lines.append(f"Empty {type(self.data).__name__}")
|
|
|
|
@abstractmethod
|
|
def _fill_non_empty_info(self) -> None:
|
|
"""Add lines to the info table, pertaining to non-empty dataframe."""
|
|
|
|
@property
|
|
def data(self) -> "DataFrame":
|
|
"""DataFrame."""
|
|
return self.info.data
|
|
|
|
@property
|
|
def ids(self) -> Index:
|
|
"""Dataframe columns."""
|
|
return self.info.ids
|
|
|
|
@property
|
|
def col_count(self) -> int:
|
|
"""Number of dataframe columns to be summarized."""
|
|
return self.info.col_count
|
|
|
|
def add_memory_usage_line(self) -> None:
|
|
"""Add line containing memory usage."""
|
|
self._lines.append(f"memory usage: {self.memory_usage_string}")
|
|
|
|
|
|
class DataFrameTableBuilderNonVerbose(DataFrameTableBuilder):
|
|
"""
|
|
Dataframe info table builder for non-verbose output.
|
|
"""
|
|
|
|
def _fill_non_empty_info(self) -> None:
|
|
"""Add lines to the info table, pertaining to non-empty dataframe."""
|
|
self.add_object_type_line()
|
|
self.add_index_range_line()
|
|
self.add_columns_summary_line()
|
|
self.add_dtypes_line()
|
|
if self.display_memory_usage:
|
|
self.add_memory_usage_line()
|
|
|
|
def add_columns_summary_line(self) -> None:
|
|
self._lines.append(self.ids._summary(name="Columns"))
|
|
|
|
|
|
class TableBuilderVerboseMixin(TableBuilderAbstract):
|
|
"""
|
|
Mixin for verbose info output.
|
|
"""
|
|
|
|
SPACING: str = " " * 2
|
|
strrows: Sequence[Sequence[str]]
|
|
gross_column_widths: Sequence[int]
|
|
with_counts: bool
|
|
|
|
@property
|
|
@abstractmethod
|
|
def headers(self) -> Sequence[str]:
|
|
"""Headers names of the columns in verbose table."""
|
|
|
|
@property
|
|
def header_column_widths(self) -> Sequence[int]:
|
|
"""Widths of header columns (only titles)."""
|
|
return [len(col) for col in self.headers]
|
|
|
|
def _get_gross_column_widths(self) -> Sequence[int]:
|
|
"""Get widths of columns containing both headers and actual content."""
|
|
body_column_widths = self._get_body_column_widths()
|
|
return [
|
|
max(*widths)
|
|
for widths in zip(self.header_column_widths, body_column_widths)
|
|
]
|
|
|
|
def _get_body_column_widths(self) -> Sequence[int]:
|
|
"""Get widths of table content columns."""
|
|
strcols: Sequence[Sequence[str]] = list(zip(*self.strrows))
|
|
return [max(len(x) for x in col) for col in strcols]
|
|
|
|
def _gen_rows(self) -> Iterator[Sequence[str]]:
|
|
"""
|
|
Generator function yielding rows content.
|
|
|
|
Each element represents a row comprising a sequence of strings.
|
|
"""
|
|
if self.with_counts:
|
|
return self._gen_rows_with_counts()
|
|
else:
|
|
return self._gen_rows_without_counts()
|
|
|
|
@abstractmethod
|
|
def _gen_rows_with_counts(self) -> Iterator[Sequence[str]]:
|
|
"""Iterator with string representation of body data with counts."""
|
|
|
|
@abstractmethod
|
|
def _gen_rows_without_counts(self) -> Iterator[Sequence[str]]:
|
|
"""Iterator with string representation of body data without counts."""
|
|
|
|
def add_header_line(self) -> None:
|
|
header_line = self.SPACING.join(
|
|
[
|
|
_put_str(header, col_width)
|
|
for header, col_width in zip(self.headers, self.gross_column_widths)
|
|
]
|
|
)
|
|
self._lines.append(header_line)
|
|
|
|
def add_separator_line(self) -> None:
|
|
separator_line = self.SPACING.join(
|
|
[
|
|
_put_str("-" * header_colwidth, gross_colwidth)
|
|
for header_colwidth, gross_colwidth in zip(
|
|
self.header_column_widths, self.gross_column_widths
|
|
)
|
|
]
|
|
)
|
|
self._lines.append(separator_line)
|
|
|
|
def add_body_lines(self) -> None:
|
|
for row in self.strrows:
|
|
body_line = self.SPACING.join(
|
|
[
|
|
_put_str(col, gross_colwidth)
|
|
for col, gross_colwidth in zip(row, self.gross_column_widths)
|
|
]
|
|
)
|
|
self._lines.append(body_line)
|
|
|
|
def _gen_non_null_counts(self) -> Iterator[str]:
|
|
"""Iterator with string representation of non-null counts."""
|
|
for count in self.non_null_counts:
|
|
yield f"{count} non-null"
|
|
|
|
def _gen_dtypes(self) -> Iterator[str]:
|
|
"""Iterator with string representation of column dtypes."""
|
|
for dtype in self.dtypes:
|
|
yield pprint_thing(dtype)
|
|
|
|
|
|
class DataFrameTableBuilderVerbose(DataFrameTableBuilder, TableBuilderVerboseMixin):
|
|
"""
|
|
Dataframe info table builder for verbose output.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
info: DataFrameInfo,
|
|
with_counts: bool,
|
|
):
|
|
self.info = info
|
|
self.with_counts = with_counts
|
|
self.strrows: Sequence[Sequence[str]] = list(self._gen_rows())
|
|
self.gross_column_widths: Sequence[int] = self._get_gross_column_widths()
|
|
|
|
def _fill_non_empty_info(self) -> None:
|
|
"""Add lines to the info table, pertaining to non-empty dataframe."""
|
|
self.add_object_type_line()
|
|
self.add_index_range_line()
|
|
self.add_columns_summary_line()
|
|
self.add_header_line()
|
|
self.add_separator_line()
|
|
self.add_body_lines()
|
|
self.add_dtypes_line()
|
|
if self.display_memory_usage:
|
|
self.add_memory_usage_line()
|
|
|
|
@property
|
|
def headers(self) -> Sequence[str]:
|
|
"""Headers names of the columns in verbose table."""
|
|
if self.with_counts:
|
|
return [" # ", "Column", "Non-Null Count", "Dtype"]
|
|
return [" # ", "Column", "Dtype"]
|
|
|
|
def add_columns_summary_line(self) -> None:
|
|
self._lines.append(f"Data columns (total {self.col_count} columns):")
|
|
|
|
def _gen_rows_without_counts(self) -> Iterator[Sequence[str]]:
|
|
"""Iterator with string representation of body data without counts."""
|
|
yield from zip(
|
|
self._gen_line_numbers(),
|
|
self._gen_columns(),
|
|
self._gen_dtypes(),
|
|
)
|
|
|
|
def _gen_rows_with_counts(self) -> Iterator[Sequence[str]]:
|
|
"""Iterator with string representation of body data with counts."""
|
|
yield from zip(
|
|
self._gen_line_numbers(),
|
|
self._gen_columns(),
|
|
self._gen_non_null_counts(),
|
|
self._gen_dtypes(),
|
|
)
|
|
|
|
def _gen_line_numbers(self) -> Iterator[str]:
|
|
"""Iterator with string representation of column numbers."""
|
|
for i, _ in enumerate(self.ids):
|
|
yield f" {i}"
|
|
|
|
def _gen_columns(self) -> Iterator[str]:
|
|
"""Iterator with string representation of column names."""
|
|
for col in self.ids:
|
|
yield pprint_thing(col)
|
|
|
|
|
|
def _get_dataframe_dtype_counts(df: "DataFrame") -> Mapping[str, int]:
|
|
"""
|
|
Create mapping between datatypes and their number of occurences.
|
|
"""
|
|
# groupby dtype.name to collect e.g. Categorical columns
|
|
return df.dtypes.value_counts().groupby(lambda x: x.name).sum()
|