""" Counting Cuckoo Filter, python implementation
License: MIT
Author: Tyler Barrus (barrust@gmail.com)
"""
import random
from array import array
from io import IOBase
from mmap import mmap
from pathlib import Path
from struct import Struct
from typing import ByteString, List, Union
from probables.cuckoo.cuckoo import CuckooFilter
from probables.exceptions import CuckooFilterFullError
from probables.hashes import KeyT, SimpleHashT
from probables.utilities import MMap, resolve_path
[docs]
class CountingCuckooFilter(CuckooFilter):
    """Simple Counting Cuckoo Filter implementation

    Each stored fingerprint carries a counter, so the same key can be added
    and removed multiple times and `check` reports how often it was inserted.

    Args:
        capacity (int): The number of bins
        bucket_size (int): The number of buckets per bin
        max_swaps (int): The number of cuckoo swaps before stopping
        expansion_rate (int): The rate at which to expand
        auto_expand (bool): If the filter should automatically expand
        finger_size (int): The size of the fingerprint to use in bytes \
            (between 1 and 4); exported as 4 bytes; up to the user to \
            reset the size correctly on import
        filepath (str): The path to the file to load or None if no file
        hash_function (function): Hashing strategy function to use `hf(key)`
    Returns:
        CountingCuckooFilter: A Cuckoo Filter object"""

    __slots__ = ("__unique_elements",)

    def __init__(
        self,
        capacity: int = 10000,
        bucket_size: int = 4,
        max_swaps: int = 500,
        expansion_rate: int = 2,
        auto_expand: bool = True,
        finger_size: int = 4,
        filepath: Union[str, Path, None] = None,
        hash_function: Union[SimpleHashT, None] = None,
    ) -> None:
        """setup the data structure"""
        # must exist before the parent initializer runs, since loading from
        # `filepath` (done by the parent; not visible here) updates it
        self.__unique_elements = 0
        super().__init__(
            capacity,
            bucket_size,
            max_swaps,
            expansion_rate,
            auto_expand,
            finger_size,
            filepath,
            hash_function,
        )

    # export footer: (bucket_size, max_swaps) as two unsigned ints
    __COUNTING_CUCKOO_FOOTER_STRUCT = Struct("II")
    # one exported slot: (fingerprint, count) as two unsigned ints
    __BIN_STRUCT = Struct("II")

    @classmethod
    def init_error_rate(
        cls,
        error_rate: float,
        capacity: int = 10000,
        bucket_size: int = 4,
        max_swaps: int = 500,
        expansion_rate: int = 2,
        auto_expand: bool = True,
        hash_function: Union[SimpleHashT, None] = None,
    ) -> "CountingCuckooFilter":
        """Initialize a Counting Cuckoo Filter based on error rate

        Args:
            error_rate (float): The error rate handed to `_set_error_rate`
            capacity (int): The number of bins
            bucket_size (int): The number of buckets per bin
            max_swaps (int): The number of cuckoo swaps before stopping
            expansion_rate (int): The rate at which to expand
            auto_expand (bool): If the filter should automatically expand
            hash_function (function): Hashing strategy function to use `hf(key)`
        Returns:
            CountingCuckooFilter: A Counting Cuckoo Filter object"""
        # use `cls` so that subclasses get an instance of their own type
        cku = cls(
            capacity=capacity,
            bucket_size=bucket_size,
            auto_expand=auto_expand,
            max_swaps=max_swaps,
            expansion_rate=expansion_rate,
            hash_function=hash_function,
        )
        cku._set_error_rate(error_rate)
        return cku

    @classmethod
    def load_error_rate(
        cls, error_rate: float, filepath: Union[str, Path], hash_function: Union[SimpleHashT, None] = None
    ) -> "CountingCuckooFilter":
        """Initialize a previously exported Counting Cuckoo Filter based on error rate

        Args:
            error_rate (float): The error rate handed to `_set_error_rate`
            filepath (str): The path to the file to load
            hash_function (function): Hashing strategy function to use \
                `hf(key)`
        Returns:
            CountingCuckooFilter: A Counting Cuckoo Filter object
        """
        filepath = resolve_path(filepath)
        cku = cls(filepath=filepath, hash_function=hash_function)
        cku._set_error_rate(error_rate)
        return cku

    @classmethod
    def frombytes(
        cls, b: ByteString, error_rate: Union[float, None] = None, hash_function: Union[SimpleHashT, None] = None
    ) -> "CountingCuckooFilter":
        """Build a Counting Cuckoo Filter from previously exported bytes

        Args:
            b (ByteString): The bytes to load as a Counting Cuckoo Filter
            error_rate (float): The error rate of the cuckoo filter, if used to generate the original filter
            hash_function (function): Hashing strategy function to use `hf(key)`
        Returns:
            CountingCuckooFilter: A Counting Cuckoo Filter object"""
        cku = cls(hash_function=hash_function)
        cku._load(b)

        # `error_rate` may be None here; it is forwarded unchanged
        # NOTE(review): presumably `_set_error_rate` ignores None - confirm in CuckooFilter
        cku._set_error_rate(error_rate)
        return cku

    def __contains__(self, val: KeyT) -> bool:
        """setup the `in` keyword; True when the stored count is positive"""
        return self.check(val) > 0

    @property
    def unique_elements(self) -> int:
        """int: unique number of elements inserted"""
        return self.__unique_elements

    @property
    def buckets(self) -> List[List["CountingCuckooBin"]]:  # type: ignore
        """list(list): The buckets holding the fingerprints

        Note:
            Not settable"""
        return self._buckets

    def load_factor(self) -> float:
        """float: How full the Cuckoo Filter is currently

        Computed from the number of unique fingerprints, since repeated
        insertions of the same fingerprint share a single slot"""
        return self.unique_elements / (self.capacity * self.bucket_size)

    def add(self, key: KeyT) -> None:
        """Add element key to the filter

        If the fingerprint is already stored its counter is incremented,
        otherwise a new bin with a count of 1 is inserted.

        Args:
            key (str): The element to add
        Raises:
            CuckooFilterFullError: When element not inserted after maximum number of swaps or 'kicks'"""
        idx_1, idx_2, fingerprint = self._generate_fingerprint_info(key)

        is_present = self._check_if_present(idx_1, idx_2, fingerprint)
        if is_present is not None:
            for bucket in self.buckets[is_present]:
                if fingerprint in bucket:
                    bucket.increment()
                    self._inserted_elements += 1
                    return

        finger = self._insert_fingerprint_alt(fingerprint, idx_1, idx_2)
        # a non-None result is the bin left without a home after max_swaps kicks
        self._deal_with_insertion(finger)

    def check(self, key: KeyT) -> int:  # type: ignore
        """Check if an element is in the filter

        Args:
            key (str): Element to check
        Returns:
            int: The number of times inserted into the filter"""
        idx_1, idx_2, fingerprint = self._generate_fingerprint_info(key)
        is_present = self._check_if_present(idx_1, idx_2, fingerprint)
        if is_present is None:
            return 0
        # get the count out!
        for bucket in self.buckets[is_present]:
            if fingerprint in bucket:
                return bucket.count
        return 0

    def remove(self, key: KeyT) -> bool:
        """Remove one occurrence of an element from the filter

        The bin is dropped entirely once its count reaches zero.

        Args:
            key (str): Element to remove
        Returns:
            bool: True if the fingerprint was found and decremented, False otherwise"""
        idx_1, idx_2, fingerprint = self._generate_fingerprint_info(key)
        idx = self._check_if_present(idx_1, idx_2, fingerprint)
        if idx is None:
            return False
        for bucket in self.buckets[idx]:
            if fingerprint in bucket:
                bucket.decrement()
                self._inserted_elements -= 1
                if bucket.count == 0:
                    # safe although we are iterating: we return right after
                    self.buckets[idx].remove(bucket)
                    self.__unique_elements -= 1
                return True
        return False  # catch this...

    def expand(self):
        """Expand the cuckoo filter"""
        self._expand_logic(None)

    def export(self, file: Union[Path, str, IOBase, mmap]) -> None:
        """Export cuckoo filter to file

        Every slot is written as a (fingerprint, count) pair of unsigned
        ints, empty slots as zeros, followed by a (bucket_size, max_swaps)
        footer.

        Args:
            file (str): Path to file to export, or an already opened binary file object"""
        if not isinstance(file, (IOBase, mmap)):
            file = resolve_path(file)
            with open(file, "wb") as filepointer:
                self.export(filepointer)  # type:ignore
        else:
            self.__bucket_decomposition(self.buckets, self.bucket_size).tofile(file)
            # now put out the required information at the end
            file.write(self.__COUNTING_CUCKOO_FOOTER_STRUCT.pack(self.bucket_size, self.max_swaps))

    def _insert_fingerprint_alt(
        self, fingerprint: int, idx_1: int, idx_2: int, count: int = 1
    ) -> Union["CountingCuckooBin", None]:
        """insert a fingerprint, but with a count parameter!

        Args:
            fingerprint (int): The fingerprint to store
            idx_1 (int): First candidate bucket index
            idx_2 (int): Second candidate bucket index
            count (int): The count the new bin starts with
        Returns:
            CountingCuckooBin: The bin left over if `max_swaps` kicks were \
                not enough, otherwise None"""
        if self.__insert_element(fingerprint, idx_1, count) or self.__insert_element(fingerprint, idx_2, count):
            # `_inserted_elements` tracks total insertions (the sum of counts,
            # see `add`, `remove` and `_parse_buckets`), so add the whole count
            # NOTE(review): assumes `_setup_expand` resets `_inserted_elements`
            # before bins are re-inserted during expansion - confirm in CuckooFilter
            self._inserted_elements += count
            self.__unique_elements += 1
            return None

        # we didn't insert, so now we need to randomly select one index to use
        # and move things around to the other index, if possible, until we
        # either move everything around or hit the maximum number of swaps
        idx = random.choice([idx_1, idx_2])
        # keep the caller supplied count; a re-inserted bin must not be reset to 1
        prv_bin = CountingCuckooBin(fingerprint, count)
        for _ in range(self.max_swaps):
            # select one element to be swapped out...
            swap_elm = random.randint(0, self.bucket_size - 1)
            swap_finger = self.buckets[idx][swap_elm]
            prv_bin, self.buckets[idx][swap_elm] = swap_finger, prv_bin

            # now find another place to put this fingerprint
            index_1, index_2 = self._indicies_from_fingerprint(prv_bin.finger)
            idx = index_2 if idx == index_1 else index_1

            if self.__insert_element(prv_bin.finger, idx, prv_bin.count):
                # the displaced bins were already counted; the net addition to
                # the filter is the original fingerprint with its `count`
                self._inserted_elements += count
                self.__unique_elements += 1
                return None

        # if we got here we have an error... we might need to know what is left
        return prv_bin

    def _check_if_present(self, idx_1: int, idx_2: int, fingerprint: int) -> Union[int, None]:
        """wrapper for checking if fingerprint is already inserted

        Returns:
            int: The bucket index holding the fingerprint (idx_1 is checked \
                first) or None if it is in neither bucket"""
        for idx in (idx_1, idx_2):
            if any(x.finger == fingerprint for x in self.buckets[idx]):
                return idx
        return None

    def _load(self, file: Union[Path, str, IOBase, mmap, bytes, ByteString]) -> None:
        """load a cuckoo filter from file

        Args:
            file: Path to a previously exported filter, or the exported bytes themselves"""
        if not isinstance(file, (IOBase, mmap, bytes, bytearray, memoryview)):
            file = resolve_path(file)
            with MMap(file) as filepointer:
                self._load(filepointer)
        else:
            # NOTE(review): presumably restores bucket_size and max_swaps from
            # the footer written by `export` - confirm in CuckooFilter
            self._parse_footer(file, self.__COUNTING_CUCKOO_FOOTER_STRUCT)  # type: ignore
            # both counters are rebuilt by `_parse_buckets`, so start both from zero
            self._inserted_elements = 0
            self.__unique_elements = 0
            self._parse_buckets(file)  # type: ignore

    def _parse_buckets(self, d: ByteString) -> None:
        """Parse bytes to pull out and set the buckets

        Args:
            d (ByteString): The exported filter: slots followed by the footer"""
        bin_size = self.__BIN_STRUCT.size
        # the trailing bytes are the footer, not a bin; use the footer's own size
        footer_size = self.__COUNTING_CUCKOO_FOOTER_STRUCT.size
        # convert once instead of once per slot
        data = bytes(d)
        self._cuckoo_capacity = (len(data) - footer_size) // bin_size // self.bucket_size
        self._buckets = []
        offset = 0
        for i in range(self.capacity):
            self.buckets.append([])
            for _ in range(self.bucket_size):
                finger, count = self.__BIN_STRUCT.unpack_from(data, offset)
                offset += bin_size
                # a zero fingerprint marks an empty slot (see `export`)
                if finger > 0:
                    self.buckets[i].append(CountingCuckooBin(finger, count))
                    self._inserted_elements += count
                    self.__unique_elements += 1

    def _expand_logic(self, extra_fingerprint: Union["CountingCuckooBin", None]) -> None:
        """the logic to actually expand the cuckoo filter

        Args:
            extra_fingerprint (CountingCuckooBin): A bin that could not be \
                placed and must be re-inserted too, or None
        Raises:
            CuckooFilterFullError: When a bin cannot be re-inserted"""
        # get all the fingerprints
        fingerprints = self._setup_expand(extra_fingerprint)
        self.__unique_elements = 0  # this needs to be reset!

        for elm in fingerprints:
            idx_1, idx_2 = self._indicies_from_fingerprint(elm.finger)
            res = self._insert_fingerprint_alt(elm.finger, idx_1, idx_2, elm.count)
            if res is not None:  # again, this *shouldn't* happen
                msg = "The CountingCuckooFilter failed to expand"
                raise CuckooFilterFullError(msg)

    def __insert_element(self, fingerprint, idx, count=1) -> bool:
        """insert an element into bucket `idx` if it still has a free slot

        Returns:
            bool: True if a new bin was appended, False if the bucket is full"""
        if len(self.buckets[idx]) < self.bucket_size:
            self.buckets[idx].append(CountingCuckooBin(fingerprint, count))
            return True
        return False

    @staticmethod
    def __bucket_decomposition(buckets, bucket_size: int) -> array:
        """convert a list of buckets into a single array for export

        Each bin contributes its (fingerprint, count) pair; buckets with
        fewer than `bucket_size` bins are padded with zero pairs"""
        arr = array("I")
        for bucket in buckets:
            for buck in bucket:
                arr.extend(buck.get_array())
            leftover = bucket_size - len(bucket)
            # two values (fingerprint, count) per missing slot
            arr.fromlist([0] * (leftover * 2))
        return arr
class CountingCuckooBin:
    """Fingerprint / count pair occupying one slot of the counting cuckoo filter"""

    # a single slot attribute keeps the per-instance footprint small
    __slots__ = ["__bin"]

    def __init__(self, fingerprint: int, count: int) -> None:
        """store the pair as an unsigned int array laid out as [fingerprint, count]"""
        self.__bin = array("I", (fingerprint, count))

    def __contains__(self, val: int) -> bool:
        """`val in bin` is True when `val` equals the stored fingerprint"""
        stored = self.__bin[0]
        return stored == val

    def get_array(self):
        """hand back the underlying [fingerprint, count] array (not a copy)"""
        return self.__bin

    @property
    def finger(self) -> int:
        """int: the stored fingerprint"""
        fingerprint, _ = self.__bin
        return fingerprint

    @property
    def count(self) -> int:
        """int: the current count"""
        _, current = self.__bin
        return current

    def __repr__(self) -> str:
        """same text as the string conversion"""
        return str(self)

    def __str__(self) -> str:
        """human readable form showing fingerprint and count"""
        finger, count = self.__bin
        return f"(fingerprint:{finger} count:{count})"

    def increment(self) -> int:
        """raise the count by one and return the new count"""
        updated = self.__bin[1] + 1
        self.__bin[1] = updated
        return updated

    def decrement(self) -> int:
        """lower the count by one and return the new count"""
        updated = self.__bin[1] - 1
        self.__bin[1] = updated
        return updated