Big Tech Interview: Implementations of Alex Xu’s System Design Algorithms (Python)— Part 1
Alex Xu shared an awesome guide on the most important concepts to learn if you want to ace your System Design Interview. But he just left it without going deep into how it's implemented. Without the implementation, it's very difficult to appreciate these concepts' beauty. I have gone ahead and sourced the implementations with a handy test to demonstrate each of the concepts.
QuadTree
class Point:
def __init__(self, x, y):
self.x = x
self.y = y
def __repr__(self):
return f'({self.x}, {self.y})'
def __str__(self):
return self.__repr__()
class Rectangle:
def __init__(self, x, y, w, h):
self.x = x
self.y = y
self.w = w
self.h = h
def contains(self, point):
return point.x >= self. x - self.w and \
point.x <= self.x + self.w and \
point.y >= self.y - self.h and \
point.y <= self.y + self.h
def intersects(self, range):
return not(
range.x - range.w > self.x + self.w or \
range.x + self.w < self.x - self.w or \
range.y - range.h > self.y + self.h or \
range.y + self.h < self.y - self.h
)
class QuadTree:
def __init__(self, boundary, capacity):
self.boundary = boundary
self.capacity = capacity
self.points = []
self.divided = False
def subdivide(self):
ne = Rectangle(
self.boundary.x + self.boundary.w / 2,
self.boundary.y - self.boundary.h / 2,
self.boundary.w / 2,
self.boundary.h / 2)
self.northeast = QuadTree(ne, self.capacity)
nw = Rectangle(
self.boundary.x - self.boundary.w / 2,
self.boundary.y - self.boundary.h / 2,
self.boundary.w / 2,
self.boundary.h / 2)
self.northwest = QuadTree(nw, self.capacity)
se = Rectangle(
self.boundary.x + self.boundary.w / 2,
self.boundary.y + self.boundary.h / 2,
self.boundary.w / 2,
self.boundary.h / 2)
self.southeast = QuadTree(se, self.capacity)
sw = Rectangle(
self.boundary.x - self.boundary.w / 2,
self.boundary.y + self.boundary.h / 2,
self.boundary.w / 2,
self.boundary.h / 2)
self.southwest = QuadTree(sw, self.capacity)
self.divided = True
def insert(self, point):
if not self.boundary.contains(point):
return
if len(self.points) < self.capacity:
self.points.append(point)
else:
if not self.divided:
self.subdivide()
self.northeast.insert(point)
self.northwest.insert(point)
self.southeast.insert(point)
self.southwest.insert(point)
def query(self, range):
found = []
if not self.boundary.intersects(range):
return found
else:
for point in self.points:
if range.contains(point):
found.append(point)
if self.divided:
ne_found = self.northeast.query(range)
nw_found = self.northwest.query(range)
se_found = self.southeast.query(range)
sw_found = self.southwest.query(range)
for point in ne_found:
found.append(point)
for point in nw_found:
found.append(point)
for point in se_found:
found.append(point)
for point in sw_found:
found.append(point)
return found
def print(self, tabs=0):
add_tabs = "".join(["\t" for _ in range(tabs)])
print(f'{add_tabs}{self.points}')
if self.divided:
if self.northeast.points:
self.northeast.print(tabs=tabs+1)
if self.northwest.points:
self.northwest.print(tabs=tabs+1)
if self.southeast.points:
self.southeast.print(tabs=tabs+1)
if self.southwest.points:
self.southwest.print(tabs=tabs+1)
Testing
from quadtree import QuadTree, Point, Rectangle
import random
boundary = Rectangle(200, 200, 200, 200)
quad_tree = QuadTree(boundary, 4)
print(quad_tree)
for i in range(50):
p = Point(random.randint(0, 200), random.randint(0, 200))
quad_tree.insert(p)
quad_tree.print()
range = Rectangle(250, 250, 107, 75)
print(quad_tree.query(range))
Consistent Hashing
from hashlib import sha256
from bisect import bisect, bisect_left
class ConsistentHashing(object):
def hash_fn(self, key: str, total_slots: int) -> int:
hsh = sha256()
hsh.update(bytes(key.encode('utf-8')))
return int(hsh.hexdigest(), 16) % total_slots
def add_node(self, node) -> int:
if len(self._keys) == self.total_slots:
raise Exception("hash space is full")
key = hash_fn(node.host, self.total_slots)
index = bisect(self._keys, key)
if index > 0 and self._keys[index - 1] == key:
raise Exception("collision occured")
self.nodes.insert(index, node)
self._keys.insert(index, key)
return key
def remove_node(self, node) -> int:
if (len(self._keys)) == 0:
raise Exception("hash space is empty")
key = hash_fn(node.host, self.total_slots)
index = bisect_left(self._keys, key)
if index >= len(self._keys) or self._keys[index] != key:
raise Exception("node does not exist")
self._keys.pop(index)
self.nodes.pop(index)
return key
def assign(self, item: str) -> str:
key = hash_fn(item, self.total_slots)
index = bisect_left(self._keys, key)
return self.nodes[index]
Testing
from hashlib import sha256
hsh = sha256()
hsh.update('abc'.encode('utf-8'))
x = hsh.hexdigest()
y = int(x, 16)
z = y % 10
print(f'String: abc')
print(f'Total Slots: 10')
print(f"Hex Digest: {x}")
print(f"Int Conversion: {y}")
print(f"Index: {z}")
Leaky Bucket
class LeakyBucket(object):
def __init__(self, bucket, time_unit, forward_callback, drop_callback):
self.bucket = bucket
self.token = 0
self.time_unit = time_unit
self.forward_callback = forward_callback
self.drop_callback = drop_callback
def handle(self, packet):
size_left = self.bucket - self.token
if size_left > 0:
self.token += 1
self.forward_callback(packet)
self.token -= 1
else:
self.drop_callback(packet)
Testing
from leakybucket import LeakyBucket
import time
def forward(packet):
print(f'Packet Forwarded: {packet}')
def drop(packet):
print(f'Packet Dropped: {packet}')
throttle = LeakyBucket(1, 1, forward, drop)
packet = 0
while True:
time.sleep(0.2)
throttle.handle(packet)
packet += 1
Token Bucket
import time
class TokenBucket:
def __init__(self, tokens, time_unit, forward_callback, drop_callback):
self.tokens = tokens
self.time_unit = time_unit
self.forward_callback = forward_callback
self.drop_callback = drop_callback
self.bucket = tokens
self.last_check = time.time()
def handle(self, packet):
current = time.time()
time_passed = current - self.last_check
self.last_check = current
self.bucket = self.bucket + \
time_passed * (self.tokens / self.time_unit)
if self.bucket > self.tokens:
self.bucket = self.tokens
if self.bucket < 1:
self.drop_callback(packet)
else:
self.bucket = self.bucket - 1
self.forward_callback(packet)
Testing
from tokenbucket import TokenBucket
import time
def forward(packet):
print(f'Packet Forwarded: {packet}')
def drop(packet):
print(f'Packet Dropped: {packet}')
throttle = TokenBucket(1, 1, forward, drop)
packet = 0
while True:
time.sleep(0.2)
throttle.handle(packet)
packet += 1
Bloom Filter
import math
import mmh3
from bitarray import bitarray
class BloomFilter(object):
def __init__(self, items_count, fp_prod):
self.fp_prod = fp_prod
self.size = self.get_size(items_count, fp_prod)
self.hash_count = self.get_hash_count(self.size, items_count)
self.bit_array = bitarray(self.size)
self.bit_array.setall(0)
def add(self, item):
digests = []
for i in range(self.hash_count):
digest = mmh3.hash(item, i) % self.size
digests.append(digest)
self.bit_array[digest] = True
def check(self, item):
for i in range(self.hash_count):
digest = mmh3.hash(item, i) % self.size
if self.bit_array[digest] == False:
return False
return True
def get_size(self, n, p):
m = -(n * math.log(p))/(math.log(2)**2)
return int(m)
def get_hash_count(self, m, n):
k = (m/n) * math.log(2)
return int(k)
Testing
from bloomfilter import BloomFilter
from random import shuffle
n = 20
p = 0.05
bloom_filter =BloomFilter(n, p)
print("Size of bit array: {}".format(bloom_filter.size))
print("False positive Probability: {}".format(bloom_filter.fp_prod))
print("Number of hash functions: {}".format(bloom_filter.hash_count))
word_present = ['abound', 'colorful']
word_absent = ['bluff']
for item in word_present:
bloom_filter.add(item)
shuffle(word_present)
shuffle(word_absent)
for word in word_absent:
if bloom_filter.check(word):
print("'{}' is present!".format(word))
else:
print("'{}' is not present".format(word))
Merkle Tree
from typing import List
import hashlib
class Node:
def __init__(self, left, right, value: str, content, is_copied=False):
self.left: Node = left
self.right: Node = right
self.value = value
self.content = content
self.is_copied = is_copied
def hash(val: str) -> str:
return hashlib.sha256(val.encode('utf-8')).hexdigest()
def __str__(self):
return (str(self.value))
def copy(self):
return Node(self.left, self.right, self.value, self.content, True)
class MerkleTree:
def __init__(self, values: List[str]):
self.__buildTree(values)
def __buildTree(self, values: List[str]) -> None:
leaves: List[Node] = [Node(None, None, Node.hash(e), e) for e in values]
if len(leaves) % 2 == 1:
leaves.append(leaves[-1].copy())
self.root: Node = self.__buildTreeRec(leaves)
def __buildTreeRec(self, nodes: List[Node]) -> Node:
if len(nodes) % 2 == 1:
nodes.append(nodes[-1].copy())
half: int = len(nodes) // 2
if len(nodes) == 2:
return Node(nodes[0], nodes[1], Node.hash(nodes[0].value + nodes[0].value), nodes[0].content + " + " + nodes[1].content)
left: Node = self.__buildTreeRec(nodes[:half])
right: Node = self.__buildTreeRec(nodes[half:])
value: str = Node.hash(left.value + right.value)
content: str = f'{left.content} + {right.content}'
return Node(left, right, value, content)
def printTree(self) -> None:
self.__printTreeRec(self.root)
def __printTreeRec(self, node: Node) -> None:
if node != None:
if node.left != None:
print("Left: " + str(node.left))
print("Right: " + str(node.right))
else:
print("Input")
if node.is_copied:
print('(Padding)')
print("Value: " + str(node.value))
print("Content: " + str(node.content))
print("")
self.__printTreeRec(node.left)
self.__printTreeRec(node.right)
def getRootHash(self) -> str:
return self.root.value
Testing
from merkletree import MerkleTree
elems = ["GforG", "A", "Computer", "Science"]
print("Inputs: ")
print(*elems, sep=" | ")
mktree = MerkleTree(elems)
mktree.printTree()
Count-min Sketch
import math
import mmh3
from bitarray import bitarray
class CountMinSketch(object):
def __init__(self, items_count, fp_prod):
self.fp_prod = fp_prod
self.size = self.get_size(items_count, fp_prod)
self.hash_count = self.get_hash_count(self.size, items_count)
self.count_arrays = [[0 for _ in range(self.size)] for _ in range(self.hash_count)]
def add(self, item):
for i in range(self.hash_count):
digest = mmh3.hash(item, i) % self.size
self.count_arrays[i][digest] += 1
def getFrequency(self, item):
result = 1000000000
for i in range(self.hash_count):
digest = mmh3.hash(item, i) % self.size
result = min(result, self.count_arrays[i][digest])
return result if result < 1000000000 else 0
def get_size(self, n, p):
m = -(n * math.log(p))/(math.log(2)**2)
return int(m)
def get_hash_count(self, m, n):
k = (m/n) * math.log(2)
return int(k)
Testing
from countminsketches import CountMinSketch
n = 20
p = 0.05
cms = CountMinSketch(n, p)
print("Size of bit array: {}".format(cms.size))
print("False positive Probability: {}".format(cms.fp_prod))
print("Number of hash functions: {}".format(cms.hash_count))
word_present = ['abound', 'colorful', 'abound', 'abound']
word_absent = ['bluff']
for item in word_present:
cms.add(item)
print("Word Frequencies")
print("--------------------")
for item in set(word_present):
print(f"{item}: {cms.getFrequency(item)}")
for item in set(word_absent):
print(f"{item}: {cms.getFrequency(item)}")