Source code for meerkat.block.manager

from __future__ import annotations

import os
import shutil
from collections import defaultdict
from collections.abc import MutableMapping
from typing import Dict, Mapping, Sequence, Union

import numpy as np
import pandas as pd
import yaml

import meerkat.config
from meerkat.block.abstract import AbstractBlock, BlockIndex
from meerkat.columns.abstract import AbstractColumn
from meerkat.tools.utils import MeerkatLoader

from .ref import BlockRef


[docs]class BlockManager(MutableMapping): """Manages all blocks in a DataPanel.""" def __init__(self) -> None: self._columns: Dict[str, AbstractColumn] = {} # ordered as of 3.7 self._column_to_block_id: Dict[str, int] = {} self._block_refs: Dict[int, BlockRef] = {}
[docs] def update(self, block_ref: BlockRef): """data (): a single blockable object, potentially contains multiple columns.""" for name in block_ref: if name in self: self.remove(name) # although we can't have the same column living in multiple managers # we don't view here because it can lead to multiple calls to clone self._columns.update(block_ref) block_id = id(block_ref.block) # check if there already is a block_ref in the manager for this block if block_id in self._block_refs: self._block_refs[block_id].update(block_ref) else: self._block_refs[block_id] = block_ref self._column_to_block_id.update({name: block_id for name in block_ref.keys()})
[docs] def apply(self, method_name: str = "_get", *args, **kwargs) -> BlockManager: """[summary] Args: fn (str): a function that is applied to a block and column_spec and returns a new block and column_spec. Returns: [type]: [description] """ results = None for block_ref in self._block_refs.values(): result = block_ref.apply(method_name=method_name, *args, **kwargs) if results is None: results = BlockManager() if isinstance(result, BlockRef) else {} results.update(result) # apply method to columns not stored in block for name, col in self._columns.items(): if results is not None and name in results: continue result = getattr(col, method_name)(*args, **kwargs) if results is None: results = BlockManager() if isinstance(result, AbstractColumn) else {} results[name] = result if isinstance(results, BlockManager): results.reorder(self.keys()) return results
[docs] def consolidate(self): column_order = list( self._columns.keys() ) # need to maintain order after consolidate block_ref_groups = defaultdict(list) for block_ref in self._block_refs.values(): block_ref_groups[block_ref.block.signature].append(block_ref) for block_refs in block_ref_groups.values(): if len(block_refs) == 1: # if there is only one block ref in the group, do not consolidate continue # consolidate group block_class = block_refs[0].block.__class__ block_ref = block_class.consolidate(block_refs) self.update(block_ref) self.reorder(column_order)
[docs] def remove(self, name): if name not in self._columns: raise ValueError(f"Remove failed: no column '{name}' in BlockManager.") self._columns.pop(name) if name in self._column_to_block_id: # column is blockable block_ref = self._block_refs[self._column_to_block_id[name]] del block_ref[name] if len(block_ref) == 0: self._block_refs.pop(self._column_to_block_id[name]) self._column_to_block_id.pop(name)
[docs] def reorder(self, order: Sequence[str]): if set(order) != set(self._columns): raise ValueError("Must include all columns when reordering a BlockManager.") self._columns = {name: self._columns[name] for name in order}
def __getitem__( self, index: Union[str, Sequence[str]] ) -> Union[AbstractColumn, BlockManager]: if isinstance(index, str): return self._columns[index] elif isinstance(index, Sequence): mgr = BlockManager() block_id_to_names = defaultdict(list) for name in index: if name not in self._column_to_block_id: if name in self: # non-blockable column mgr.add_column(col=self._columns[name], name=name) else: raise ValueError( f"`BlockManager` does not contain column '{name}'." ) else: # group blockable columns by block block_id_to_names[self._column_to_block_id[name]].append(name) # block refs for blockable columns for block_id, names in block_id_to_names.items(): block_ref = self._block_refs[block_id] mgr.update(block_ref[names]) mgr.reorder(order=index) return mgr else: raise ValueError( f"Unsupported index of type `{type(index)}` passed to `BlockManager`." ) def __setitem__(self, index: str, data: Union[str, Sequence[str]]): if isinstance(data, AbstractColumn): self.add_column(data, name=index) else: raise ValueError( f"Cannot set item with object of type `{type(data)}` on `BlockManager`." ) def __delitem__(self, key): self.remove(key) def __len__(self): return len(self._columns) @property def nrows(self): return 0 if len(self) == 0 else len(next(iter(self._columns.values()))) @property def ncols(self): return len(self) def __contains__(self, value): return value in self._columns def __iter__(self): return iter(self._columns)
[docs] def get_block_ref(self, name: str): return self._block_refs[self._column_to_block_id[name]]
[docs] def add_column(self, col: AbstractColumn, name: str): """Convert data to a meerkat column using the appropriate Column type.""" if len(self) > 0 and len(col) != self.nrows: raise ValueError( f"Cannot add column '{name}' with length {len(col)} to `BlockManager` " f" with length {self.nrows} columns." ) if not col.is_blockable(): col = col.view() self._columns[name] = col else: col = col.view() self.update(BlockRef(columns={name: col}, block=col._block))
[docs] @classmethod def from_dict(cls, data: Mapping[str, object]): mgr = cls() for name, data in data.items(): col = AbstractColumn.from_data(data) mgr.add_column(col=col, name=name) return mgr
[docs] def write(self, path: str): meta = { "dtype": BlockManager, "columns": {}, "_column_order": list(self.keys()), } # prepare directories columns_dir = os.path.join(path, "columns") blocks_dir = os.path.join(path, "blocks") meta_path = os.path.join(path, "meta.yaml") if os.path.isdir(path): if ( os.path.exists(meta_path) and os.path.exists(columns_dir) and os.path.exists(blocks_dir) ): # if overwriting, ensure that old columns are removed shutil.rmtree(columns_dir) shutil.rmtree(blocks_dir) else: # if path already points to a dir that wasn't previously holding a # block manager, do not overwrite it. We'd like to protect against # situation in which user accidentally puts in an important directory raise IsADirectoryError( f"Cannot write `BlockManager`. {path} is a directory." ) os.makedirs(path, exist_ok=True) os.makedirs(blocks_dir) os.makedirs(columns_dir) # consolidate before writing self.consolidate() for block_id, block_ref in self._block_refs.items(): block: AbstractBlock = block_ref.block block_dir = os.path.join(blocks_dir, str(block_id)) block.write(block_dir) for name, column in block_ref.items(): column_dir = os.path.join(columns_dir, name) os.makedirs(column_dir, exist_ok=True) # don't write the data, reference the block meta["columns"][name] = { **column._get_meta(), "block": { "block_dir": os.path.relpath(block_dir, path), "block_index": _serialize_block_index(column._block_index), "mmap": block.is_mmap, }, } column._write_state(column_dir) # write columns not in a block for name, column in self._columns.items(): if name in meta["columns"]: continue meta["columns"][name] = column._get_meta() column.write(os.path.join(columns_dir, name)) # Save the metadata as a yaml file yaml.dump(meta, open(meta_path, "w"))
[docs] @classmethod def read( cls, path: str, columns: Sequence[str] = None, *args, **kwargs, ) -> BlockManager: """Load a DataPanel stored on disk.""" # Load the metadata meta = dict( yaml.load(open(os.path.join(path, "meta.yaml")), Loader=MeerkatLoader) ) blocks = {} mgr = cls() for name, col_meta in meta["columns"].items(): column_dir = os.path.join(path, "columns", name) # load a subset of columns if columns is not None and name not in columns: continue if "block" in col_meta: # read block or fetch it from `blocks` if it's already been read block_meta = col_meta["block"] if block_meta["block_dir"] not in blocks: blocks[block_meta["block_dir"]] = AbstractBlock.read( os.path.join(path, block_meta["block_dir"]), mmap=block_meta.get("mmap", False), ) block = blocks[block_meta["block_dir"]] # read column, passing in a block_view col = col_meta["dtype"].read( column_dir, _data=block[_deserialize_block_index(block_meta["block_index"])], _meta=col_meta, **kwargs, ) mgr.add_column(col, name) else: mgr.add_column( col_meta["dtype"].read(path=column_dir, _meta=col_meta, **kwargs), name, ) mgr.reorder(meta["_column_order"]) return mgr
def _repr_pandas_(self, max_rows: int = None): if max_rows is None: max_rows = meerkat.config.DisplayOptions.max_rows cols = {} formatters = {} for name, column in self._columns.items(): cols[name], formatters[name] = column._repr_pandas_(max_rows=max_rows) if self.nrows > max_rows: pd_index = np.concatenate( ( np.arange(max_rows // 2), np.zeros(1), np.arange(self.nrows - max_rows // 2, self.nrows), ), ) else: pd_index = np.arange(self.nrows) df = pd.DataFrame(cols) df = df.set_index(pd_index.astype(int)) return df, formatters
[docs] def view(self): mgr = BlockManager() for name, col in self.items(): mgr.add_column(col.view(), name) return mgr
[docs] def copy(self): mgr = BlockManager() for name, col in self.items(): mgr.add_column(col.copy(), name) return mgr
def _serialize_block_index(index: BlockIndex) -> Union[Dict, str, int]: if not isinstance(index, (int, str, slice)): raise ValueError("Can only serialize `BlockIndex` objects.") elif isinstance(index, slice): return {"start": index.start, "stop": index.stop, "step": index.step} return index def _deserialize_block_index(index: Union[Dict, int, str]) -> BlockIndex: if isinstance(index, Dict): return slice(index["start"], index["stop"], index["step"]) return index