Source code for meerkat.block.manager

from __future__ import annotations

import os
import shutil
from collections import defaultdict
from collections.abc import MutableMapping
from typing import Dict, List, Mapping, Sequence, Tuple, Union

import numpy as np
import pandas as pd
import yaml

import meerkat.config
from meerkat.block.abstract import AbstractBlock, BlockIndex
from meerkat.columns.abstract import AbstractColumn
from meerkat.tools.utils import MeerkatLoader

from .lambda_block import LambdaBlock
from .ref import BlockRef


[docs]class BlockManager(MutableMapping): """Manages all blocks in a DataPanel.""" def __init__(self) -> None: self._columns: Dict[str, AbstractColumn] = {} # ordered as of 3.7 self._column_to_block_id: Dict[str, int] = {} self._block_refs: Dict[int, BlockRef] = {}
[docs] def update(self, block_ref: BlockRef): """data (): a single blockable object, potentially contains multiple columns.""" for name in block_ref: if name in self: self.remove(name) # although we can't have the same column living in multiple managers # we don't view here because it can lead to multiple calls to clone self._columns.update(block_ref) block_id = id(block_ref.block) # check if there already is a block_ref in the manager for this block if block_id in self._block_refs: self._block_refs[block_id].update(block_ref) else: self._block_refs[block_id] = block_ref self._column_to_block_id.update({name: block_id for name in block_ref.keys()})
[docs] def topological_block_refs(self): """Topological sort of the block refs based on Kahn's algorithm.""" children = defaultdict(list) parents = defaultdict(list) for block_id, block_ref in self._block_refs.items(): if isinstance(block_ref.block, LambdaBlock): for arg in block_ref.block.data.args + list( block_ref.block.data.kwargs.values() ): if arg.is_blockable(): children[id(arg._block)].append(block_id) # if the parent is in the block ref, add it to the graph if (id(arg._block)) in self._block_refs: parents[block_id].append(id(arg._block)) current = [] # get a set of all the nodes without an incoming edge for block_id, block_ref in self._block_refs.items(): if not parents[block_id] or not isinstance(block_ref.block, LambdaBlock): current.append((block_id, block_ref)) while current: block_id, block_ref = current.pop(0) yield block_id, block_ref for child_id in children[block_id]: parents[child_id].remove(block_id) if not parents[child_id]: current.append((child_id, self._block_refs[child_id]))
[docs] def apply(self, method_name: str = "_get", *args, **kwargs) -> BlockManager: """""" from .lambda_block import LambdaBlock results = None indexed_inputs = {} for _, block_ref in self.topological_block_refs(): if isinstance(block_ref.block, LambdaBlock): # defer computation of lambda columns, since they may be functions of # the other columns result = block_ref.apply( method_name=method_name, indexed_inputs=indexed_inputs, *args, **kwargs, ) # continue # TODO: fix this to work with chained lambda columns else: result = block_ref.apply(method_name=method_name, *args, **kwargs) if results is None: # apply returns one of BlockRef, List[BlockRef], dict results = BlockManager() if isinstance(result, (BlockRef, List)) else {} if isinstance(result, List): for ref in result: if isinstance(ref, BlockRef): results.update(ref) elif isinstance(ref, Tuple): name, column = ref results[name] = column else: raise ValueError("Unrecognized.") elif isinstance(result, (BlockRef, Dict)): # result is a new block_ref new_block_ref = result for name, col in block_ref.items(): indexed_inputs[id(col)] = new_block_ref[name] results.update(result) else: raise ValueError("Unexpected result of type {}".format(type(result))) # apply method to columns not stored in block for name, col in self._columns.items(): if results is not None and name in results: continue result = getattr(col, method_name)(*args, **kwargs) if results is None: results = BlockManager() if isinstance(result, AbstractColumn) else {} results[name] = result if isinstance(results, BlockManager): results.reorder(self.keys()) return results
[docs] def consolidate(self, consolidate_unitary_groups: bool = False): column_order = list( self._columns.keys() ) # need to maintain order after consolidate block_ref_groups = defaultdict(list) for _, block_ref in self.topological_block_refs(): block_ref_groups[block_ref.block.signature].append(block_ref) # TODO we need to go through these block_ref groups in topological order consolidated_inputs: Dict[int, AbstractColumn] = {} for block_refs in block_ref_groups.values(): if (not consolidate_unitary_groups) and len(block_refs) == 1: # if there is only one block ref in the group, do not consolidate continue # consolidate group block_class = block_refs[0].block.__class__ # consolidate needs to return a mapping from old column ids to new column # ids so that we can update dependent lambda columns. new_block_ref = block_class.consolidate( block_refs, consolidated_inputs=consolidated_inputs ) for block_ref in block_refs: for name, col in block_ref.items(): consolidated_inputs[id(col)] = new_block_ref[name] self.update(new_block_ref) self.reorder(column_order)
[docs] def remove(self, name): if name not in self._columns: raise ValueError(f"Remove failed: no column '{name}' in BlockManager.") self._columns.pop(name) if name in self._column_to_block_id: # column is blockable block_ref = self._block_refs[self._column_to_block_id[name]] del block_ref[name] if len(block_ref) == 0: self._block_refs.pop(self._column_to_block_id[name]) self._column_to_block_id.pop(name)
[docs] def reorder(self, order: Sequence[str]): if set(order) != set(self._columns): raise ValueError("Must include all columns when reordering a BlockManager.") self._columns = {name: self._columns[name] for name in order}
def __getitem__( self, index: Union[str, Sequence[str]] ) -> Union[AbstractColumn, BlockManager]: if isinstance(index, str): return self._columns[index] elif isinstance(index, Sequence): mgr = BlockManager() block_id_to_names = defaultdict(list) for name in index: if name not in self._column_to_block_id: if name in self: # non-blockable column mgr.add_column(col=self._columns[name], name=name) else: raise ValueError( f"`BlockManager` does not contain column '{name}'." ) else: # group blockable columns by block block_id_to_names[self._column_to_block_id[name]].append(name) # block refs for blockable columns for block_id, names in block_id_to_names.items(): block_ref = self._block_refs[block_id] mgr.update(block_ref[names]) mgr.reorder(order=index) return mgr else: raise ValueError( f"Unsupported index of type `{type(index)}` passed to `BlockManager`." ) def __setitem__(self, index: str, data: Union[str, Sequence[str]]): if isinstance(data, AbstractColumn): self.add_column(data, name=index) else: raise ValueError( f"Cannot set item with object of type `{type(data)}` on `BlockManager`." ) def __delitem__(self, key): self.remove(key) def __len__(self): return len(self._columns) @property def nrows(self): return 0 if len(self) == 0 else len(next(iter(self._columns.values()))) @property def ncols(self): return len(self) def __contains__(self, value): return value in self._columns def __iter__(self): return iter(self._columns)
[docs] def get_block_ref(self, name: str): return self._block_refs[self._column_to_block_id[name]]
[docs] def add_column(self, col: AbstractColumn, name: str): """Convert data to a meerkat column using the appropriate Column type.""" if len(self) > 0 and len(col) != self.nrows: raise ValueError( f"Cannot add column '{name}' with length {len(col)} to `BlockManager` " f" with length {self.nrows} columns." ) # col = col.view() if not col.is_blockable(): self._columns[name] = col else: self.update(BlockRef(columns={name: col}, block=col._block))
[docs] @classmethod def from_dict(cls, data: Mapping[str, object]): mgr = cls() for name, data in data.items(): col = AbstractColumn.from_data(data) mgr.add_column(col=col, name=name) return mgr
[docs] def write(self, path: str): meta = { "dtype": BlockManager, "columns": {}, "_column_order": list(self.keys()), } # prepare directories columns_dir = os.path.join(path, "columns") blocks_dir = os.path.join(path, "blocks") meta_path = os.path.join(path, "meta.yaml") if os.path.isdir(path): if ( os.path.exists(meta_path) and os.path.exists(columns_dir) and os.path.exists(blocks_dir) ): # if overwriting, ensure that old columns are removed shutil.rmtree(columns_dir) shutil.rmtree(blocks_dir) else: # if path already points to a dir that wasn't previously holding a # block manager, do not overwrite it. We'd like to protect against # situation in which user accidentally puts in an important directory raise IsADirectoryError( f"Cannot write `BlockManager`. {path} is a directory." ) os.makedirs(path, exist_ok=True) os.makedirs(blocks_dir) os.makedirs(columns_dir) # consolidate before writing # we also want to consolidate unitary groups (i.e. groups with only one block # ref) so that we don't write any data not actually in the dataframe # because of this we need to make sure lambda columns know about there # dependencies self.consolidate(consolidate_unitary_groups=True) # TODO: change this back # maintain a dictionary mapping column ids to paths where they are written # so that lambda blocks that depend on those columns can refer to them # appropriately written_inputs: Dict[int, str] = {} for block_id, block_ref in self.topological_block_refs(): block: AbstractBlock = block_ref.block block_dir = os.path.join(blocks_dir, str(block_id)) if isinstance(block, LambdaBlock): block.write(block_dir, written_inputs=written_inputs) else: block.write(block_dir) for name, column in block_ref.items(): column_dir = os.path.join(columns_dir, name) os.makedirs(column_dir, exist_ok=True) # don't write the data, reference the block meta["columns"][name] = { **column._get_meta(), "block": { "block_dir": os.path.relpath(block_dir, path), "block_index": _serialize_block_index(column._block_index), "mmap": block.is_mmap, }, } column._write_state(column_dir) # add the written column to the inputs written_inputs[id(column)] = os.path.relpath(column_dir, path) # write columns not in a block for name, column in self._columns.items(): if name in meta["columns"]: continue meta["columns"][name] = column._get_meta() column.write(os.path.join(columns_dir, name)) # TODO(sabri): move this above and add to written inputs # Save the metadata as a yaml file # sort_keys=Flase is required so that the columns are written in topological # order yaml.dump(meta, open(meta_path, "w"), sort_keys=False)
[docs] @classmethod def read( cls, path: str, columns: Sequence[str] = None, **kwargs, ) -> BlockManager: """Load a DataPanel stored on disk.""" # Load the metadata meta = dict( yaml.load(open(os.path.join(path, "meta.yaml")), Loader=MeerkatLoader) ) # maintain a dictionary mapping from paths to columns # so that lambda blocks that depend on those columns don't load them again read_inputs: Dict[int, AbstractColumn] = {} blocks = {} mgr = cls() for name, col_meta in meta["columns"].items(): column_dir = os.path.join(path, "columns", name) # load a subset of columns if columns is not None and name not in columns: continue if "block" in col_meta: # read block or fetch it from `blocks` if it's already been read block_meta = col_meta["block"] if block_meta["block_dir"] not in blocks: blocks[block_meta["block_dir"]] = AbstractBlock.read( os.path.join(path, block_meta["block_dir"]), mmap=block_meta.get("mmap", False), read_inputs=read_inputs, ) block = blocks[block_meta["block_dir"]] # read column, passing in a block_view col = col_meta["dtype"].read( column_dir, _data=block[_deserialize_block_index(block_meta["block_index"])], _meta=col_meta, **kwargs, ) mgr.add_column(col, name) read_inputs[os.path.relpath(column_dir, path)] = col else: mgr.add_column( col_meta["dtype"].read(path=column_dir, _meta=col_meta, **kwargs), name, ) mgr.reorder(meta["_column_order"]) return mgr
def _repr_pandas_(self, max_rows: int = None): if max_rows is None: max_rows = meerkat.config.display.max_rows cols = {} formatters = {} for name, column in self._columns.items(): cols[name], formatters[name] = column._repr_pandas_(max_rows=max_rows) if self.nrows > max_rows: pd_index = np.concatenate( ( np.arange(max_rows // 2), np.zeros(1), np.arange(self.nrows - max_rows // 2, self.nrows), ), ) else: pd_index = np.arange(self.nrows) df = pd.DataFrame(cols) df = df.set_index(pd_index.astype(int)) return df, formatters
[docs] def view(self): mgr = BlockManager() for name, col in self.items(): mgr.add_column(col.view(), name) return mgr
[docs] def copy(self): mgr = BlockManager() for name, col in self.items(): mgr.add_column(col.copy(), name) return mgr
def _serialize_block_index(index: BlockIndex) -> Union[Dict, str, int]: if index is not None and not isinstance(index, (int, str, slice)): raise ValueError("Can only serialize `BlockIndex` objects.") elif isinstance(index, slice): return {"start": index.start, "stop": index.stop, "step": index.step} return index def _deserialize_block_index(index: Union[Dict, int, str]) -> BlockIndex: if isinstance(index, Dict): return slice(index["start"], index["stop"], index["step"]) return index