Source code for probables.blooms.expandingbloom

""" Expanding and Rotating BloomFilter, python implementations
    License: MIT
    Author: Tyler Barrus (barrust@gmail.com)
    URL: https://github.com/barrust/pyprobables
"""

from array import array
from io import BytesIO, IOBase
from mmap import mmap
from pathlib import Path
from struct import Struct
from typing import ByteString, Tuple, Union

from probables.blooms.bloom import BloomFilter
from probables.exceptions import RotatingBloomFilterError
from probables.hashes import HashFuncT, HashResultsT, KeyT, default_fnv_1a
from probables.utilities import MMap, is_valid_file, resolve_path


[docs] class ExpandingBloomFilter: """Simple expanding Bloom Filter implementation for use in python; the Bloom Fiter will automatically expand, or grow, if the false positive rate is about to become greater than the desired false positive rate. Args: est_elements (int): The number of estimated elements to be added false_positive_rate (float): The desired false positive rate filepath (str): Path to file to load hash_function (function): Hashing strategy function to use `hf(key, number)` Returns: ExpandingBloomFilter: An expanding Bloom Filter object Note: Initialization order of operations: 1) Filepath 2) est_elements and false_positive_rate""" __slots__ = ( "_blooms", "__fpr", "__est_elements", "__hash_func", "_added_elements", ) def __init__( self, est_elements: Union[int, None] = None, false_positive_rate: Union[float, None] = None, filepath: Union[str, Path, None] = None, hash_function: Union[HashFuncT, None] = None, ): """initialize""" self._blooms = [] # type: ignore self.__fpr = false_positive_rate if false_positive_rate is not None else 0.0 self.__est_elements = est_elements if est_elements is not None else 100 self.__hash_func: HashFuncT self._added_elements = 0 # total added... if hash_function is not None: self.__hash_func = hash_function else: self.__hash_func = default_fnv_1a if filepath is not None and is_valid_file(filepath): self.__load(filepath) else: # add in the initial bloom filter! self.__add_bloom_filter() __FOOTER_STRUCT = Struct("QQQf") __S_INT64_STRUCT = Struct("Q") _BLOOM_ELEMENT_SIZE = Struct("B").size
[docs] @classmethod def frombytes(cls, b: ByteString, hash_function: Union[HashFuncT, None] = None) -> "ExpandingBloomFilter": """ Args: b (ByteString): The bytes to load as a Expanding Bloom Filter hash_function (function): Hashing strategy function to use `hf(key, number)` Returns: ExpandingBloomFilter: A Bloom Filter object """ size, est_els, added_els, fpr = cls._parse_footer(b) blm = ExpandingBloomFilter(est_elements=est_els, false_positive_rate=fpr, hash_function=hash_function) blm._parse_blooms(b, size) blm._added_elements = added_els return blm
def __contains__(self, key: KeyT) -> bool: """setup the `in` functionality""" return self.check(key) def __bytes__(self) -> bytes: """Export bloom filter to `bytes`""" with BytesIO() as f: self.export(f) return f.getvalue() @property def expansions(self) -> int: """int: The number of expansions""" return len(self._blooms) - 1 @property def false_positive_rate(self) -> float: """float: The desired false positive rate of the expanding Bloom Filter""" return self.__fpr @property def estimated_elements(self) -> int: """int: The original number of elements estimated to be in the Bloom Filter""" return self.__est_elements @property def elements_added(self) -> int: """int: The total number of elements added""" return self._added_elements @property def hash_function(self) -> HashFuncT: """int: The total number of elements added""" return self.__hash_func
[docs] def push(self) -> None: """Push a new expansion onto the Bloom Filter""" self.__add_bloom_filter()
[docs] def check(self, key: KeyT) -> bool: """Check to see if the key is in the Bloom Filter Args: key (str): The key to check for in the Bloom Filter Returns: bool: `True` if the element is likely present; `False` if definately not present""" hashes = self._blooms[0].hashes(key) return self.check_alt(hashes)
[docs] def check_alt(self, hashes: HashResultsT) -> bool: """Check to see if the hashes are in the Bloom Filter Args: hashes (list): The hash representation to check for in the Bloom Filter Returns: bool: `True` if the element is likely present; `False` if definately not present""" for blm in self._blooms: if blm.check_alt(hashes): return True return False
[docs] def add(self, key: KeyT, force: bool = False) -> None: """Add the key to the Bloom Filter Args: key (str): The element to be inserted force (bool): `True` will force it to be inserted, even if it likely has been inserted \ before `False` will only insert if not found in the Bloom Filter""" hashes = self._blooms[0].hashes(key) self.add_alt(hashes, force)
[docs] def add_alt(self, hashes: HashResultsT, force: bool = False) -> None: """Add the element represented by hashes into the Bloom Filter Args: hashes (list): A list of integers representing the key to insert force (bool): `True` will force it to be inserted, even if it likely has been inserted \ before `False` will only insert if not found in the Bloom Filter""" self._added_elements += 1 if force or not self.check_alt(hashes): self.__check_for_growth() self._blooms[-1].add_alt(hashes)
def __add_bloom_filter(self): """build a new bloom and add it on!""" blm = BloomFilter( est_elements=self.__est_elements, false_positive_rate=self.__fpr, hash_function=self.__hash_func, ) self._blooms.append(blm) def __check_for_growth(self): """detereming if the bloom filter should automatically grow""" if self._blooms[-1].elements_added >= self.__est_elements: self.__add_bloom_filter()
[docs] def export(self, file: Union[Path, str, IOBase, mmap]) -> None: """Export an expanding Bloom Filter, or subclass, to disk Args: filepath (str): The path to the file to import""" if not isinstance(file, (IOBase, mmap)): file = resolve_path(file) with open(file, "wb") as filepointer: self.export(filepointer) # type:ignore else: filepointer = file # type:ignore # add all the different Bloom bit arrays... for blm in self._blooms: filepointer.write(self.__S_INT64_STRUCT.pack(blm.elements_added)) blm.bloom.tofile(filepointer) filepointer.write( self.__FOOTER_STRUCT.pack( len(self._blooms), self.estimated_elements, self.elements_added, self.false_positive_rate, ) )
def __load(self, file: Union[Path, str, IOBase, mmap]): """load a file""" if not isinstance(file, (IOBase, mmap)): file = resolve_path(file) with MMap(file) as filepointer: self.__load(filepointer) else: size, est_els, els_added, fpr = self._parse_footer(file) # type: ignore self._blooms = [] self._added_elements = els_added self.__fpr = fpr self.__est_elements = est_els self._parse_blooms(file, size) # type:ignore @classmethod def _parse_footer(cls, b: ByteString) -> Tuple[int, int, int, float]: offset = cls.__FOOTER_STRUCT.size size, est_els, els_added, fpr = cls.__FOOTER_STRUCT.unpack(bytes(b[-1 * offset :])) return int(size), int(est_els), int(els_added), float(fpr) def _parse_blooms(self, b: ByteString, size: int) -> None: # reset the bloom list self._blooms = [] blm_size = 0 start = 0 end = 0 for _ in range(size): blm = BloomFilter( est_elements=self.__est_elements, false_positive_rate=self.__fpr, hash_function=self.__hash_func, ) if blm_size == 0: blm_size = self._BLOOM_ELEMENT_SIZE * blm.bloom_length end = start + self.__S_INT64_STRUCT.size + blm_size blm._els_added = int(self.__S_INT64_STRUCT.unpack(bytes(b[start : start + self.__S_INT64_STRUCT.size]))[0]) blm._bloom = array("B", bytes(b[start + self.__S_INT64_STRUCT.size : end])) self._blooms.append(blm) start = end
[docs] class RotatingBloomFilter(ExpandingBloomFilter): """Simple Rotating Bloom Filter implementation that allows for the "older" elements added to be removed, in chunks. As the queue fills up, those elements inserted earlier will be bulk removed. This also provides the user with the oportunity to force the removal instead of it being time based. Args: est_elements (int): The number of estimated elements to be added false_positive_rate (float): The desired false positive rate max_queue_size (int): This is the number is used to determine the maximum number of Bloom Filters. \ Total elements added is based on `max_queue_size * est_elements` filepath (str): Path to file to load hash_function (function): Hashing strategy function to use `hf(key, number)` Note: Initialization order of operations: 1) Filepath 2) est_elements and false_positive_rate """ __slots__ = ("_queue_size",) def __init__( self, est_elements: Union[int, None] = None, false_positive_rate: Union[float, None] = None, max_queue_size: int = 10, filepath: Union[str, Path, None] = None, hash_function: Union[HashFuncT, None] = None, ) -> None: """initialize""" super().__init__( est_elements=est_elements, false_positive_rate=false_positive_rate, filepath=filepath, hash_function=hash_function, ) self._queue_size = max_queue_size
[docs] @classmethod def frombytes( # type:ignore cls, b: ByteString, max_queue_size: int, hash_function: Union[HashFuncT, None] = None ) -> "RotatingBloomFilter": """ Args: b (ByteString): The bytes to load as a Expanding Bloom Filter max_queue_size (int): This is the number is used to determine the maximum number \ of Bloom Filters. Total elements added is based on `max_queue_size * est_elements` hash_function (function): Hashing strategy function to use `hf(key, number)` Returns: RotatingBloomFilter: A Bloom Filter object """ size, est_els, added_els, fpr = cls._parse_footer(b) blm = RotatingBloomFilter( est_elements=est_els, false_positive_rate=fpr, max_queue_size=max_queue_size, hash_function=hash_function ) blm._parse_blooms(b, size) blm._added_elements = added_els return blm
@property def max_queue_size(self) -> int: """int: The maximum size for the queue""" return self._queue_size @property def current_queue_size(self) -> int: """int: The current size of the queue""" return len(self._blooms)
[docs] def add_alt(self, hashes: HashResultsT, force: bool = False) -> None: """Add the element represented by hashes into the Bloom Filter Args: hashes (list): A list of integers representing the key to insert force (bool): `True` will force it to be inserted, even if it likely has been inserted \ before `False` will only insert if not found in the Bloom Filter""" self._added_elements += 1 if force or not self.check_alt(hashes): self.__rotate_bloom_filter() self._blooms[-1].add_alt(hashes)
[docs] def pop(self) -> None: """Pop the oldest Bloom Filter off of the queue without pushing a new Bloom Filter onto the queue Raises: RotatingBloomFilterError: Unable to rotate the Bloom Filter""" if self.current_queue_size == 1: msg = "Popping a Bloom Filter will result in an unusable system!" raise RotatingBloomFilterError(msg) self._blooms.pop(0)
[docs] def push(self) -> None: """Push a new bloom filter onto the queue and rotate if necessary""" self.__rotate_bloom_filter(force=True)
def __rotate_bloom_filter(self, force: bool = False): """handle determining if/when the Bloom Filter queue needs to be rotated""" blm = self._blooms[-1] ready_to_rotate = blm.elements_added == blm.estimated_elements no_need_to_pop = self.current_queue_size < self._queue_size if force and no_need_to_pop: self.__add_bloom_filter() elif force: # must need to be pop'd first! blm = self._blooms.pop(0) self.__add_bloom_filter() elif ready_to_rotate and no_need_to_pop: self.__add_bloom_filter() elif ready_to_rotate: blm = self._blooms.pop(0) self.__add_bloom_filter() def __add_bloom_filter(self): """build a new bloom and add it on!""" blm = BloomFilter( est_elements=self.estimated_elements, false_positive_rate=self.false_positive_rate, hash_function=self.hash_function, ) self._blooms.append(blm)