""" BloomFilter and BloomFiter on Disk, python implementation
License: MIT
Author: Tyler Barrus (barrust@gmail.com)
"""
from array import array
from typing import Iterator, List, Optional
from probables.exceptions import QuotientFilterError
from probables.hashes import KeyT, SimpleHashT, fnv_1a_32
from probables.utilities import Bitarray
[docs]
class QuotientFilter:
"""Simple Quotient Filter implementation
Args:
quotient (int): The size of the quotient to use
auto_expand (bool): Automatically expand or not
hash_function (function): Hashing strategy function to use `hf(key, number)`
Returns:
QuotientFilter: The initialized filter
Raises:
QuotientFilterError: Raised when unable to initialize
Note:
The size of the QuotientFilter will be 2**q"""
__slots__ = (
"_q",
"_r",
"_size",
"_elements_added",
"_hash_func",
"_int_type_code",
"_bits_per_elm",
"_is_occupied",
"_is_continuation",
"_is_shifted",
"_filter",
"_max_load_factor",
"_auto_resize",
)
def __init__(
self, quotient: int = 20, auto_expand: bool = True, hash_function: Optional[SimpleHashT] = None
): # needs to be parameterized
if quotient < 3 or quotient > 31:
raise QuotientFilterError(
f"Invalid quotient setting; quotient must be between 3 and 31; {quotient} was provided"
)
self.__set_params(quotient, auto_expand, hash_function)
def __set_params(self, quotient: int, auto_expand: bool, hash_function: Optional[SimpleHashT]):
self._q: int = quotient
self._r: int = 32 - quotient
self._size: int = 1 << self._q # same as 2**q
self._elements_added: int = 0
self._auto_resize: bool = auto_expand
self._hash_func: SimpleHashT = fnv_1a_32 if hash_function is None else hash_function # type: ignore
self._max_load_factor: float = 0.85
# ensure we use the smallest type possible to reduce memory wastage
if self._r <= 8:
self._int_type_code = "B"
self._bits_per_elm = 8
elif self._r <= 16:
self._int_type_code = "I"
self._bits_per_elm = 16
else:
self._int_type_code = "L"
self._bits_per_elm = 32
self._is_occupied = Bitarray(self._size)
self._is_continuation = Bitarray(self._size)
self._is_shifted = Bitarray(self._size)
self._filter = array(self._int_type_code, [0]) * self._size
def __contains__(self, val: KeyT) -> bool:
"""setup the `in` keyword"""
return self.check(val)
@property
def quotient(self) -> int:
"""int: The size of the quotient, in bits"""
return self._q
@property
def remainder(self) -> int:
"""int: The size of the remainder, in bits"""
return self._r
@property
def num_elements(self) -> int:
"""int: The total size of the filter"""
return self._size
@property
def elements_added(self) -> int:
"""int: The number of elements added to the filter"""
return self._elements_added
@property
def bits_per_elm(self) -> int:
"""int: The number of bits used per element"""
return self._bits_per_elm
@property
def size(self) -> int:
"""int: The number of bins available in the filter
Note:
same as `num_elements`"""
return self._size
@property
def load_factor(self) -> float:
"""float: The load factor of the filter"""
return self._elements_added / self._size
@property
def auto_expand(self) -> bool:
"""bool: Will the quotient filter automatically expand"""
return self._auto_resize
@auto_expand.setter
def auto_expand(self, val: bool):
"""change the auto expand property"""
self._auto_resize = bool(val)
@property
def max_load_factor(self) -> float:
"""float: The maximum allowed load factor after which auto expanding should occur"""
return self._max_load_factor
@max_load_factor.setter
def max_load_factor(self, val: float):
"""set the maximum load factor"""
self._max_load_factor = float(val)
[docs]
def add(self, key: KeyT) -> None:
"""Add key to the quotient filter
Args:
key (str|bytes): The element to add
Raises:
QuotientFilterError: Raised when no locations are available in which to insert"""
_hash = self._hash_func(key, 0)
self.add_alt(_hash)
[docs]
def add_alt(self, _hash: int) -> None:
"""Add the pre-hashed value to the quotient filter
Args:
_hash (int): The element to add
Raises:
QuotientFilterError: Raised when no locations are available in which to insert"""
if self._auto_resize and self.load_factor >= self._max_load_factor:
self.resize()
key_quotient = _hash >> self._r
key_remainder = _hash & ((1 << self._r) - 1)
if self._contained_at_loc(key_quotient, key_remainder) == -1:
self._add(key_quotient, key_remainder)
[docs]
def check(self, key: KeyT) -> bool:
"""Check to see if key is likely in the quotient filter
Args:
key (str|bytes): The element to add
Return:
bool: True if likely encountered, False if definately not"""
_hash = self._hash_func(key, 0)
return self.check_alt(_hash)
[docs]
def check_alt(self, _hash: int) -> bool:
"""Check to see if the pre-calculated hash is likely in the quotient filter
Args:
_hash (int): The element to add
Return:
bool: True if likely encountered, False if definately not"""
key_quotient = _hash >> self._r
key_remainder = _hash & ((1 << self._r) - 1)
return not self._contained_at_loc(key_quotient, key_remainder) == -1
[docs]
def hashes(self) -> Iterator[int]:
"""A generator over the hashes in the quotient filter
Yields:
int: The next hash stored in the quotient filter"""
queue: List[int] = []
# find first empty location
start = 0
while not self._is_empty_element(start):
start += 1
cur_quot = 0
for i in range(start, self._size + start): # this will allow for wrap-arounds
idx = i % self._size
is_occupied = self._is_occupied.check_bit(idx)
is_continuation = self._is_continuation.check_bit(idx)
is_shifted = self._is_shifted.check_bit(idx)
# Nothing here, keep going
if is_occupied + is_continuation + is_shifted == 0:
assert len(queue) == 0
continue
if is_occupied == 1: # keep track of the indicies that match a hashed quotient
queue.append(idx)
# run start
if self._is_run_start(idx):
cur_quot = queue.pop(0)
yield (cur_quot << self._r) + self._filter[idx]
[docs]
def get_hashes(self) -> List[int]:
"""Get the hashes from the quotient filter as a list
Returns:
list(int): The hash values stored in the quotient filter"""
return list(self.hashes())
[docs]
def resize(self, quotient: Optional[int] = None) -> None:
"""Resize the quotient filter to use the new quotient size
Args:
quotient (int): The new quotient to use
Note:
If `None` is provided, the quotient filter will double in size (quotient + 1)
Raises:
QuotientFilterError: When the new quotient will not accommodate the elements already added"""
if quotient is None:
quotient = self._q + 1
if self.elements_added >= (1 << quotient):
raise QuotientFilterError("Unable to shrink since there will be too many elements in the quotient filter")
if quotient < 3 or quotient > 31:
raise QuotientFilterError(
f"Invalid quotient setting; quotient must be between 3 and 31; {quotient} was provided"
)
hashes = self.get_hashes()
for i in range(self._size):
self._filter[i] = 0
self.__set_params(quotient, self._auto_resize, self._hash_func)
for _h in hashes:
self.add_alt(_h)
[docs]
def merge(self, second: "QuotientFilter") -> None:
"""Merge the `second` quotient filter into the first
Args:
second (QuotientFilter): The quotient filter to merge
Note:
The hashing function between the two filters should match
Note:
Errors can occur if the quotient filter being inserted into does not expand (i.e., auto_expand=False)"""
if self._hash_func("test", 0) != second._hash_func("test", 0):
raise QuotientFilterError("Hash functions do not match")
for _h in second.hashes():
self.add_alt(_h)
def _shift_insert(self, q: int, r: int, orig_idx: int, insert_idx: int, flag: int):
if self._is_empty_element(insert_idx):
self._filter[insert_idx] = r
self._is_occupied[q] = 1
self._is_continuation[insert_idx] = 1 if insert_idx != orig_idx else 0
self._is_shifted[insert_idx] = 1 if insert_idx != q else 0
else:
next_idx = (insert_idx + 1) & (self._size - 1)
while True:
was_empty = self._is_empty_element(next_idx)
temp = self._is_continuation[next_idx]
self._is_continuation[next_idx] = self._is_continuation[insert_idx]
self._is_continuation[insert_idx] = temp
self._is_shifted[next_idx] = 1
temp = self._filter[next_idx]
self._filter[next_idx] = self._filter[insert_idx]
self._filter[insert_idx] = temp
if was_empty:
break
next_idx = (next_idx + 1) & (self._size - 1)
self._filter[insert_idx] = r
self._is_occupied[q] = 1
self._is_continuation[insert_idx] = 1 if insert_idx != orig_idx else 0
self._is_shifted[insert_idx] = 1 if insert_idx != q else 0
if flag == 1:
self._is_continuation[(insert_idx + 1) & (self._size - 1)] = 1
def _get_start_index(self, quotient: int) -> int:
if self._is_empty_element(quotient):
return quotient
j = quotient
cnts: int = 0
while True:
if j == quotient or self._is_occupied[j] == 1:
cnts += 1
if self._is_shifted[j] == 1:
j = (j - 1) & (self._size - 1)
else:
break
while True:
if self._is_continuation[j] == 0:
if cnts == 1:
break
cnts -= 1
j = (j + 1) & (self._size - 1)
return j
def _add(self, q: int, r: int):
if self._size == self._elements_added:
raise QuotientFilterError("Unable to insert the element due to insufficient space")
if self._is_empty_element(q):
self._filter[q] = r
self._is_occupied[q] = 1
else:
start_idx = self._get_start_index(q)
if self._is_occupied[q] == 0:
self._shift_insert(q, r, start_idx, start_idx, 0)
else:
orig_start_idx = start_idx
starts = 0
f = (
self._is_occupied.check_bit(start_idx)
+ self._is_continuation.check_bit(start_idx)
+ self._is_shifted.check_bit(start_idx)
)
while starts == 0 and f != 0 and r > self._filter[start_idx]:
start_idx = (start_idx + 1) & (self._size - 1)
if self._is_continuation[start_idx] == 0:
starts += 1
f = (
self._is_occupied.check_bit(start_idx)
+ self._is_continuation.check_bit(start_idx)
+ self._is_shifted.check_bit(start_idx)
)
if starts == 1:
self._shift_insert(q, r, orig_start_idx, start_idx, 0)
else:
self._shift_insert(q, r, orig_start_idx, start_idx, 1)
self._elements_added += 1
def _contained_at_loc(self, q: int, r: int) -> int:
"""returns the index location of the element, or -1 if not present"""
if self._is_occupied[q] == 0:
return -1
start_idx = self._get_start_index(q)
starts = 0
while self._is_empty_element(start_idx) is False:
if self._is_continuation[start_idx] == 0:
starts += 1
if starts == 2 or self._filter[start_idx] > r:
break
if self._filter[start_idx] == r:
return start_idx
start_idx = (start_idx + 1) & (self._size - 1)
return -1
def _is_cluster_start(self, elt: int) -> bool:
return self._is_occupied[elt] == 1 and self._is_continuation[elt] == 0 and self._is_shifted[elt] == 0
def _is_run_start(self, elt: int) -> bool:
return not self._is_continuation[elt] == 1 and (self._is_occupied[elt] == 1 or self._is_shifted[elt] == 1)
def _is_empty_element(self, elt: int) -> bool:
return (
self._is_occupied.check_bit(elt) + self._is_continuation.check_bit(elt) + self._is_shifted.check_bit(elt)
) == 0
[docs]
def print(self):
"""show the bits and the run/cluster/continuation/empty status"""
for i in range(self._size):
# is_a = ""
if self._is_empty_element(i):
is_a = "Empty"
elif self._is_cluster_start(i):
is_a = "Cluster Start"
elif self._is_run_start(i):
is_a = "Run Start"
else:
is_a = "Continuation"
print(f"{i}\t--\t{self._is_occupied[i]}-{self._is_continuation[i]}-{self._is_shifted[i]}\t{is_a}")