Big Tech Interview: Implementations of Alex Xu’s System Design Algorithms (Python)— Part 1

David De
6 min readFeb 24, 2023

--

Ace your FAANG System Design Interview. Source: https://www.pinterest.com/pin/useful--438819557446398927/

Alex Xu shared an awesome guide on the most important concepts to learn if you want to ace your System Design Interview. But he just left it without going deep into how it's implemented. Without the implementation, it's very difficult to appreciate these concepts' beauty. I have gone ahead and sourced the implementations with a handy test to demonstrate each of the concepts.

QuadTree

class Point:
def __init__(self, x, y):
self.x = x
self.y = y

def __repr__(self):
return f'({self.x}, {self.y})'

def __str__(self):
return self.__repr__()

class Rectangle:
def __init__(self, x, y, w, h):
self.x = x
self.y = y
self.w = w
self.h = h

def contains(self, point):
return point.x >= self. x - self.w and \
point.x <= self.x + self.w and \
point.y >= self.y - self.h and \
point.y <= self.y + self.h

def intersects(self, range):
return not(
range.x - range.w > self.x + self.w or \
range.x + self.w < self.x - self.w or \
range.y - range.h > self.y + self.h or \
range.y + self.h < self.y - self.h
)

class QuadTree:
def __init__(self, boundary, capacity):
self.boundary = boundary
self.capacity = capacity
self.points = []
self.divided = False

def subdivide(self):
ne = Rectangle(
self.boundary.x + self.boundary.w / 2,
self.boundary.y - self.boundary.h / 2,
self.boundary.w / 2,
self.boundary.h / 2)
self.northeast = QuadTree(ne, self.capacity)

nw = Rectangle(
self.boundary.x - self.boundary.w / 2,
self.boundary.y - self.boundary.h / 2,
self.boundary.w / 2,
self.boundary.h / 2)
self.northwest = QuadTree(nw, self.capacity)

se = Rectangle(
self.boundary.x + self.boundary.w / 2,
self.boundary.y + self.boundary.h / 2,
self.boundary.w / 2,
self.boundary.h / 2)
self.southeast = QuadTree(se, self.capacity)

sw = Rectangle(
self.boundary.x - self.boundary.w / 2,
self.boundary.y + self.boundary.h / 2,
self.boundary.w / 2,
self.boundary.h / 2)
self.southwest = QuadTree(sw, self.capacity)
self.divided = True

def insert(self, point):
if not self.boundary.contains(point):
return

if len(self.points) < self.capacity:
self.points.append(point)
else:
if not self.divided:
self.subdivide()

self.northeast.insert(point)
self.northwest.insert(point)
self.southeast.insert(point)
self.southwest.insert(point)

def query(self, range):
found = []
if not self.boundary.intersects(range):
return found
else:
for point in self.points:
if range.contains(point):
found.append(point)

if self.divided:
ne_found = self.northeast.query(range)
nw_found = self.northwest.query(range)
se_found = self.southeast.query(range)
sw_found = self.southwest.query(range)

for point in ne_found:
found.append(point)
for point in nw_found:
found.append(point)
for point in se_found:
found.append(point)
for point in sw_found:
found.append(point)

return found

def print(self, tabs=0):
add_tabs = "".join(["\t" for _ in range(tabs)])
print(f'{add_tabs}{self.points}')

if self.divided:
if self.northeast.points:
self.northeast.print(tabs=tabs+1)
if self.northwest.points:
self.northwest.print(tabs=tabs+1)
if self.southeast.points:
self.southeast.print(tabs=tabs+1)
if self.southwest.points:
self.southwest.print(tabs=tabs+1)

Testing

from quadtree import QuadTree, Point, Rectangle
import random

boundary = Rectangle(200, 200, 200, 200)
quad_tree = QuadTree(boundary, 4)
print(quad_tree)

for i in range(50):
p = Point(random.randint(0, 200), random.randint(0, 200))
quad_tree.insert(p)

quad_tree.print()

range = Rectangle(250, 250, 107, 75)
print(quad_tree.query(range))

Consistent Hashing

from hashlib import sha256
from bisect import bisect, bisect_left

class ConsistentHashing(object):

def hash_fn(self, key: str, total_slots: int) -> int:
hsh = sha256()

hsh.update(bytes(key.encode('utf-8')))

return int(hsh.hexdigest(), 16) % total_slots

def add_node(self, node) -> int:
if len(self._keys) == self.total_slots:
raise Exception("hash space is full")

key = hash_fn(node.host, self.total_slots)

index = bisect(self._keys, key)

if index > 0 and self._keys[index - 1] == key:
raise Exception("collision occured")

self.nodes.insert(index, node)
self._keys.insert(index, key)

return key

def remove_node(self, node) -> int:

if (len(self._keys)) == 0:
raise Exception("hash space is empty")

key = hash_fn(node.host, self.total_slots)

index = bisect_left(self._keys, key)

if index >= len(self._keys) or self._keys[index] != key:
raise Exception("node does not exist")

self._keys.pop(index)
self.nodes.pop(index)

return key

def assign(self, item: str) -> str:
key = hash_fn(item, self.total_slots)

index = bisect_left(self._keys, key)

return self.nodes[index]

Testing

from hashlib import sha256

hsh = sha256()

hsh.update('abc'.encode('utf-8'))

x = hsh.hexdigest()
y = int(x, 16)
z = y % 10

print(f'String: abc')
print(f'Total Slots: 10')
print(f"Hex Digest: {x}")
print(f"Int Conversion: {y}")
print(f"Index: {z}")

Leaky Bucket

class LeakyBucket(object):
def __init__(self, bucket, time_unit, forward_callback, drop_callback):
self.bucket = bucket
self.token = 0
self.time_unit = time_unit
self.forward_callback = forward_callback
self.drop_callback = drop_callback

def handle(self, packet):
size_left = self.bucket - self.token
if size_left > 0:
self.token += 1
self.forward_callback(packet)
self.token -= 1
else:
self.drop_callback(packet)

Testing

from leakybucket import LeakyBucket
import time

def forward(packet):
print(f'Packet Forwarded: {packet}')

def drop(packet):
print(f'Packet Dropped: {packet}')

throttle = LeakyBucket(1, 1, forward, drop)

packet = 0

while True:
time.sleep(0.2)
throttle.handle(packet)
packet += 1

Token Bucket

import time

class TokenBucket:

def __init__(self, tokens, time_unit, forward_callback, drop_callback):
self.tokens = tokens
self.time_unit = time_unit
self.forward_callback = forward_callback
self.drop_callback = drop_callback
self.bucket = tokens
self.last_check = time.time()

def handle(self, packet):
current = time.time()
time_passed = current - self.last_check
self.last_check = current

self.bucket = self.bucket + \
time_passed * (self.tokens / self.time_unit)

if self.bucket > self.tokens:
self.bucket = self.tokens

if self.bucket < 1:
self.drop_callback(packet)
else:
self.bucket = self.bucket - 1
self.forward_callback(packet)

Testing

from tokenbucket import TokenBucket
import time

def forward(packet):
print(f'Packet Forwarded: {packet}')

def drop(packet):
print(f'Packet Dropped: {packet}')

throttle = TokenBucket(1, 1, forward, drop)

packet = 0

while True:
time.sleep(0.2)
throttle.handle(packet)
packet += 1

Bloom Filter

import math
import mmh3
from bitarray import bitarray

class BloomFilter(object):

def __init__(self, items_count, fp_prod):
self.fp_prod = fp_prod

self.size = self.get_size(items_count, fp_prod)

self.hash_count = self.get_hash_count(self.size, items_count)

self.bit_array = bitarray(self.size)

self.bit_array.setall(0)

def add(self, item):
digests = []
for i in range(self.hash_count):

digest = mmh3.hash(item, i) % self.size
digests.append(digest)

self.bit_array[digest] = True

def check(self, item):
for i in range(self.hash_count):
digest = mmh3.hash(item, i) % self.size
if self.bit_array[digest] == False:
return False

return True

def get_size(self, n, p):
m = -(n * math.log(p))/(math.log(2)**2)
return int(m)

def get_hash_count(self, m, n):
k = (m/n) * math.log(2)
return int(k)

Testing

from bloomfilter import BloomFilter
from random import shuffle

n = 20
p = 0.05

bloom_filter =BloomFilter(n, p)
print("Size of bit array: {}".format(bloom_filter.size))
print("False positive Probability: {}".format(bloom_filter.fp_prod))
print("Number of hash functions: {}".format(bloom_filter.hash_count))

word_present = ['abound', 'colorful']

word_absent = ['bluff']

for item in word_present:
bloom_filter.add(item)

shuffle(word_present)
shuffle(word_absent)

for word in word_absent:
if bloom_filter.check(word):
print("'{}' is present!".format(word))
else:
print("'{}' is not present".format(word))

Merkle Tree

from typing import List
import hashlib

class Node:

def __init__(self, left, right, value: str, content, is_copied=False):
self.left: Node = left
self.right: Node = right
self.value = value
self.content = content
self.is_copied = is_copied

def hash(val: str) -> str:
return hashlib.sha256(val.encode('utf-8')).hexdigest()

def __str__(self):
return (str(self.value))

def copy(self):
return Node(self.left, self.right, self.value, self.content, True)

class MerkleTree:
def __init__(self, values: List[str]):
self.__buildTree(values)

def __buildTree(self, values: List[str]) -> None:

leaves: List[Node] = [Node(None, None, Node.hash(e), e) for e in values]
if len(leaves) % 2 == 1:
leaves.append(leaves[-1].copy())
self.root: Node = self.__buildTreeRec(leaves)

def __buildTreeRec(self, nodes: List[Node]) -> Node:
if len(nodes) % 2 == 1:
nodes.append(nodes[-1].copy())

half: int = len(nodes) // 2

if len(nodes) == 2:
return Node(nodes[0], nodes[1], Node.hash(nodes[0].value + nodes[0].value), nodes[0].content + " + " + nodes[1].content)

left: Node = self.__buildTreeRec(nodes[:half])
right: Node = self.__buildTreeRec(nodes[half:])
value: str = Node.hash(left.value + right.value)
content: str = f'{left.content} + {right.content}'
return Node(left, right, value, content)

def printTree(self) -> None:
self.__printTreeRec(self.root)

def __printTreeRec(self, node: Node) -> None:
if node != None:
if node.left != None:
print("Left: " + str(node.left))
print("Right: " + str(node.right))
else:
print("Input")

if node.is_copied:
print('(Padding)')

print("Value: " + str(node.value))
print("Content: " + str(node.content))
print("")
self.__printTreeRec(node.left)
self.__printTreeRec(node.right)

def getRootHash(self) -> str:
return self.root.value

Testing

from merkletree import MerkleTree

elems = ["GforG", "A", "Computer", "Science"]

print("Inputs: ")
print(*elems, sep=" | ")
mktree = MerkleTree(elems)
mktree.printTree()

Count-min Sketch

import math
import mmh3
from bitarray import bitarray

class CountMinSketch(object):
def __init__(self, items_count, fp_prod):
self.fp_prod = fp_prod

self.size = self.get_size(items_count, fp_prod)

self.hash_count = self.get_hash_count(self.size, items_count)

self.count_arrays = [[0 for _ in range(self.size)] for _ in range(self.hash_count)]

def add(self, item):
for i in range(self.hash_count):
digest = mmh3.hash(item, i) % self.size
self.count_arrays[i][digest] += 1

def getFrequency(self, item):
result = 1000000000
for i in range(self.hash_count):
digest = mmh3.hash(item, i) % self.size
result = min(result, self.count_arrays[i][digest])
return result if result < 1000000000 else 0

def get_size(self, n, p):
m = -(n * math.log(p))/(math.log(2)**2)
return int(m)

def get_hash_count(self, m, n):
k = (m/n) * math.log(2)
return int(k)

Testing

from countminsketches import CountMinSketch

n = 20
p = 0.05

cms = CountMinSketch(n, p)
print("Size of bit array: {}".format(cms.size))
print("False positive Probability: {}".format(cms.fp_prod))
print("Number of hash functions: {}".format(cms.hash_count))

word_present = ['abound', 'colorful', 'abound', 'abound']

word_absent = ['bluff']

for item in word_present:
cms.add(item)

print("Word Frequencies")
print("--------------------")
for item in set(word_present):
print(f"{item}: {cms.getFrequency(item)}")

for item in set(word_absent):
print(f"{item}: {cms.getFrequency(item)}")

--

--

David De
David De

No responses yet