Source code for meerkat.block.abstract

from __future__ import annotations

import os
from dataclasses import dataclass
from typing import TYPE_CHECKING, Hashable, Mapping, Sequence, Tuple, Union

import yaml

from meerkat.errors import ConsolidationError

# an index into a block that specifies where a column's data lives in the block
BlockIndex = Union[int, slice, str]


if TYPE_CHECKING:
    from meerkat.block.ref import BlockRef


@dataclass
class BlockView:
    """Pairing of a block with an index that selects data inside it.

    The view stores no data of its own; ``data`` is fetched from the block on
    every access.
    """

    # where in ``block`` the selected data lives
    block_index: BlockIndex
    # the block that owns the data
    block: AbstractBlock

    @property
    def data(self):
        """Return what the owning block's ``_get_data`` yields for this index."""
        owner, where = self.block, self.block_index
        return owner._get_data(where)
class AbstractBlock:
    """Base class defining the interface shared by all blocks.

    Most methods here are hooks that raise ``NotImplementedError`` and are
    meant to be overridden. ``write`` and ``read`` additionally rely on
    ``_write_data`` / ``_read_data``, which this class does not define.
    """

    def __init__(self, *args, **kwargs):
        # Forward all arguments to the next class in the MRO.
        super().__init__(*args, **kwargs)

    def __getitem__(self, index: BlockIndex) -> BlockView:
        """Return a ``BlockView`` of this block at ``index``."""
        return BlockView(block_index=index, block=self)

    def _get_data(self, index: BlockIndex) -> object:
        """Must return view of the underlying data."""
        raise NotImplementedError()

    @property
    def signature(self) -> Hashable:
        """Hashable key; ``consolidate`` requires it to match across blocks."""
        raise NotImplementedError

    @classmethod
    def from_column_data(cls, data: object) -> Tuple[AbstractBlock, BlockView]:
        """Hook for building a block from column data; not implemented here."""
        raise NotImplementedError()

    @classmethod
    def from_block_data(cls, data: object) -> Tuple[AbstractBlock, BlockView]:
        """Hook for building a block from block data; not implemented here."""
        raise NotImplementedError()

    @classmethod
    def consolidate(
        cls, block_refs: Sequence[BlockRef]
    ) -> Tuple[AbstractBlock, Mapping[str, BlockIndex]]:
        """Validate ``block_refs`` and delegate to ``cls._consolidate``.

        Raises:
            ConsolidationError: if ``block_refs`` is empty, or if the blocks
                referenced do not all share a single ``signature``.
        """
        # NOTE(review): the return annotation here differs from the one on
        # `_consolidate` (`BlockRef`), whose result is returned unchanged --
        # confirm which is intended.
        if len(block_refs) == 0:
            raise ConsolidationError("Must pass at least 1 BlockRef to consolidate.")

        if len({ref.block.signature for ref in block_refs}) != 1:
            raise ConsolidationError(
                "Can only consolidate blocks with matching signatures."
            )

        return cls._consolidate(block_refs=block_refs)

    @classmethod
    def _consolidate(cls, block_refs: Sequence[BlockRef]) -> BlockRef:
        """Hook that performs the consolidation; not implemented here."""
        raise NotImplementedError

    def _get(self, index, block_ref: BlockRef) -> Union[BlockRef, dict]:
        """Hook for indexing the block through a ``BlockRef``; not implemented here."""
        raise NotImplementedError

    @property
    def is_mmap(self):
        """Always ``False`` on the base class."""
        return False

    def write(self, path: str, *args, **kwargs):
        """Write the block to the directory ``path``.

        Creates ``path`` if needed, calls ``self._write_data(path, *args,
        **kwargs)``, then records the concrete class in ``<path>/meta.yaml``
        under the key ``"klass"``.
        """
        os.makedirs(path, exist_ok=True)
        self._write_data(path, *args, **kwargs)
        metadata = {"klass": type(self)}
        metadata_path = os.path.join(path, "meta.yaml")
        # Context manager guarantees the metadata is flushed and the handle
        # closed, even if dumping fails.
        with open(metadata_path, "w") as metadata_file:
            yaml.dump(metadata, metadata_file)

    @classmethod
    def read(cls, path: str, *args, **kwargs):
        """Load a block from the directory ``path``.

        Reads ``<path>/meta.yaml``, takes the class stored under ``"klass"``,
        calls its ``_read_data(path, *args, **kwargs)`` and returns an instance
        of that class built from the result.

        Raises:
            AssertionError: if ``path`` does not exist.
        """
        # Kept as an assertion so the exception type seen by callers is unchanged.
        assert os.path.exists(path), f"`path` {path} does not exist."
        metadata_path = os.path.join(path, "meta.yaml")
        # NOTE(review): yaml.FullLoader builds Python objects from tags found in
        # the file (the "klass" entry is a class reference), so only read
        # directories that come from a trusted source.
        with open(metadata_path) as metadata_file:
            metadata = dict(yaml.load(metadata_file, Loader=yaml.FullLoader))
        block_class = metadata["klass"]
        data = block_class._read_data(path, *args, **kwargs)
        return block_class(data)