""" CountingBloomFilter, python implementation
License: MIT
Author: Tyler Barrus (barrust@gmail.com)
URL: https://github.com/barrust/counting_bloom
"""
from array import array
from pathlib import Path
from struct import Struct
from typing import ByteString, Union
from probables.blooms.bloom import BloomFilter
from probables.constants import UINT32_T_MAX, UINT64_T_MAX
from probables.exceptions import InitializationError
from probables.hashes import HashFuncT, HashResultsT, KeyT
from probables.utilities import is_hex_string, is_valid_file, resolve_path
MISMATCH_MSG = "The parameter second must be of type CountingBloomFilter"
def _verify_not_type_mismatch(second: "CountingBloomFilter") -> bool:
"""verify that there is not a type mismatch"""
return isinstance(second, (CountingBloomFilter))
[docs]
class CountingBloomFilter(BloomFilter):
"""Simple Counting Bloom Filter implementation for use in python;
It can read and write the same format as the c version
(https://github.com/barrust/counting_bloom)
Args:
est_elements (int): The number of estimated elements to be added
false_positive_rate (float): The desired false positive rate
filepath (str): Path to file to load
hex_string (str): Hex based representation to be loaded
hash_function (function): Hashing strategy function to use `hf(key, number)`
Returns:
CountingBloomFilter: A Counting Bloom Filter object
Note:
Initialization order of operations:
1) From file
2) From Hex String
3) From params"""
__slots__ = ("_filepath",)
def __init__(
self,
est_elements: Union[int, None] = None,
false_positive_rate: Union[float, None] = None,
filepath: Union[str, Path, None] = None,
hex_string: Union[str, None] = None,
hash_function: Union[HashFuncT, None] = None,
) -> None:
"""setup the basic values needed"""
self._filepath = None
super().__init__(est_elements, false_positive_rate, filepath, hex_string, hash_function)
def _load_init(self, filepath, hash_function, hex_string, est_elements, false_positive_rate):
"""Handle setting params and loading everything as needed"""
self._bits_per_elm = 1.0
self._type = "counting"
self._typecode = "I"
if is_valid_file(filepath):
self._filepath = resolve_path(filepath)
self._load(self._filepath, hash_function)
elif is_hex_string(hex_string):
self._load_hex(hex_string, hash_function)
else:
if est_elements is None or false_positive_rate is None:
raise InitializationError("Insufecient parameters to set up the Counting Bloom Filter")
# calc values
fpr, n_hashes, n_bits = self._get_optimized_params(est_elements, false_positive_rate)
self._set_values(est_elements, fpr, n_hashes, n_bits, hash_function)
self._bloom_length = n_bits
self._bloom = array(self._typecode, [0]) * self._bloom_length
_IMPT_STRUCT = Struct("I")
[docs]
@classmethod
def frombytes(cls, b: ByteString, hash_function: Union[HashFuncT, None] = None) -> "CountingBloomFilter":
"""
Args:
b (ByteString): the bytes to load as a Counting Bloom Filter
hash_function (function): Hashing strategy function to use `hf(key, number)`
Returns:
CountingBloomFilter: A Counting Bloom Filter object
"""
offset = cls._FOOTER_STRUCT.size
est_els, els_added, fpr, n_hashes, n_bits = cls._parse_footer(cls._FOOTER_STRUCT, bytes(b[-1 * offset :]))
blm = CountingBloomFilter(est_elements=est_els, false_positive_rate=fpr, hash_function=hash_function)
blm._set_values(est_els, fpr, n_hashes, n_bits, hash_function)
blm._els_added = els_added
blm._parse_bloom_array(b, cls._IMPT_STRUCT.size * blm.bloom_length)
return blm
def __str__(self) -> str:
"""string representation of the counting bloom filter"""
on_disk = "no" if self.is_on_disk is False else "yes"
cnt = sum(x for x in self._bloom if x > 0)
total = sum(self._bloom)
largest = max(self._bloom)
largest_idx = (self._bloom).index(largest)
fullness = cnt / self.number_bits
els_added = total // self.number_hashes
return (
"CountingBloom:\n"
f"\tbits: {self.number_bits}\n"
f"\testimated elements: {self.estimated_elements}\n"
f"\tnumber hashes: {self.number_hashes}\n"
f"\tmax false positive rate: {self.false_positive_rate:.6f}\n"
f"\telements added: {self.elements_added}\n"
f"\tcurrent false positive rate: {self.current_false_positive_rate():.6f}\n"
f"\tis on disk: {on_disk}\n"
f"\tindex fullness: {fullness:.6}\n"
f"\tmax index usage: {largest}\n"
f"\tmax index id: {largest_idx}\n"
f"\tcalculated elements: {els_added}\n"
)
[docs]
def add(self, key: KeyT, num_els: int = 1) -> int: # type: ignore
"""Add the key to the Counting Bloom Filter
Args:
key (str): The element to be inserted
num_els (int): Number of times to insert the element
Returns:
int: Maximum number of insertions"""
return self.add_alt(self.hashes(key), num_els)
[docs]
def add_alt(self, hashes: HashResultsT, num_els: int = 1) -> int: # type: ignore
"""Add the element represented by hashes into the Counting Bloom Filter
Args:
hashes (list): A list of integers representing the key to insert
num_els (int): Number of times to insert the element
Returns:
int: Maximum number of insertions"""
# NOTE: this will increment indices each time it is viewed. Not sure if that is "correct"
# if not then we will need to update this and the C version
indices = [hashes[i] % self._bloom_length for i in range(self._number_hashes)]
vals = [self._bloom[k] + num_els for k in indices]
for i, v in enumerate(vals):
k = indices[i]
if v > UINT32_T_MAX:
self._bloom[k] = UINT32_T_MAX
vals[i] = UINT32_T_MAX
else:
self._bloom[k] += num_els # This keeps the original methodology
self.elements_added = min(self.elements_added + num_els, UINT64_T_MAX)
return min(vals)
[docs]
def check(self, key: KeyT) -> int: # type: ignore
"""Check if the key is likely in the Counting Bloom Filter
Args:
key (str): The element to be checked
Returns:
int: Maximum number of insertions"""
return self.check_alt(self.hashes(key))
[docs]
def check_alt(self, hashes: HashResultsT) -> int: # type: ignore
"""Check if the element represented by hashes is in the Counting
Bloom Filter
Args:
hashes (list): A list of integers representing the key to check
Returns:
int: Maximum number of insertions"""
return min(self._bloom[x % self.number_bits] for x in hashes)
[docs]
def remove(self, key: KeyT, num_els: int = 1) -> int:
"""Remove the element from the counting bloom
Args:
key (str): The element to be removed
num_els (int): Number of times to remove the element
Returns:
int: Maximum number of insertions after the removal"""
return self.remove_alt(self.hashes(key), num_els)
[docs]
def remove_alt(self, hashes: HashResultsT, num_els: int = 1) -> int:
"""Remvoe the element represented by hashes from the Counting Bloom Filter
Args:
hashes (list): A list of integers representing the key to remove
num_els (int): Number of times to remove the element
Returns:
int: Maximum number of insertions after the removal"""
indices = [hashes[i] % self._bloom_length for i in range(self._number_hashes)]
vals = [self._bloom[k] for k in indices]
min_val = min(vals)
if min_val == UINT32_T_MAX: # cannot remove if we have hit the max
return UINT32_T_MAX
if min_val == 0:
return 0
to_remove = num_els if min_val > num_els else min_val
for k in indices:
if self._bloom[k] < UINT32_T_MAX: # only remove if less than UINT32_T_MAX
self._bloom[k] -= to_remove
self.elements_added -= to_remove
return min_val - to_remove
[docs]
def intersection(self, second: "CountingBloomFilter") -> Union["CountingBloomFilter", None]: # type: ignore
"""Take the intersection of two Counting Bloom Filters
Args:
second (CountingBloomFilter): The Bloom Filter with which to take the intersection
Returns:
CountingBloomFilter: The new Counting Bloom Filter containing the union
Raises:
TypeError: When second is not a :class:`CountingBloomFilter`
Note:
The elements_added property will be set to the estimated number of unique elements \
added as found in estimate_elements()
Note:
If `second` is not of the same size (false_positive_rate and est_elements) then \
this will return `None`"""
if not _verify_not_type_mismatch(second):
raise TypeError(MISMATCH_MSG)
if self._verify_bloom_similarity(second) is False:
return None
res = CountingBloomFilter(
est_elements=self.estimated_elements,
false_positive_rate=self.false_positive_rate,
hash_function=self.hash_function,
)
for i in range(self.bloom_length):
if self._bloom[i] > 0 and second._bloom[i] > 0:
tmp = self._bloom[i] + second._bloom[i]
res.bloom[i] = tmp
res.elements_added = res.estimate_elements()
return res
[docs]
def jaccard_index(self, second: "CountingBloomFilter") -> Union[float, None]: # type:ignore
"""Take the Jaccard Index of two Counting Bloom Filters
Args:
second (CountingBloomFilter): The Bloom Filter with which to take the jaccard index
Returns:
float: A numeric value between 0 and 1 where 1 is identical and 0 means completely different
Raises:
TypeError: When second is not a :class:`CountingBloomFilter`
Note:
The Jaccard Index is based on the unique set of elements added and not the number of each element added
Note:
If `second` is not of the same size (false_positive_rate and est_elements) then this will return `None`"""
if not _verify_not_type_mismatch(second):
raise TypeError(MISMATCH_MSG)
if self._verify_bloom_similarity(second) is False:
return None
count_union = 0
count_inter = 0
for i in range(self.bloom_length):
if self._bloom[i] > 0 or second._bloom[i] > 0:
count_union += 1
if self._bloom[i] > 0 and second._bloom[i] > 0:
count_inter += 1
if count_union == 0:
return 1.0
return count_inter / count_union
[docs]
def union(self, second: "CountingBloomFilter") -> Union["CountingBloomFilter", None]: # type:ignore
"""Return a new Countiong Bloom Filter that contains the union of
the two
Args:
second (CountingBloomFilter): The Counting Bloom Filter with which to calculate the union
Returns:
CountingBloomFilter: The new Counting Bloom Filter containing the union
Raises:
TypeError: When second is not a :class:`CountingBloomFilter`
Note:
The elements_added property will be set to the estimated number of unique elements added as \
found in estimate_elements()
Note:
If `second` is not of the same size (false_positive_rate and est_elements) then this will return `None`"""
if not _verify_not_type_mismatch(second):
raise TypeError(MISMATCH_MSG)
if self._verify_bloom_similarity(second) is False:
return None
res = CountingBloomFilter(
est_elements=self.estimated_elements,
false_positive_rate=self.false_positive_rate,
hash_function=self.hash_function,
)
for i in range(self.bloom_length):
tmp = self._bloom[i] + second._bloom[i]
res._bloom[i] = tmp
res.elements_added = res.estimate_elements()
return res
def _cnt_number_bits_set(self) -> int:
"""calculate the total number of set bits in the bloom"""
return sum(1 for x in self._bloom if x > 0)