Source code for meerkat.block.pandas_block
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Hashable, Sequence, Tuple, Union
import pandas as pd
import torch
from meerkat.block.ref import BlockRef
from meerkat.columns.numpy_column import NumpyArrayColumn
from meerkat.columns.tensor_column import TensorColumn
from .abstract import AbstractBlock, BlockIndex, BlockView
class PandasBlock(AbstractBlock):
    """Block that stores its columns together in a single ``pd.DataFrame``.

    A block index for this block is a label of a column in ``self.data``.
    """

    @dataclass(eq=True, frozen=True)
    class Signature:
        """Hashable key made of a block's row count and its class."""

        nrows: int
        klass: type

    def __init__(self, data: pd.DataFrame, *args, **kwargs):
        """Wrap ``data``; remaining arguments go to ``AbstractBlock``."""
        super().__init__(*args, **kwargs)
        self.data = data

    @property
    def signature(self) -> Hashable:
        """Return the ``Signature`` for this block (class and row count)."""
        # NOTE(review): the original comment here was cut off ("we don't").
        # Only the row count is part of the signature; presumably column
        # dtypes are left out on purpose -- confirm.
        return self.Signature(
            klass=PandasBlock,
            nrows=len(self.data),
        )

    def _get_data(self, index: BlockIndex) -> pd.Series:
        """Return the DataFrame column stored under the label ``index``."""
        return self.data[index]

    @classmethod
    def from_column_data(cls, data: pd.Series) -> BlockView:
        """Create a new single-column block holding ``data``.

        Args:
            data (pd.Series): values for the only column of the new block.

        Returns:
            BlockView: a view onto the new block whose block index is the
                label ``"col"``, under which ``data`` is stored.
        """
        block = cls(pd.DataFrame({"col": data}))
        return BlockView(block_index="col", block=block)

    @classmethod
    def _consolidate(
        cls,
        block_refs: Sequence[BlockRef],
    ) -> BlockRef:
        """Merge the columns referenced by ``block_refs`` into one new block.

        Args:
            block_refs (Sequence[BlockRef]): references whose columns should
                end up in the same underlying DataFrame.

        Returns:
            BlockRef: reference to the new block, with every column cloned so
                that it points at the new block. The new DataFrame is keyed by
                the column names used in the refs. If a name occurs in several
                refs, the last occurrence wins.
        """
        df = pd.DataFrame(
            {
                # the pandas index must be dropped, otherwise the series would
                # be aligned on their index labels rather than by position
                name: ref.block.data[col._block_index].reset_index(drop=True)
                for ref in block_refs
                for name, col in ref.items()
            }
        )
        block = cls(df)

        # gather the columns from all of the block_refs
        columns = {}
        for ref in block_refs:
            columns.update(ref)

        new_columns = {
            name: col._clone(data=block[name]) for name, col in columns.items()
        }
        return BlockRef(block=block, columns=new_columns)

    @staticmethod
    def _convert_index(index):
        """Unwrap tensor, column and Series indices into numpy arrays.

        ``DataFrame.iloc`` needs numpy arrays for boolean indexing, so torch
        tensors, meerkat columns and pandas Series are converted. Any other
        index is returned unchanged.
        """
        if torch.is_tensor(index):
            # detach and move to CPU first: ``Tensor.numpy()`` raises for
            # tensors that require grad or live on another device
            return index.detach().cpu().numpy()

        if isinstance(index, NumpyArrayColumn):
            return index.data

        if isinstance(index, TensorColumn):
            return index.data.detach().cpu().numpy()

        if isinstance(index, pd.Series):
            return index.values

        # imported here rather than at module level, presumably to avoid a
        # circular import -- confirm before moving it
        from meerkat.columns.pandas_column import PandasSeriesColumn

        if isinstance(index, PandasSeriesColumn):
            return index.data.values

        return index

    def _get(
        self, index, block_ref: BlockRef, materialize: bool = True
    ) -> Union[BlockRef, dict]:
        """Select rows of the block positionally.

        Args:
            index: row selector; it is normalized with ``_convert_index`` and
                then handed to ``DataFrame.iloc``.
            block_ref (BlockRef): the columns of this block to return.
            materialize (bool): accepted but not used by this block.

        Returns:
            Union[BlockRef, dict]: for a single row, a dict mapping each
                column name in ``block_ref`` to that row's value; otherwise a
                ``BlockRef`` to a new block that holds the selected rows.
        """
        index = self._convert_index(index)
        # TODO: check if they're trying to index more than just the row dimension
        data = self.data.iloc[index]

        # ``iloc`` returns a Series exactly when one row was selected. Testing
        # the result instead of ``isinstance(index, int)`` also covers numpy
        # integer scalars, which used to be wrapped as if they were a block.
        if isinstance(data, pd.Series):
            # a single row is returned as a plain dict, not a block manager
            return {
                name: data[col._block_index] for name, col in block_ref.columns.items()
            }

        block = self.__class__(data)
        columns = {
            name: col._clone(data=block[col._block_index])
            for name, col in block_ref.columns.items()
        }
        # note that the new block may share memory with the old block
        return BlockRef(block=block, columns=columns)

    def _write_data(self, path: str):
        """Write the DataFrame to ``<path>/data.feather``.

        The index is dropped before writing because ``to_feather`` only
        supports a default index.
        """
        self.data.reset_index(drop=True).to_feather(os.path.join(path, "data.feather"))

    @staticmethod
    def _read_data(path: str, mmap: bool = False):
        """Load the DataFrame stored at ``<path>/data.feather``.

        ``mmap`` is accepted but ignored: the file is always read in full.
        """
        return pd.read_feather(os.path.join(path, "data.feather"))