Source code for meerkat.writers.numpy_writer

import os
from pathlib import Path

from numpy.lib.format import open_memmap

from meerkat.columns.abstract import AbstractColumn
from meerkat.columns.numpy_column import NumpyArrayColumn
from meerkat.writers.abstract import AbstractWriter


class NumpyMemmapWriter(AbstractWriter):
    """Writer that appends batches of rows to a ``.npy`` file via a memmap.

    The backing file is opened with ``numpy.lib.format.open_memmap`` and
    filled sequentially along the first axis by successive calls to
    :meth:`write`. :meth:`finalize` wraps the memmap in a column.
    """

    def __init__(
        self,
        path: str = None,
        dtype: str = "float32",
        mode: str = "r",
        shape: tuple = None,
        output_type: type = NumpyArrayColumn,
        template: AbstractColumn = None,
        *args,
        **kwargs,
    ):
        """Create the writer and, if ``path`` is given, open the file.

        Args:
            path: Location of the ``.npy`` file. When ``None`` nothing is
                opened and :meth:`open` must be called before writing.
            dtype: dtype forwarded to ``open_memmap``.
            mode: File mode forwarded to ``open_memmap``. Note that the
                default here is ``"r"`` whereas :meth:`open` defaults to
                ``"w+"``; pass a writable mode explicitly when the writer
                is expected to create and fill the file.
            shape: Full shape of the array to allocate. Required whenever
                ``path`` is given.
            output_type: Callable applied to the memmap in
                :meth:`finalize` when no ``template`` is set.
            template: Optional column whose ``_clone(data=...)`` is used
                in :meth:`finalize` instead of ``output_type``.
            *args: Forwarded to ``AbstractWriter.__init__``.
            **kwargs: Forwarded to ``AbstractWriter.__init__``.
        """
        super().__init__(*args, **kwargs)

        # Memmap backing the output; stays ``None`` until `open` is called.
        self.file = None

        # Index along the first axis where the next `write` will land.
        self._pointer = 0

        self.path = path
        self.dtype = dtype
        self.shape = shape
        if path is not None:
            self.open(path=path, dtype=dtype, mode=mode, shape=shape)

        self.output_type = output_type
        self.template = template

    def open(
        self,
        path: str,
        dtype: str = "float32",
        mode: str = "w+",
        shape: tuple = None,
    ) -> None:
        """Open ``path`` as a memmap and reset the write pointer.

        Parent directories of ``path`` are created if they do not exist.

        Args:
            path: Location of the ``.npy`` file.
            dtype: dtype forwarded to ``open_memmap``.
            mode: File mode forwarded to ``open_memmap``.
            shape: Full shape of the array; must not be ``None``.

        Raises:
            AssertionError: If ``shape`` is ``None``.
        """
        assert shape is not None, "Must specify `shape`."

        # Make all dirs to path
        parent_dir = Path(path).absolute().parent
        os.makedirs(str(parent_dir), exist_ok=True)

        # Open the file as a memmap
        self.file = open_memmap(path, dtype=dtype, mode=mode, shape=shape)
        self._pointer = 0
        self.path = path
        self.shape = shape

    def write(self, arr, **kwargs) -> None:
        """Copy ``arr`` into the memmap at the current pointer and advance it.

        Args:
            arr: Sized batch of rows; ``len(arr)`` rows are consumed along
                the first axis of the memmap.
            **kwargs: Accepted and ignored.

        Raises:
            ValueError: If the batch does not fit in the remaining space.
        """
        n_rows = len(arr)
        start = self._pointer
        stop = start + n_rows
        capacity = len(self.file)

        # NumPy truncates an out-of-range slice, and a length-1 batch then
        # broadcasts into the resulting empty slice without error, silently
        # dropping the data. Check the bounds explicitly instead.
        if stop > capacity:
            raise ValueError(
                f"Cannot write {n_rows} row(s) at offset {start}: "
                f"the memmap only holds {capacity} row(s)."
            )

        self.file[start:stop] = arr
        self._pointer = stop

    def flush(self):
        """Close the mmap file and reopen to release memory."""
        self.file.flush()
        # `base` of the memmap array is the underlying mmap object.
        self.file.base.close()
        # 'r+' Open existing file for reading and writing.
        self.file = open_memmap(self.file.filename, mode="r+")

    def finalize(self, *args, **kwargs) -> AbstractColumn:
        """Flush the memmap and wrap it in a column.

        Args:
            *args: Accepted and ignored.
            **kwargs: Accepted and ignored.

        Returns:
            ``template._clone(data=...)`` when a template was provided,
            otherwise ``output_type(memmap)``.
        """
        self.flush()
        data = self.file

        if self.template is None:
            return self.output_type(data)

        # Unwrap if the data is itself a column before cloning the template.
        if isinstance(data, AbstractColumn):
            data = data.data
        return self.template._clone(data=data)

    def close(self, *args, **kwargs) -> None:
        """Do nothing; the memmap is left as-is."""
        pass