Source code for meerkat.block.pandas_block

from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Dict, Hashable, Sequence, Tuple, Union

import pandas as pd
import torch

from meerkat.block.ref import BlockRef
from meerkat.columns.abstract import AbstractColumn
from meerkat.columns.numpy_column import NumpyArrayColumn
from meerkat.columns.tensor_column import TensorColumn

from .abstract import AbstractBlock, BlockIndex, BlockView


[docs]class PandasBlock(AbstractBlock):
[docs] @dataclass(eq=True, frozen=True) class Signature: nrows: int klass: type
def __init__(self, data: pd.DataFrame, *args, **kwargs): super(PandasBlock, self).__init__(*args, **kwargs) self.data = data @property def signature(self) -> Hashable: return self.Signature( klass=PandasBlock, # we don't nrows=len(self.data), ) def _get_data(self, index: BlockIndex) -> pd.Series: return self.data[index]
[docs] @classmethod def from_column_data(cls, data: pd.Series) -> Tuple[PandasBlock, BlockView]: """[summary] Args: data (np.ndarray): [description] names (Sequence[str]): [description] Raises: ValueError: [description] Returns: Tuple[PandasBlock, Mapping[str, BlockIndex]]: [description] """ data = pd.DataFrame({"col": data}) block = cls(data) return BlockView(block_index="col", block=block)
@classmethod def _consolidate( cls, block_refs: Sequence[BlockRef], consolidated_inputs: Dict[int, "AbstractColumn"] = None, ) -> BlockRef: df = pd.DataFrame( # need to ignore index when concatenating { name: ref.block.data[col._block_index].reset_index(drop=True) for ref in block_refs for name, col in ref.items() } ) block = cls(df) # pull out the block columns from all the block_refs columns = {} for ref in block_refs: columns.update(ref) new_columns = { name: col._clone(data=block[name]) for name, col in columns.items() } return BlockRef(block=block, columns=new_columns) @staticmethod def _convert_index(index): if torch.is_tensor(index): # need to convert to numpy for boolean indexing return index.numpy() if isinstance(index, NumpyArrayColumn): return index.data if isinstance(index, TensorColumn): # need to convert to numpy for boolean indexing return index.data.numpy() if isinstance(index, pd.Series): # need to convert to numpy for boolean indexing return index.values from meerkat.columns.pandas_column import PandasSeriesColumn if isinstance(index, PandasSeriesColumn): return index.data.values return index def _get( self, index, block_ref: BlockRef, materialize: bool = True ) -> Union[BlockRef, dict]: index = self._convert_index(index) # TODO: check if they're trying to index more than just the row dimension data = self.data.iloc[index] if isinstance(index, int): # if indexing a single row, we do not return a block manager, just a dict return { name: data[col._block_index] for name, col in block_ref.columns.items() } # All Pandas Columns should have contiguous indices so that we can perform # comparisons etc. data = data.reset_index(drop=True) block = self.__class__(data) columns = { name: col._clone(data=block[col._block_index]) for name, col in block_ref.columns.items() } # note that the new block may share memory with the old block return BlockRef(block=block, columns=columns) def _write_data(self, path: str): self.data.reset_index(drop=True).to_feather(os.path.join(path, "data.feather")) @staticmethod def _read_data( path: str, mmap: bool = False, read_inputs: Dict[str, AbstractColumn] = None ): return pd.read_feather(os.path.join(path, "data.feather"))