""" BloomFilter and BloomFiter on Disk, python implementation
License: MIT
Author: Tyler Barrus (barrust@gmail.com)
"""
from array import array
from typing import Optional
from probables.hashes import KeyT, SimpleHashT, fnv_1a_32
from probables.utilities import Bitarray
[docs]
class QuotientFilter:
"""Simple Quotient Filter implementation
Args:
quotient (int): The size of the quotient to use
hash_function (function): Hashing strategy function to use `hf(key, number)`
Returns:
QuotientFilter: The initialized filter
Raises:
ValueError:
Note:
The size of the QuotientFilter will be 2**q"""
__slots__ = (
"_q",
"_r",
"_size",
"_elements_added",
"_hash_func",
"_int_type_code",
"_bits_per_elm",
"_is_occupied",
"_is_continuation",
"_is_shifted",
"_filter",
)
def __init__(self, quotient: int = 20, hash_function: Optional[SimpleHashT] = None): # needs to be parameterized
if quotient < 3 or quotient > 31:
raise ValueError(
f"Quotient filter: Invalid quotient setting; quotient must be between 3 and 31; {quotient} was provided"
)
self._q = quotient
self._r = 32 - quotient
self._size = 1 << self._q # same as 2**q
self._elements_added = 0
self._hash_func: SimpleHashT = fnv_1a_32 if hash_function is None else hash_function # type: ignore
# ensure we use the smallest type possible to reduce memory wastage
if self._r <= 8:
self._int_type_code = "B"
self._bits_per_elm = 8
elif self._r <= 16:
self._int_type_code = "I"
self._bits_per_elm = 16
else:
self._int_type_code = "L"
self._bits_per_elm = 32
self._is_occupied = Bitarray(self._size)
self._is_continuation = Bitarray(self._size)
self._is_shifted = Bitarray(self._size)
self._filter = array(self._int_type_code, [0]) * self._size
def __contains__(self, val: KeyT) -> bool:
"""setup the `in` keyword"""
return self.check(val)
@property
def quotient(self) -> int:
"""int: The size of the quotient, in bits"""
return self._q
@property
def remainder(self) -> int:
"""int: The size of the remainder, in bits"""
return self._r
@property
def num_elements(self) -> int:
"""int: The total size of the filter"""
return self._size
@property
def elements_added(self) -> int:
"""int: The number of elements added to the filter"""
return self._elements_added
@property
def bits_per_elm(self):
"""int: The number of bits used per element"""
return self._bits_per_elm
[docs]
def add(self, key: KeyT) -> None:
"""Add key to the quotient filter
Args:
key (str|bytes): The element to add"""
_hash = self._hash_func(key, 0)
key_quotient = _hash >> self._r
key_remainder = _hash & ((1 << self._r) - 1)
if not self._contains(key_quotient, key_remainder):
# TODO, add it here
self._add(key_quotient, key_remainder)
[docs]
def check(self, key: KeyT) -> bool:
"""Check to see if key is likely in the quotient filter
Args:
key (str|bytes): The element to add
Return:
bool: True if likely encountered, False if definately not"""
_hash = self._hash_func(key, 0)
key_quotient = _hash >> self._r
key_remainder = _hash & ((1 << self._r) - 1)
return self._contains(key_quotient, key_remainder)
def _shift_insert(self, k, v, start, j, flag):
if self._is_occupied[j] == 0 and self._is_continuation[j] == 0 and self._is_shifted[j] == 0:
self._filter[j] = v
self._is_occupied[k] = 1
self._is_continuation[j] = 1 if j != start else 0
self._is_shifted[j] = 1 if j != k else 0
else:
i = (j + 1) & (self._size - 1)
while True:
f = self._is_occupied[i] + self._is_continuation[i] + self._is_shifted[i]
temp = self._is_continuation[i]
self._is_continuation[i] = self._is_continuation[j]
self._is_continuation[j] = temp
self._is_shifted[i] = 1
temp = self._filter[i]
self._filter[i] = self._filter[j]
self._filter[j] = temp
if f == 0:
break
i = (i + 1) & (self._size - 1)
self._filter[j] = v
self._is_occupied[k] = 1
self._is_continuation[j] = 1 if j != start else 0
self._is_shifted[j] = 1 if j != k else 0
if flag == 1:
self._is_continuation[(j + 1) & (self._size - 1)] = 1
def _get_start_index(self, k):
j = k
cnts = 0
while True:
if j == k or self._is_occupied[j] == 1:
cnts += 1
if self._is_shifted[j] == 1:
j = (j - 1) & (self._size - 1)
else:
break
while True:
if self._is_continuation[j] == 0:
if cnts == 1:
break
cnts -= 1
j = (j + 1) & (self._size - 1)
return j
def _add(self, q: int, r: int):
if self._is_occupied[q] == 0 and self._is_continuation[q] == 0 and self._is_shifted[q] == 0:
self._filter[q] = r
self._is_occupied[q] = 1
else:
start_idx = self._get_start_index(q)
if self._is_occupied[q] == 0:
self._shift_insert(q, r, start_idx, start_idx, 0)
else:
orig_start_idx = start_idx
starts = 0
f = (
self._is_occupied.check_bit(start_idx)
+ self._is_continuation.check_bit(start_idx)
+ self._is_shifted.check_bit(start_idx)
)
while starts == 0 and f != 0 and r > self._filter[start_idx]:
start_idx = (start_idx + 1) & (self._size - 1)
if self._is_continuation[start_idx] == 0:
starts += 1
f = (
self._is_occupied.check_bit(start_idx)
+ self._is_continuation.check_bit(start_idx)
+ self._is_shifted.check_bit(start_idx)
)
if starts == 1:
self._shift_insert(q, r, orig_start_idx, start_idx, 0)
else:
self._shift_insert(q, r, orig_start_idx, start_idx, 1)
self._elements_added += 1
def _contains(self, q: int, r: int) -> bool:
if self._is_occupied[q] == 0:
return False
start_idx = self._get_start_index(q)
starts = 0
meta_bits = (
self._is_occupied.check_bit(start_idx)
+ self._is_continuation.check_bit(start_idx)
+ self._is_shifted.check_bit(start_idx)
)
while meta_bits != 0:
if self._is_continuation[start_idx] == 0:
starts += 1
if starts == 2 or self._filter[start_idx] > r:
break
if self._filter[start_idx] == r:
return True
start_idx = (start_idx + 1) & (self._size - 1)
meta_bits = (
self._is_occupied.check_bit(start_idx)
+ self._is_continuation.check_bit(start_idx)
+ self._is_shifted.check_bit(start_idx)
)
return False