Python Backend Interview Handbook

A senior engineer teaching another engineer how to crack backend interviews. Not a Python encyclopedia. Not competitive programming. Optimized for: interview signal, production thinking, and real backend engineering.


Table of Contents


Part 1: Core Python for Backend Interviews


1.1 Python Data Model & Memory

Level 1 — Must Know

Python passes everything by object reference — not by value, not by C++ reference. Understanding this prevents the most common class of Python bugs.

# ❌ Mutable default argument — the #1 Python gotcha
def append_item(item, lst=[]):  # lst is created ONCE at definition time
    lst.append(item)
    return lst
 
append_item(1)  # [1]
append_item(2)  # [1, 2] — bug! shared across all calls
 
# ✅ Correct pattern
def append_item(item, lst=None):
    if lst is None:
        lst = []
    lst.append(item)
    return lst
# is vs ==
a = [1, 2, 3]
b = a           # same object
c = [1, 2, 3]   # equal but different object
 
a == c   # True  — equality
a is c   # False — identity
a is b   # True  — same object

Reference counting + cyclic GC — Python frees memory when refcount hits 0. Cycles handled by garbage collector.
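A quick way to see both mechanisms with the stdlib `sys` and `gc` modules (a sketch; note `getrefcount` reports one extra reference for its own argument):

```python
import gc
import sys

a = []
base = sys.getrefcount(a)          # includes the temporary argument reference

b = a                              # new reference -> refcount goes up
assert sys.getrefcount(a) == base + 1
del b                              # reference dropped -> refcount goes back down
assert sys.getrefcount(a) == base

# A reference cycle: refcount alone can never reach 0
cycle = []
cycle.append(cycle)                # list references itself
del cycle                          # unreachable, but refcount is still 1
collected = gc.collect()           # the cyclic collector reclaims it
assert collected >= 1
```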

Level 2 — Strong Candidate Knowledge

Interning: Python interns small integers (-5 to 256) and short strings. This is why a is b can be True for small integers without explicit assignment.

a = 256; b = 256; a is b  # True (interned)
# Entered on separate REPL lines, 257 is not interned:
a = 257
b = 257
a is b  # False — though inside one line or module, constant folding may still give True.
# Lesson: never rely on `is` for value comparison; use ==.

__slots__ — Eliminate __dict__ per-instance overhead for high-volume objects.

class Point:
    __slots__ = ['x', 'y']  # No __dict__ created per instance
    def __init__(self, x, y):
        self.x = x
        self.y = y
 
# 3–5x less memory than a regular class
# Use case: ORM row objects, graph nodes, DTOs at scale (1M+ instances)

Level 3 — Deep Internals

CPython uses reference counting + cyclic garbage collector. The GIL (Global Interpreter Lock) ensures only one thread executes Python bytecode at a time:

  • CPU-bound: threads don't give true parallelism → use multiprocessing
  • I/O-bound: threads work, asyncio is better for high concurrency
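A minimal illustration of the I/O-bound case: four blocking sleeps on a thread pool overlap, because a thread blocked in `time.sleep` releases the GIL (timings are approximate; the 0.6s bound is a loose sanity check):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_task(_):
    time.sleep(0.2)  # stand-in for blocking I/O; releases the GIL while waiting

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(io_task, range(4)))
elapsed = time.perf_counter() - start

# Four 0.2s waits overlap: wall time is ~0.2s, not ~0.8s
assert 0.19 <= elapsed < 0.6
```

Running a CPU-bound loop through the same pool shows no such speedup — that is when to reach for multiprocessing.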

🎯 Interview Reality

What interviewers actually ask:

  • "Explain Python's memory model" → mention reference counting, GC, GIL
  • "Why is mutable default argument a bug?" → one object created at function definition time
  • "What's the difference between is and ==?" → identity vs equality

Strong candidate answer for mutable default: "The default argument is evaluated once when the function is defined, not each call. All calls share the same list object. Fix: use None as default, create a new list inside the function body."

Common trap: Saying Python is "pass by reference" — it's pass-by-object-reference (the reference itself is passed by value).

Follow-up: "When would you use __slots__?" "When I know attributes at class definition time and I'm creating millions of instances — like ORM row objects, graph nodes, or data transfer objects. Saves the per-instance __dict__ overhead."


⚙️ Production Thinking

# Memory optimization for bulk data processing
# Without __slots__: ~400 bytes/object
# With __slots__:    ~56  bytes/object
class LogEntry:
    __slots__ = ['timestamp', 'level', 'message', 'service']
 
# Processing 1M log entries: saves ~350MB RAM
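The exact byte counts vary by Python version, but the saving is easy to measure with sys.getsizeof (a rough comparison — it counts the instance and its `__dict__` container, not the attribute values themselves):

```python
import sys

class PlainPoint:
    def __init__(self, x, y):
        self.x = x
        self.y = y

class SlotPoint:
    __slots__ = ('x', 'y')  # attributes stored in fixed slots, no per-instance dict
    def __init__(self, x, y):
        self.x = x
        self.y = y

p = PlainPoint(1, 2)
s = SlotPoint(1, 2)

plain_size = sys.getsizeof(p) + sys.getsizeof(p.__dict__)
slot_size = sys.getsizeof(s)            # nothing extra to count

assert slot_size < plain_size           # slotted instances are strictly smaller
assert not hasattr(s, '__dict__')       # the per-instance dict is gone entirely
```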

1.2 Built-in Data Structures

Level 1 — Must Know

Structure | Best for                        | Key Complexity
list      | Ordered, mutable sequence       | O(1) append, O(n) insert/search
dict      | Key-value mapping               | O(1) avg get/set; insertion-ordered since 3.7
set       | Uniqueness, membership tests    | O(1) avg lookup
deque     | Queue/stack with O(1) both ends | O(1) appendleft/popleft
heapq     | Priority queue                  | O(log n) push/pop
from collections import deque, defaultdict, Counter
import heapq
 
# deque — O(1) at both ends (use instead of list for queues)
q = deque([1, 2, 3])
q.appendleft(0)  # O(1) — list.insert(0, x) is O(n)!
q.popleft()      # O(1)
 
# Counter — frequency counting
c = Counter(['apple', 'banana', 'apple'])
c.most_common(2)  # [('apple', 2), ('banana', 1)]
 
# defaultdict — clean grouping, no KeyError
graph = defaultdict(list)
graph['a'].append('b')
 
# heapq — min-heap (negate values for max-heap)
heap = []
heapq.heappush(heap, 3)
heapq.heappush(heap, 1)
heapq.heappop(heap)  # 1 (min)

Level 2 — Strong Candidate Knowledge

# Use set not list for membership tests
VALID_STATUSES = {'active', 'inactive', 'pending'}  # O(1)
if user.status in VALID_STATUSES: ...               # vs list: O(n)
 
# LRU Cache using OrderedDict
from collections import OrderedDict
 
class LRUCache:
    def __init__(self, capacity: int):
        self.cache = OrderedDict()
        self.capacity = capacity
 
    def get(self, key: int) -> int:
        if key not in self.cache:
            return -1
        self.cache.move_to_end(key)  # Mark as recently used
        return self.cache[key]
 
    def put(self, key: int, value: int) -> None:
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # Evict LRU

Level 3 — Deep Internals

CPython's dict is an open-addressing hash table. It resizes when roughly 2/3 full; the resize itself is O(n), but inserts remain amortized O(1). Keys must be hashable (effectively immutable). Since Python 3.7, insertion order is a language guarantee (the compact-dict layout behind it landed in 3.6).


🎯 Interview Reality

  • "How would you count word frequency?" → Counter
  • "Implement a queue in Python" → deque with append/popleft
  • "Why is list.insert(0, x) slow?" → O(n) shift; use deque

Strong answer: "I'd use collections.deque. It's implemented as a doubly linked list of fixed-size blocks, so appendleft and popleft are O(1). A list is O(n) for front inserts because every element has to shift."

Follow-up: "When would you use OrderedDict now that dicts are ordered?" "Mostly for move_to_end() — useful for implementing LRU cache. Otherwise regular dict is fine."


1.3 Comprehensions & Generators

Level 1 — Must Know

# List comprehension — creates full list in memory
squares = [x**2 for x in range(10) if x % 2 == 0]
 
# Dict comprehension — transform dicts (common in APIs)
user_map = {user.id: user for user in users}
 
# Generator expression — lazy, constant memory
total = sum(x**2 for x in range(1_000_000))  # No list created!
 
# Rule: generator when iterating once over large data
#       list when you need random access, len(), or multiple passes
# Generator function — for streaming/pipeline patterns
def read_large_file(filepath):
    with open(filepath) as f:
        for line in f:
            yield line.strip()  # O(1) memory regardless of file size
 
# Process line by line — never loads full file
for line in read_large_file('large_log.txt'):
    process(line)

Level 2 — Strong Candidate Knowledge

# Generator pipelines — Unix pipe style
def parse_logs(lines):
    for line in lines:
        yield json.loads(line)
 
def filter_errors(records):
    for record in records:
        if record['level'] == 'ERROR':
            yield record
 
# Compose — processes one item at a time, O(1) memory
lines   = read_large_file('app.log')
records = parse_logs(lines)
errors  = filter_errors(records)
 
# yield from — delegate to sub-generator
def flatten(nested):
    for item in nested:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item

🎯 Interview Reality

  • "How do you process a file with 10M lines without loading it all?" → generator function
  • "What's the difference between [x for x in ...] and (x for x in ...)?" → list vs generator

Common trap: Using list comprehension inside sum() — always use generator expression:

sum(x**2 for x in ...)    # ✅ generator — no intermediate list
sum([x**2 for x in ...])  # ❌ creates full list first

⚙️ Production Thinking

Generators are the right tool for:

  • Processing multi-GB log files without OOM
  • Streaming API responses (StreamingResponse in FastAPI)
  • DB pagination — yield page by page, not all rows at once
  • ETL pipelines — parse → filter → transform → load
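A sketch of the DB-pagination pattern — `fetch_page` here is a hypothetical stand-in for a real LIMIT/OFFSET (or keyset) query:

```python
# Fake table standing in for a database
ROWS = list(range(95))

def fetch_page(offset: int, limit: int) -> list[int]:
    # Hypothetical: in production this would be a SQL query with LIMIT/OFFSET
    return ROWS[offset:offset + limit]

def iter_all_rows(page_size: int = 20):
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        if not page:
            return
        yield from page          # stream one row at a time, O(page_size) memory
        offset += page_size

assert list(iter_all_rows()) == ROWS
```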

1.4 Decorators & Closures

Level 1 — Must Know

import functools
import time
 
# Decorator with arguments — three levels of nesting
def retry(max_attempts=3, exceptions=(Exception,)):
    def decorator(func):
        @functools.wraps(func)  # Preserves __name__, __doc__
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts - 1:
                        raise
                    time.sleep(2 ** attempt)  # Exponential backoff
        return wrapper
    return decorator
 
@retry(max_attempts=3, exceptions=(ConnectionError, TimeoutError))
def call_external_api(url):
    return requests.get(url)

Level 2 — Strong Candidate Knowledge

# Closure — function that captures outer scope variables
def make_counter(start=0):
    count = start  # closed-over variable
 
    def increment():
        nonlocal count  # must declare to modify outer var
        count += 1
        return count
 
    return increment
 
counter = make_counter(10)
counter()  # 11
counter()  # 12
 
# Classic closure bug in loops
funcs = [lambda: i for i in range(3)]
[f() for f in funcs]   # [2, 2, 2] — all capture same i!
 
# Fix: capture by value via default argument
funcs = [lambda i=i: i for i in range(3)]
[f() for f in funcs]   # [0, 1, 2] ✅
# Class-based decorator — for stateful decorators
class RateLimiter:
    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = []
 
    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            self.calls = [c for c in self.calls if now - c < self.period]
            if len(self.calls) >= self.max_calls:
                raise Exception("Rate limit exceeded")
            self.calls.append(now)
            return func(*args, **kwargs)
        return wrapper
 
@RateLimiter(max_calls=100, period=60)
def api_endpoint():
    pass

Level 3 — Deep Internals

Closures capture variables by reference, not value — the cell object in __closure__ stores the shared reference. This is why the loop variable capture bug happens.


🎯 Interview Reality

  • "Implement a retry decorator" → common Python exercise
  • "What does @functools.wraps do?" → preserves __name__, __doc__; without it func.__name__ returns 'wrapper'
  • "What's a closure?" → function that captures variables from its enclosing scope

Strong answer for retry: "A retry decorator wraps the function in a loop. On exception, it catches it, waits with exponential backoff, and retries. In production I'd add jitter to the backoff to avoid thundering herd when many clients retry simultaneously."
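One way to add that jitter — a variant of the retry decorator above using "full jitter" (a random delay in [0, base · 2^attempt]); the names and delay constants are illustrative:

```python
import functools
import random
import time

def retry_with_jitter(max_attempts=3, base_delay=0.01):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
                    # Full jitter: clients retry at random points in the window,
                    # spreading load instead of stampeding in lockstep
                    time.sleep(random.uniform(0, base_delay * 2 ** attempt))
        return wrapper
    return decorator

calls = {'n': 0}

@retry_with_jitter(max_attempts=3)
def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError
    return 'ok'

assert flaky() == 'ok'
assert calls['n'] == 3  # failed twice, succeeded on the third attempt
```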


⚙️ Production Thinking

# Auth decorator pattern — common in FastAPI
def require_permission(permission: str):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            request = kwargs.get('request') or args[0]
            if not request.user.has_permission(permission):
                raise HTTPException(403, "Insufficient permissions")
            return await func(*args, **kwargs)
        return wrapper
    return decorator
 
@router.delete("/users/{id}")
@require_permission("users:delete")
async def delete_user(id: int, request: Request):
    ...

1.5 OOP & Design Patterns

Level 1 — Must Know

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
 
# Abstract Base Class — enforce interface contracts
class Repository(ABC):
    @abstractmethod
    def get(self, id: int): pass
 
    @abstractmethod
    def save(self, entity): pass
 
# Dataclass — use instead of plain dicts in APIs
@dataclass
class UserDTO:
    id: int
    name: str
    email: str
    roles: list[str] = field(default_factory=list)
    active: bool = True

@classmethod vs @staticmethod:

  • @classmethod receives the class as cls → use for alternative constructors: User.from_dict()
  • @staticmethod is a regular function namespaced to the class → use for utility functions
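A small sketch showing both in one class (`User` and `from_dict` are illustrative names):

```python
class User:
    def __init__(self, id: int, name: str):
        self.id = id
        self.name = name

    @classmethod
    def from_dict(cls, data: dict) -> 'User':
        # cls, not User: subclasses calling this get instances of themselves
        return cls(id=data['id'], name=data['name'])

    @staticmethod
    def is_valid_name(name: str) -> bool:
        # No access to cls or self — a pure utility namespaced to the class
        return bool(name.strip())

class AdminUser(User):
    pass

admin = AdminUser.from_dict({'id': 2, 'name': 'Root'})
assert isinstance(admin, AdminUser)   # factory respects the subclass
assert User.is_valid_name('Ada')
```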

Level 2 — Strong Candidate Knowledge

# Strategy pattern — swap algorithms at runtime
class PricingStrategy(ABC):
    @abstractmethod
    def calculate(self, amount: float) -> float: ...
 
class RegularPricing(PricingStrategy):
    def calculate(self, amount): return amount
 
class DiscountPricing(PricingStrategy):
    def __init__(self, discount): self.discount = discount
    def calculate(self, amount): return amount * (1 - self.discount)
 
# Observer pattern — event-driven systems
class EventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)
 
    def subscribe(self, event_type: str, handler):
        self._subscribers[event_type].append(handler)
 
    def publish(self, event_type: str, data):
        for handler in self._subscribers[event_type]:
            handler(data)
 
bus = EventBus()
bus.subscribe('user.created', send_welcome_email)
bus.subscribe('user.created', create_default_settings)
bus.publish('user.created', {'id': 1, 'email': 'a@b.com'})
# Repository pattern — decouple data access from business logic
class UserRepository(ABC):
    @abstractmethod
    async def get_by_id(self, user_id: int) -> Optional[User]: pass
 
class SQLUserRepository(UserRepository):
    def __init__(self, db: AsyncSession):
        self.db = db
 
    async def get_by_id(self, user_id: int) -> Optional[User]:
        result = await self.db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()
 
class MockUserRepository(UserRepository):
    def __init__(self): self._users = {}
 
    async def get_by_id(self, user_id: int) -> Optional[User]:
        return self._users.get(user_id)
 
# In tests, inject MockUserRepository — no DB needed

🎯 Interview Reality

  • "@classmethod vs @staticmethod?" → classmethod gets cls (factory methods); staticmethod gets nothing (utility)
  • "How would you design a plugin system?" → ABC + registry
  • "Explain Repository pattern" → decouple business logic from data access; makes mocking trivial

Strong answer: "I'd use the Repository pattern. The service layer depends on the abstract interface. In tests I inject a MockRepository — no DB connection needed. In production I inject the SQLAlchemy implementation. This makes unit tests fast and lets me swap the storage layer."


1.6 Error Handling

Level 1 — Must Know

# Custom exceptions — always define for your domain
class AppError(Exception):
    """Base exception"""
    pass
 
class NotFoundError(AppError):
    def __init__(self, resource: str, id):
        self.resource = resource
        self.id = id
        super().__init__(f"{resource} with id={id} not found")
 
class ValidationError(AppError):
    def __init__(self, field: str, message: str):
        self.field = field
        super().__init__(f"Validation failed on {field}: {message}")
 
# Context manager for resource management
from contextlib import contextmanager
 
@contextmanager
def transaction(db):
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise

Level 2 — Strong Candidate Knowledge

# Exception chaining — preserve original cause
try:
    result = external_api.call()
except requests.RequestException as e:
    raise ServiceUnavailableError("Payment service down") from e
    # Original traceback preserved in __cause__

⚙️ Production Thinking

# Global exception handler in FastAPI — map domain errors to HTTP
@app.exception_handler(NotFoundError)
async def not_found_handler(request: Request, exc: NotFoundError):
    return JSONResponse(
        status_code=404,
        content={"error": "NOT_FOUND", "message": str(exc), "resource": exc.resource}
    )
 
# Clean separation: domain raises domain errors,
# HTTP layer maps them to HTTP responses

1.7 Type Hints & Typing

Level 1 — Must Know

from typing import Optional, Union, TypeVar
 
T = TypeVar('T')
 
def get_user(user_id: int) -> Optional[User]: ...
 
def first(items: list[T]) -> Optional[T]:
    return items[0] if items else None

Level 2 — Strong Candidate Knowledge

from typing import Protocol, TypedDict, Literal
 
# Protocol — structural subtyping (duck typing + type safety)
class Saveable(Protocol):
    def save(self) -> None: ...
 
# TypedDict — typed dicts for JSON structures
class UserResponse(TypedDict):
    id: int
    name: str
    email: str
 
# Literal — constrain to specific values
Status = Literal['active', 'inactive', 'banned']
def update_status(user_id: int, status: Status) -> None: ...

🎯 Interview Reality

"What's the difference between Protocol vs ABC?" "Protocol enables structural subtyping — if an object has the right methods, it satisfies the Protocol without explicit inheritance. This is more flexible for third-party libraries. ABCs require explicit inheritance. I prefer Protocol for defining interfaces between services."


Part 2: DSA for Backend Interviews

The Goal: Not competitive programming. Pattern recognition and clear communication under interview pressure.


2.1 Arrays & HashMaps

Pattern Recognition

Arrays + HashMaps cover ~60% of backend interview problems.

Core insight: When you need O(1) lookup, complement, or frequency — reach for a hashmap.

# Pattern 1: Two Sum — hashmap for O(n) complement lookup
def two_sum(nums: list[int], target: int) -> list[int]:
    seen = {}  # value -> index
    for i, num in enumerate(nums):
        complement = target - num
        if complement in seen:
            return [seen[complement], i]
        seen[num] = i
    return []
# Time: O(n)  Space: O(n)
# Pattern 2: Frequency map — top-K, duplicates, anagrams
from collections import Counter
 
def top_k_frequent(nums: list[int], k: int) -> list[int]:
    return [x for x, _ in Counter(nums).most_common(k)]
 
# Pattern 3: Group Anagrams — canonical key for grouping
def group_anagrams(strs: list[str]) -> list[list[str]]:
    groups = defaultdict(list)
    for s in strs:
        key = tuple(sorted(s))  # Canonical form
        groups[key].append(s)
    return list(groups.values())

🎯 Interview Reality

Approach for array problems:

  1. Clarify: sorted? duplicates allowed? return indices or values?
  2. State O(n²) brute force first
  3. Optimize: "I'll trade O(n) space for O(n) time with a hashmap"
  4. Code, then test with edge cases

Common traps:

  • Forgetting duplicates: [3, 3] with target 6
  • Off-by-one in indices
  • Not asking about constraints (empty array, negatives)

⚙️ Production Thinking

# Real-world: idempotency check for API requests
class IdempotencyStore:
    def __init__(self):
        self._store: dict[str, dict] = {}
 
    def is_duplicate(self, key: str) -> bool:
        return key in self._store
 
    def store_result(self, key: str, result: dict, ttl: int = 86400):
        self._store[key] = {'result': result, 'expires_at': time.time() + ttl}
 
# In production: use Redis for distributed idempotency

2.2 Sliding Window

Pattern Recognition

Two pointers maintaining a window. Use for: subarray/substring with constraint, running metrics over a range, rate limiting.

# Fixed-size window — max sum of k elements
def max_sum_k(nums: list[int], k: int) -> int:
    window_sum = sum(nums[:k])
    max_sum = window_sum
 
    for i in range(k, len(nums)):
        window_sum += nums[i] - nums[i - k]  # Add new, remove old
        max_sum = max(max_sum, window_sum)
 
    return max_sum
# Variable-size window — longest substring without repeating chars
def length_of_longest_substring(s: str) -> int:
    seen = {}   # char -> last index
    left = 0
    max_len = 0
 
    for right, char in enumerate(s):
        if char in seen and seen[char] >= left:
            left = seen[char] + 1  # Shrink window
        seen[char] = right
        max_len = max(max_len, right - left + 1)
 
    return max_len
 
# Template: expand right → update state → shrink left while invalid → update result
# Sliding window rate limiter — DIRECT production usage
from collections import defaultdict, deque
import time
 
class SlidingWindowRateLimiter:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests: dict[str, deque] = defaultdict(deque)
 
    def is_allowed(self, user_id: str) -> bool:
        now = time.time()
        q = self.requests[user_id]
        while q and q[0] < now - self.window:
            q.popleft()
        if len(q) < self.max_requests:
            q.append(now)
            return True
        return False

🎯 Interview Reality

Template to memorize:

left = 0; result = 0; window_state = {}
for right in range(len(arr)):
    # 1. Add arr[right] to window
    # 2. Shrink left while constraint violated
    # 3. Update result
    result = max(result, right - left + 1)

Common mistakes:

  • Not cleaning up window state when shrinking
  • Wrong size formula: size = right - left + 1

2.3 Stack

Pattern Recognition

Stack = LIFO. Use when: "previous greater/smaller element", bracket matching, undo/redo, monotonic optimization.

# Valid parentheses
def is_valid(s: str) -> bool:
    stack = []
    pairs = {')': '(', '}': '{', ']': '['}
    for char in s:
        if char in '({[':
            stack.append(char)
        elif not stack or stack[-1] != pairs[char]:
            return False
        else:
            stack.pop()
    return len(stack) == 0
 
# Monotonic stack — next greater element
def next_greater_element(nums: list[int]) -> list[int]:
    result = [-1] * len(nums)
    stack = []  # Indices waiting for their next greater
 
    for i, num in enumerate(nums):
        while stack and nums[stack[-1]] < num:
            result[stack.pop()] = num
        stack.append(i)
 
    return result

🎯 Interview Reality

When to think "stack":

  • "Process X and undo" → stack
  • "Nested structure validation" → stack
  • "Previous/next larger element" → monotonic stack

2.4 Queue & BFS

Pattern Recognition

Queue = FIFO. BFS uses a queue. Use for: shortest path (unweighted), level-order traversal, dependency resolution.

from collections import deque
 
# BFS template
def bfs(graph: dict, start: str) -> list[str]:
    visited = {start}
    queue = deque([start])
    order = []
 
    while queue:
        node = queue.popleft()
        order.append(node)
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(neighbor)
 
    return order
 
# Level-by-level BFS — when depth matters
def bfs_levels(graph: dict, start: str) -> list[list[str]]:
    visited = {start}
    queue = deque([start])
    levels = []
 
    while queue:
        level = []
        for _ in range(len(queue)):  # Process one level at a time
            node = queue.popleft()
            level.append(node)
            for neighbor in graph.get(node, []):
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append(neighbor)
        levels.append(level)
 
    return levels

⚙️ Production Thinking

# BFS for dependency/service startup order (Kahn's algorithm)
def topological_sort(dependencies: dict[str, list[str]]) -> list[str]:
    # dependencies: node -> list of nodes it depends on
    in_degree = defaultdict(int)
    dependents = defaultdict(list)  # reverse edges: dep -> nodes that depend on it
    all_nodes = set()
    for node, deps in dependencies.items():
        all_nodes.add(node)
        for dep in deps:
            in_degree[node] += 1
            dependents[dep].append(node)
            all_nodes.add(dep)
 
    queue = deque([n for n in all_nodes if in_degree[n] == 0])
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for dependent in dependents[node]:  # O(V + E) total via the reverse map
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                queue.append(dependent)
 
    if len(order) != len(all_nodes):
        raise ValueError("Cycle detected")
    return order

2.5 Heap / Priority Queue

Pattern Recognition

Heap = efficient repeated min/max retrieval. Python's heapq provides only a min-heap — negate values for max-heap behavior.

import heapq
 
# Top-K largest — min-heap of size k, O(n log k)
def top_k_largest(nums: list[int], k: int) -> list[int]:
    heap = []
    for num in nums:
        heapq.heappush(heap, num)
        if len(heap) > k:
            heapq.heappop(heap)  # Remove smallest
    return sorted(heap, reverse=True)
 
# K-way merge — merge K sorted lists
def merge_k_sorted(lists: list[list[int]]) -> list[int]:
    result = []
    heap = [(lst[0], i, 0) for i, lst in enumerate(lists) if lst]
    heapq.heapify(heap)
 
    while heap:
        val, li, ei = heapq.heappop(heap)
        result.append(val)
        if ei + 1 < len(lists[li]):
            heapq.heappush(heap, (lists[li][ei + 1], li, ei + 1))
 
    return result
# Backend usage: merge paginated results from multiple DB shards
 
# Median of stream
class MedianFinder:
    def __init__(self):
        self.lo = []  # max-heap (negate) for lower half
        self.hi = []  # min-heap for upper half
 
    def add_num(self, num: int):
        heapq.heappush(self.lo, -num)
        heapq.heappush(self.hi, -heapq.heappop(self.lo))
        if len(self.hi) > len(self.lo):
            heapq.heappush(self.lo, -heapq.heappop(self.hi))
 
    def find_median(self) -> float:
        if len(self.lo) > len(self.hi):
            return -self.lo[0]
        return (-self.lo[0] + self.hi[0]) / 2

🎯 Interview Reality

Key insight: Python only has min-heap. For max-heap: negate values.

Time complexity:

  • heappush/heappop: O(log n)
  • heapify: O(n) — build heap from list
  • heapq.nlargest(k, iterable) / nsmallest(k, iterable): O(n log k)

Common mistake: Using max(list) in a loop — O(n²); use heap for repeated min/max queries.
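heapq.nlargest / nsmallest do that bookkeeping for you — per the stdlib docs they are equivalent to sorting then slicing, but run in O(n log k) by keeping a bounded heap internally:

```python
import heapq
import random

nums = [random.randint(0, 1000) for _ in range(100)]

# Equivalent to sorted(nums, reverse=True)[:3], but O(n log k) not O(n log n)
top3 = heapq.nlargest(3, nums)
assert top3 == sorted(nums, reverse=True)[:3]

# A key function works too — e.g. busiest endpoints by request count
stats = [('/users', 120), ('/orders', 450), ('/health', 9000), ('/login', 300)]
busiest = heapq.nlargest(2, stats, key=lambda s: s[1])
assert busiest == [('/health', 9000), ('/orders', 450)]
```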


2.6 Binary Search

Pattern Recognition

Binary search is not just "find element in sorted array." It's:

  • Find the boundary where a condition changes
  • Find the minimum/maximum that satisfies a condition
# Standard binary search
def binary_search(nums: list[int], target: int) -> int:
    left, right = 0, len(nums) - 1
    while left <= right:
        mid = left + (right - left) // 2
        if nums[mid] == target: return mid
        elif nums[mid] < target: left = mid + 1
        else: right = mid - 1
    return -1
 
# Binary search on answer — "minimize X such that condition holds"
def minimize(feasible_fn, lo: int, hi: int) -> int:
    while lo < hi:
        mid = (lo + hi) // 2
        if feasible_fn(mid):
            hi = mid        # mid works, try smaller
        else:
            lo = mid + 1    # mid fails, try larger
    return lo
 
# Find first position (left boundary)
def find_first(nums: list[int], target: int) -> int:
    left, right = 0, len(nums)
    result = -1
    while left < right:
        mid = (left + right) // 2
        if nums[mid] == target:
            result = mid
            right = mid     # Keep searching left
        elif nums[mid] < target:
            left = mid + 1
        else:
            right = mid
    return result

🎯 Interview Reality

Recognizing binary search:

  • "Find minimum X such that..." → binary search on answer
  • Array is sorted/rotated → binary search
  • Monotonically increasing/decreasing condition → binary search

Common mistakes:

  • Off-by-one: left <= right vs left < right (depends on pattern)
  • Infinite loop: not updating both left and right

2.7 DFS & Backtracking

# DFS — iterative (avoids stack overflow on deep graphs)
def dfs_iterative(graph, start):
    visited = set(); stack = [start]; result = []
    while stack:
        node = stack.pop()
        if node in visited: continue
        visited.add(node); result.append(node)
        stack.extend(reversed(graph.get(node, [])))
    return result
 
# Backtracking template — combinations/subsets
def combination_sum(candidates: list[int], target: int) -> list[list[int]]:
    result = []
 
    def dfs(start: int, current: list[int], remaining: int):
        if remaining == 0:
            result.append(current[:]); return
        if remaining < 0:
            return  # Prune
 
        for i in range(start, len(candidates)):
            current.append(candidates[i])
            dfs(i, current, remaining - candidates[i])  # i, not i+1: candidates reusable
            current.pop()  # Undo choice
 
    dfs(0, [], target)
    return result
# Pattern: choose → explore → unchoose

🎯 Interview Reality

  • "All combinations/subsets/permutations" → backtracking
  • Recursion vs iteration → recursion is cleaner; mention Python default limit of 1000

In interviews say: "For production code I'd use iterative DFS to avoid stack overflow on deep graphs. Python's default recursion limit is 1000 — I'd mention that and increase it or use iteration."
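A quick demonstration of that limit (a sketch; the exact default can be changed by the embedding application, so we read it at runtime):

```python
import sys

def recurse(n: int) -> int:
    # Each call adds one Python frame to the stack
    return n if n == 0 else recurse(n - 1)

limit = sys.getrecursionlimit()  # 1000 by default in CPython
try:
    recurse(limit + 100)         # guaranteed to exceed the frame limit
    raised = False
except RecursionError:
    raised = True

assert raised
# Raising the limit is possible (sys.setrecursionlimit) but risks a real
# C-stack overflow — iteration is the safer fix for deep structures.
```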


⚙️ Production Thinking

# DFS for role/permission hierarchy traversal
def get_all_permissions(role_id: str, role_graph: dict) -> set[str]:
    visited = set(); all_perms = set()
 
    def dfs(role: str):
        if role in visited: return
        visited.add(role)
        all_perms.update(role_graph[role].get('permissions', []))
        for parent in role_graph[role].get('inherits_from', []):
            dfs(parent)
 
    dfs(role_id)
    return all_perms

2.8 Prefix Sum

Pattern Recognition

Precompute cumulative sums for O(1) range queries.

# Build: O(n)  Query: O(1)
def build_prefix(nums: list[int]) -> list[int]:
    prefix = [0] * (len(nums) + 1)
    for i, num in enumerate(nums):
        prefix[i + 1] = prefix[i] + num
    return prefix
 
def range_sum(prefix: list[int], left: int, right: int) -> int:
    return prefix[right + 1] - prefix[left]
 
# Subarray sum equals K — prefix + hashmap
def subarray_sum(nums: list[int], k: int) -> int:
    count = 0; prefix = 0
    seen = {0: 1}  # prefix_sum -> frequency
 
    for num in nums:
        prefix += num
        count += seen.get(prefix - k, 0)
        seen[prefix] = seen.get(prefix, 0) + 1
 
    return count

⚙️ Production Thinking

Prefix sum powers: analytics dashboards, billing calculations, SLA monitoring, time-series range queries. Example: "total page views between day 10 and day 50" → build prefix once, answer any range in O(1).
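Concretely, that day-10-to-day-50 query looks like this (helpers repeated from the pattern code above; `daily_views` is fake traffic data):

```python
# Same helpers as in the pattern section
def build_prefix(nums: list[int]) -> list[int]:
    prefix = [0] * (len(nums) + 1)
    for i, num in enumerate(nums):
        prefix[i + 1] = prefix[i] + num
    return prefix

def range_sum(prefix: list[int], left: int, right: int) -> int:
    return prefix[right + 1] - prefix[left]

daily_views = [100 + (i % 7) * 10 for i in range(60)]  # fake 60 days of traffic
prefix = build_prefix(daily_views)                      # O(n), once

# "Total page views between day 10 and day 50" — O(1) per query after the build
assert range_sum(prefix, 10, 50) == sum(daily_views[10:51])
```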


2.9 Dynamic Programming

When to Recognize DP

Signal words: "minimum/maximum", "number of ways", "can/cannot achieve", "longest/shortest subsequence." DP = optimal substructure + overlapping subproblems.

# Bottom-up DP template
def dp_template(n: int) -> int:
    dp = [0] * (n + 1)
    dp[0] = base_case              # placeholder: problem-specific base case
    for i in range(1, n + 1):
        dp[i] = transition(dp, i)  # placeholder: problem-specific recurrence
    return dp[n]
 
# 0/1 Knapsack — resource allocation
def knapsack(weights: list[int], values: list[int], capacity: int) -> int:
    n = len(weights)
    dp = [[0] * (capacity + 1) for _ in range(n + 1)]
    for i in range(1, n + 1):
        for w in range(capacity + 1):
            dp[i][w] = dp[i-1][w]  # Don't take item
            if weights[i-1] <= w:
                dp[i][w] = max(dp[i][w], dp[i-1][w - weights[i-1]] + values[i-1])
    return dp[n][capacity]
# Backend usage: feature flag allocation, resource scheduling

🎯 Interview Reality

DP in backend interviews: Less common than arrays/graphs but shows up for optimization problems. Interviewers care about recognizing it and explaining the recurrence clearly.

Always start with: "For each item I have two choices: include or exclude. The recurrence is dp[i][w] = max(dp[i-1][w], dp[i-1][w-wi] + vi). Base case: 0 items or 0 capacity → 0 value."


2.10 Graph Algorithms

# Detect cycle in directed graph (3-color DFS)
def has_cycle(graph: dict) -> bool:
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {node: WHITE for node in graph}
 
    def dfs(node) -> bool:
        color[node] = GRAY
        for neighbor in graph.get(node, []):
            if color[neighbor] == GRAY: return True  # Back edge = cycle
            if color[neighbor] == WHITE and dfs(neighbor): return True
        color[node] = BLACK
        return False
 
    return any(dfs(n) for n in graph if color[n] == WHITE)
 
# Dijkstra's — weighted shortest path
import heapq
 
def dijkstra(graph: dict, start: str) -> dict[str, float]:
    distances = {node: float('inf') for node in graph}
    distances[start] = 0
    heap = [(0, start)]
 
    while heap:
        dist, node = heapq.heappop(heap)
        if dist > distances[node]: continue
        for neighbor, weight in graph.get(node, []):
            new_dist = dist + weight
            if new_dist < distances[neighbor]:
                distances[neighbor] = new_dist
                heapq.heappush(heap, (new_dist, neighbor))
 
    return distances
 
# Union-Find — connected components
class UnionFind:
    def __init__(self, n: int):
        self.parent = list(range(n))
        self.rank = [0] * n
 
    def find(self, x: int) -> int:
        if self.parent[x] != x:
            self.parent[x] = self.find(self.parent[x])  # Path compression
        return self.parent[x]
 
    def union(self, x: int, y: int) -> bool:
        px, py = self.find(x), self.find(y)
        if px == py: return False
        if self.rank[px] < self.rank[py]: px, py = py, px
        self.parent[py] = px
        if self.rank[px] == self.rank[py]: self.rank[px] += 1
        return True

2.11 DSA Interview Communication Guide

5-Step Framework

  1. Clarify (2 min) — duplicates? constraints? edge cases?
  2. Example (2 min) — trace through a small example manually
  3. Approach (3 min) — state brute force, then optimize; get buy-in before coding
  4. Code (10–15 min) — clean variable names, explain choices as you go
  5. Test (3 min) — trace your example, check edge cases

Complexity Template

Always analyze BOTH time AND space:

Time:  O(n log n) — one sort O(n log n) + one linear pass O(n)
Space: O(n)       — hashmap in worst case (all unique elements)

Part 3: Async Python & FastAPI


3.1 Understanding async/await

Level 1 — Must Know

Core mental model: asyncio is cooperative multitasking on a single thread. Tasks voluntarily yield control at await points. No OS thread switching — just callbacks.

import asyncio
 
# Sequential — 0.2s total
async def sequential():
    a = await fetch_user(1)  # suspend, wait 0.1s
    b = await fetch_user(2)  # suspend, wait 0.1s
    return a, b
 
# Concurrent — ~0.1s total
async def concurrent():
    a, b = await asyncio.gather(fetch_user(1), fetch_user(2))
    return a, b
 
# With error handling
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]

Level 2 — Strong Candidate Knowledge

# Semaphore — limit concurrency (protect downstream services)
async def fetch_all(urls: list[str]) -> list[dict]:
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent
 
    async def fetch_one(url: str) -> dict:
        async with semaphore:
            return await fetch(url)
 
    return await asyncio.gather(*[fetch_one(u) for u in urls])
 
# Timeout
result = await asyncio.wait_for(fetch(url), timeout=5.0)
 
# Task group (Python 3.11+)
async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(fetch_user(1))
    task2 = tg.create_task(fetch_user(2))
# Both done here, any exception propagates
# CPU-bound in async context — WRONG vs CORRECT
# ❌ WRONG — blocks event loop
async def compute(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()
 
# ✅ CORRECT — offload to thread pool
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
 
async def compute(data: bytes) -> str:
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside coroutines
    return await loop.run_in_executor(
        executor, lambda: hashlib.sha256(data).hexdigest()
    )

Level 3 — Deep Internals

The event loop is a single-threaded selector that polls I/O events. When a coroutine does await socket.read(), the OS registers a callback for when data arrives. The event loop runs other coroutines until data is ready. This is why async handles thousands of concurrent connections on one thread.
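This model is easy to demonstrate: a blocking call starves every other coroutine on the loop, while an awaited sleep yields. A minimal, self-contained sketch:

```python
import asyncio
import time

async def ticker(results: list[str]) -> None:
    for _ in range(3):
        await asyncio.sleep(0.05)  # yields control back to the event loop
        results.append("tick")

async def blocking_work(results: list[str]) -> None:
    time.sleep(0.2)  # never yields — nothing else runs for these 0.2s
    results.append("blocked-done")

async def main() -> list[str]:
    results: list[str] = []
    await asyncio.gather(blocking_work(results), ticker(results))
    return results

order = asyncio.run(main())
# time.sleep held the only thread, so "blocked-done" lands before any "tick"
assert order[0] == "blocked-done" and order.count("tick") == 3
```

Swap `time.sleep(0.2)` for `await asyncio.sleep(0.2)` and the ticks interleave — the whole difference between blocking and yielding in one line.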


🎯 Interview Reality

  • "What happens if you call time.sleep() in an async function?" → blocks the entire event loop; use asyncio.sleep()
  • "When would you NOT use asyncio?" → CPU-bound work; use multiprocessing
  • "What's the difference between asyncio.gather and asyncio.wait?" → gather returns results in order; wait gives more control over cancellation

Strong event loop answer: "The event loop is a single-threaded scheduler. When a coroutine hits await on I/O, it registers a callback with the OS and suspends. The loop picks up the next runnable coroutine. When I/O completes, the OS notifies the loop, which resumes the coroutine. This is why async is great for I/O-bound work — one thread can handle thousands of concurrent connections."

GIL question: "asyncio runs on a single thread, so the GIL isn't a concern. For CPU-bound tasks I'd use multiprocessing to bypass the GIL, or loop.run_in_executor with a ProcessPoolExecutor."


3.2 FastAPI

Level 1 — Must Know

from fastapi import FastAPI, Depends, HTTPException, status, Query, Path
from pydantic import BaseModel, EmailStr, field_validator
 
app = FastAPI(title="User API", version="1.0.0")
 
class UserCreate(BaseModel):
    name: str
    email: EmailStr
    age: int
 
    @field_validator('age')  # Pydantic v2; v1 used @validator
    @classmethod
    def age_valid(cls, v: int) -> int:
        if not 0 <= v <= 150:
            raise ValueError('age must be 0–150')
        return v
 
class UserResponse(BaseModel):
    id: int
    name: str
    email: str
 
    class Config:
        from_attributes = True
 
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(payload: UserCreate, db: AsyncSession = Depends(get_db)):
    user = User(**payload.model_dump())  # Pydantic v2 (.dict() in v1)
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user
 
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int = Path(..., gt=0),
    db: AsyncSession = Depends(get_db)
):
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

Level 2 — Strong Candidate Knowledge

# Dependency injection — the heart of FastAPI
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
 
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> User:
    payload = verify_token(token)  # raises if invalid
    user = await db.get(User, payload['sub'])
    if not user:
        raise HTTPException(401, "User not found")
    return user
 
# Chain dependencies
async def get_active_user(user: User = Depends(get_current_user)) -> User:
    if not user.is_active:
        raise HTTPException(403, "Inactive user")
    return user
 
# Middleware — cross-cutting concerns
@app.middleware("http")
async def request_id_middleware(request: Request, call_next):
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response
 
# Background tasks — fire and forget after response
@app.post("/users")
async def create_user(payload: UserCreate, background_tasks: BackgroundTasks, ...):
    user = await create_user_in_db(db, payload)
    background_tasks.add_task(send_welcome_email, user.email)  # After response
    background_tasks.add_task(notify_analytics, user.id)
    return user

🎯 Interview Reality

BackgroundTasks vs Celery:

                     BackgroundTasks           Celery
Process              Same                      Separate workers
Persistent on crash  No                        Yes (broker)
Retries              Manual                    Built-in
Scheduling           No                        Yes (beat)
Best for             Quick post-request work   Heavy/scheduled jobs

Strong answer on DI: "FastAPI's DI resolves dependencies before calling the endpoint. I can chain them — get_active_user depends on get_current_user which depends on get_db. FastAPI handles call order and caches results within a request. The endpoint just declares what it needs."


3.3 JWT & Authentication

Level 1 — Must Know

import jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
 
pwd_context = CryptContext(schemes=["bcrypt"])
SECRET_KEY = "your-256-bit-secret"  # From env in production
 
def create_access_token(user_id: int, roles: list[str]) -> str:
    payload = {
        "sub": str(user_id),
        "roles": roles,
        "exp": datetime.utcnow() + timedelta(minutes=30),
        "iat": datetime.utcnow(),
        "type": "access"
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
 
def verify_token(token: str) -> dict:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        if payload.get("type") != "access":
            raise InvalidTokenError("Wrong token type")
        return payload
    except jwt.ExpiredSignatureError:
        raise TokenExpiredError("Token has expired")
    except jwt.InvalidTokenError as e:
        raise InvalidTokenError(f"Invalid token: {e}")
 
# Password hashing — bcrypt is intentionally slow (~250ms at cost=12)
def hash_password(password: str) -> str:
    return pwd_context.hash(password)
 
def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

Level 2 — Strong Candidate Knowledge

# Refresh token rotation — production pattern
async def refresh_tokens(refresh_token: str, db) -> tuple[str, str]:
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=["HS256"])
    except jwt.InvalidTokenError:
        raise InvalidTokenError("Invalid refresh token")
 
    # Check not revoked
    record = await db.execute(
        select(RefreshToken).where(
            RefreshToken.token == refresh_token,
            RefreshToken.revoked == False
        )
    )
    if not record.scalar_one_or_none():
        raise InvalidTokenError("Token already used or revoked")
 
    # Revoke old, issue new (rotation)
    await db.execute(
        update(RefreshToken)
        .where(RefreshToken.token == refresh_token)
        .values(revoked=True)
    )
    new_access = create_access_token(int(payload["sub"]), payload["roles"])
    new_refresh = create_refresh_token(int(payload["sub"]))
    await db.commit()
    return new_access, new_refresh

🎯 Interview Reality

  • "JWT is signed, NOT encrypted — never put sensitive data in payload"
  • "How do you invalidate a JWT?" → Redis blocklist of JTIs; short expiry + refresh token rotation
  • "Why not sessions?" → stateless; works across multiple servers without shared state

Strong answer on JWT tradeoffs: "JWTs can't be invalidated before expiry since they're stateless. I'd keep access tokens short-lived (15–30 min) and use refresh token rotation — each use issues a new refresh token and revokes the old. For immediate invalidation (logout/ban), I maintain a Redis blocklist of revoked JTIs."


3.4 Database & SQLAlchemy

Level 1 — Must Know

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
 
class Base(DeclarativeBase): pass
 
class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    orders: Mapped[list["Order"]] = relationship(back_populates="user")
 
# The N+1 Problem — most common DB performance bug
# ❌ WRONG: N+1 queries
users = (await db.execute(select(User))).scalars().all()
for user in users:
    orders = await db.execute(select(Order).where(Order.user_id == user.id))  # N extra!
 
# ✅ CORRECT: eager loading
from sqlalchemy.orm import selectinload, joinedload
 
users = await db.execute(
    select(User).options(selectinload(User.orders))  # One extra query, not N
)
# joinedload — for one-to-one/few (JOIN instead of separate query)

Level 2 — Strong Candidate Knowledge

# Cursor pagination — O(1) seek vs offset O(n) scan
async def paginate_cursor(db, cursor: Optional[int] = None, limit: int = 20):
    query = select(User).order_by(User.id).limit(limit + 1)  # fetch one extra to detect a next page
    if cursor:
        query = query.where(User.id > cursor)
    users = (await db.execute(query)).scalars().all()
    has_next = len(users) > limit
    page = users[:limit]
    return page, (page[-1].id if has_next else None)  # next cursor = last RETURNED row, not the probe row
 
# Optimistic locking — handle concurrent updates
async def transfer(db, from_id: int, to_id: int, amount: int):
    result = await db.execute(select(Account).where(Account.id == from_id))
    account = result.scalar_one()
 
    updated = await db.execute(
        update(Account)
        .where(Account.id == from_id, Account.version == account.version)
        .values(balance=Account.balance - amount, version=Account.version + 1)
    )
    if updated.rowcount == 0:
        raise ConcurrentModificationError("Account modified concurrently")

🎯 Interview Reality

  • "Explain the N+1 problem" → 1 query for list + N queries per item; fix with selectinload/joinedload
  • "Cursor vs offset pagination?" → offset does full O(n) scan; cursor seeks by index O(1)
  • "How do you handle concurrent writes?" → optimistic locking (version column) or SELECT FOR UPDATE

Strong answer: "N+1 happens when you load a collection, then loop and query each item's relationships. Fix: eager loading — selectinload for one-to-many (separate batch query) or joinedload for one-to-one (JOIN). I'd also add query logging in staging to catch N+1 before production."


3.5 Redis

Level 1 — Must Know

Data Structure  Best for
String          Counters, simple caching, sessions
Hash            Object fields (user profile)
List            Queues, recent activity feeds
Set             Unique members, tags, friend lists
Sorted Set      Leaderboards, delayed jobs, sliding window rate limiting
 
# Cache-aside pattern — standard approach
async def get_user_cached(user_id: int) -> Optional[dict]:
    cache_key = f"user:{user_id}"
    cached = await r.get(cache_key)
    if cached:
        return json.loads(cached)
 
    user = await db.get(User, user_id)
    if user:
        await r.set(cache_key, json.dumps(user.to_dict()), ex=300)
    return user.to_dict() if user else None
 
# Distributed lock with Lua (atomic release)
@asynccontextmanager
async def redis_lock(r, lock_name: str, timeout: int = 10):
    lock_value = str(uuid.uuid4())
    lock_key = f"lock:{lock_name}"
    acquired = await r.set(lock_key, lock_value, nx=True, ex=timeout)
    if not acquired:
        raise LockNotAcquiredError(lock_name)
    try:
        yield
    finally:
        release_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        end return 0
        """
        await r.eval(release_script, 1, lock_key, lock_value)

🎯 Interview Reality

Cache stampede prevention: "When cache expires, many requests hit the DB simultaneously. Solutions: (1) Mutex — only one request fetches, others wait; (2) Probabilistic early expiry — randomly refresh before TTL expires; (3) Stale-while-revalidate — return stale data while refreshing async."

Redis vs Memcached: "Redis has more data structures, persistence, pub/sub, and Lua scripting. Memcached is simpler and faster for pure caching with no persistence needs. In most backend stacks today, Redis is the default choice."
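Option (1) is worth being able to sketch on the spot. Below is a single-process version using a per-key asyncio lock — in a multi-server deployment the mutex would be the Redis lock shown earlier, but the shape is identical (names like `fetch_from_db` are illustrative):

```python
import asyncio
from collections import defaultdict

_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
_cache: dict[str, str] = {}
db_calls = 0

async def fetch_from_db(key: str) -> str:
    global db_calls
    db_calls += 1
    await asyncio.sleep(0.05)  # simulate a slow query
    return f"value-for-{key}"

async def get_with_mutex(key: str) -> str:
    if key in _cache:
        return _cache[key]
    async with _locks[key]:   # only one coroutine fetches per key
        if key in _cache:     # losers re-check after the winner populated it
            return _cache[key]
        _cache[key] = await fetch_from_db(key)
        return _cache[key]

async def main() -> int:
    await asyncio.gather(*[get_with_mutex("user:1") for _ in range(50)])
    return db_calls

assert asyncio.run(main()) == 1  # 50 concurrent misses → exactly one DB fetch
```

The double-check inside the lock is the part candidates forget — without it, every waiter refetches after acquiring the lock and the stampede just becomes serialized.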


3.6 Celery & Background Jobs

Level 1 — Must Know

from celery import Celery
 
celery = Celery(
    "myapp",
    broker="redis://localhost:6379/0",   # Task queue
    backend="redis://localhost:6379/1",  # Result store
)
 
celery.conf.update(
    task_soft_time_limit=300,       # 5 min — raises SoftTimeLimitExceeded
    task_time_limit=360,            # 6 min — kills worker
    worker_max_tasks_per_child=1000 # Restart to prevent memory leaks
)
 
@celery.task(bind=True, max_retries=3)
def send_email(self, user_id: int, template: str):
    try:
        user = get_user(user_id)
        email_service.send(user.email, template)
    except EmailServiceError as e:
        raise self.retry(exc=e, countdown=2 ** self.request.retries)
 
# Call tasks
send_email.delay(user_id=1, template="welcome")            # Async now
send_email.apply_async(args=[1, "welcome"], countdown=60)  # Delayed

3.7 Logging & Monitoring

Level 1 — Must Know

# Structured JSON logging — always use in production
import logging, json
from contextvars import ContextVar
 
request_id_var: ContextVar[str] = ContextVar('request_id', default='')
 
class JSONFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "request_id": request_id_var.get(),
            "module": record.module,
            "line": record.lineno,
            **({"exception": self.formatException(record.exc_info)}
               if record.exc_info else {})
        })
 
# Health check endpoint — required for load balancers, k8s
@app.get("/health")
async def health_check():
    checks = {}
    status_code = 200
    try:
        await db.execute(text("SELECT 1")); checks["database"] = "ok"
    except Exception as e:
        checks["database"] = f"error: {e}"; status_code = 503
    try:
        await redis.ping(); checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = f"error: {e}"; status_code = 503
    return JSONResponse(
        content={"status": "ok" if status_code == 200 else "degraded", "checks": checks},
        status_code=status_code
    )
# /health/live  — is process running? (don't restart if DB is down)
# /health/ready — ready to serve traffic? (remove from LB if degraded)
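Why `ContextVar` rather than a module-level global: each asyncio task gets its own copy of the context, so concurrent requests keep their own request IDs instead of clobbering each other's. A self-contained demonstration (re-declares the var for runnability):

```python
import asyncio
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id", default="")

async def handle(rid: str) -> str:
    request_id_var.set(rid)      # set in THIS task's context copy
    await asyncio.sleep(0.01)    # other "requests" run concurrently here
    return request_id_var.get()  # still sees its own value, not another task's

async def main() -> list[str]:
    # Five concurrent "requests"; a plain global would leave all of them
    # reading whichever ID was set last
    return await asyncio.gather(*[handle(f"req-{i}") for i in range(5)])

assert asyncio.run(main()) == [f"req-{i}" for i in range(5)]
```

This is exactly what lets the middleware in 3.2 set `request_id` once and have every log line in that request pick it up.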

3.8 Testing

Level 1 — Must Know

import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
 
@pytest.fixture  # needs pytest-asyncio (asyncio_mode = "auto") for async fixtures
async def db():
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    async with async_sessionmaker(engine)() as session:
        yield session
    await engine.dispose()
 
@pytest.fixture
async def client(db):
    app.dependency_overrides[get_db] = lambda: db
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
        yield ac
    app.dependency_overrides.clear()
 
@pytest.mark.asyncio
async def test_create_user(client):
    response = await client.post("/users", json={"name": "Alice", "email": "a@b.com", "age": 25})
    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "Alice"
    assert "id" in data
 
# Mocking external services
from unittest.mock import AsyncMock, patch
 
@pytest.mark.asyncio
async def test_email_sent_on_register(client):
    with patch('myapp.services.email.send_email', new_callable=AsyncMock) as mock_email:
        await client.post("/users", json={"name": "Alice", "email": "a@b.com", "age": 25})
        mock_email.assert_called_once_with("a@b.com", "welcome")

🎯 Interview Reality

Testing pyramid for backend APIs:

  1. Unit tests (~60%): Business logic, validators — no DB, no HTTP
  2. Integration tests (~30%): API endpoints with real test DB, mocked external services
  3. E2E tests (~10%): Full stack, staging environment

What interviewers want to hear: "I write unit tests for business logic in isolation using mocks. Integration tests cover the API layer with a real test database — I use SQLite in-memory or TestContainers. I mock external APIs like Stripe or SendGrid. For critical paths, I have contract tests to verify service boundaries."


Part 4: PostgreSQL, System Design & Architecture


4.1 PostgreSQL for Backend Engineers

Level 1 — Must Know

-- EXPLAIN ANALYZE — your debugging superpower
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT u.id, u.name, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
WHERE u.created_at > NOW() - INTERVAL '30 days'
GROUP BY u.id, u.name
ORDER BY order_count DESC LIMIT 20;
 
-- What to look for:
-- ❌ Seq Scan on large table → add index
-- ❌ High rows estimate mismatch → run ANALYZE (stale statistics)
-- ✅ Index Scan / Index Only Scan
 
-- Indexing strategy
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
 
-- Composite — equality conditions FIRST
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
-- Useful for: WHERE user_id = ? AND status = ?
-- NOT useful for: WHERE status = ? (no user_id prefix)
 
-- Partial — only index a subset (smaller, faster)
CREATE INDEX idx_active_users ON users(email) WHERE is_active = true;
 
-- Covering — include extra cols to avoid heap fetch
CREATE INDEX idx_orders_covering ON orders(user_id)
  INCLUDE (id, status, created_at);

Level 2 — Strong Candidate Knowledge

-- SELECT FOR UPDATE — pessimistic locking for bank transfers
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;  -- Lock this row
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;
 
-- SKIP LOCKED — queue processing pattern (no blocking)
SELECT * FROM jobs WHERE status = 'pending'
LIMIT 1 FOR UPDATE SKIP LOCKED;
 
-- Window functions — analytics without multiple queries
SELECT user_id, amount,
    SUM(amount) OVER (PARTITION BY user_id ORDER BY order_date) AS running_total,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY order_date DESC) AS recency_rank
FROM orders;
 
-- JSONB for flexible schemas
CREATE TABLE events (
    id SERIAL PRIMARY KEY,
    type VARCHAR(50),
    payload JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_events_payload ON events USING gin(payload);
SELECT * FROM events WHERE payload @> '{"action": "login"}';

🎯 Interview Reality

  • "How do you optimize a slow query?" → EXPLAIN ANALYZE, check Seq Scans, add appropriate indexes
  • "What isolation level for bank transfers?" → REPEATABLE READ or SERIALIZABLE
  • "What is MVCC?" → PostgreSQL never overwrites; creates new row versions; VACUUM reclaims dead rows

Strong answer: "First I'd run EXPLAIN ANALYZE to see the plan. I'd look for Seq Scans on large tables, stale statistics (row count mismatch), and expensive sorts. Common fixes: composite index on WHERE/JOIN columns, partial index for common filters, ANALYZE to update statistics."


4.2 System Design Fundamentals

Level 1 — Must Know

Concept             Definition                When to use
Horizontal scaling  Add more servers          Stateless services, read replicas
Vertical scaling    Bigger server             DB writes before horizontal
Sharding            Split DB by key           user_id % N for user data
Replication         Copy data across servers  Read replicas, high availability
CDN                 Cache at edge             Static assets, images, JS/CSS
Message queue       Async communication       Decouple services, buffer spikes

Level 2 — Strong Candidate Knowledge

CAP Theorem: In a distributed system, you can have at most 2 of: Consistency, Availability, Partition Tolerance. Since partitions happen, you choose CP or AP.

CP examples: PostgreSQL, Redis cluster with quorum
AP examples: Cassandra, DynamoDB, CouchDB

QPS estimation template:
10M users × 10% DAU = 1M DAU
1M × 10 requests/day = 10M req/day
10M / 86400 ≈ 115 QPS average, 1150 QPS peak (10x multiplier)
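The same back-of-envelope math, scripted (numbers taken from the template above):

```python
users = 10_000_000
dau = users * 0.10                   # 10% daily active → 1M DAU
requests_per_day = dau * 10          # 10 requests per user per day → 10M/day
avg_qps = requests_per_day / 86_400  # 86,400 seconds in a day → ~115 QPS
peak_qps = avg_qps * 10              # rule-of-thumb 10x peak multiplier
```

In an interview, round aggressively (86,400 ≈ 100k) — the point is the order of magnitude, not the third digit.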

🎯 Interview Reality

System design answer structure:

  1. Clarify requirements and scale (2–3 min)
  2. Estimate QPS, storage, bandwidth (1–2 min)
  3. High-level design: boxes and arrows (3–5 min)
  4. Deep dive on requested component (10–15 min)
  5. Discuss tradeoffs proactively

What separates senior answers:

  • Mid: "I'd cache this in Redis."
  • Senior: "I'd cache product listings in Redis with 5-min TTL. On miss, I'd use a mutex to prevent stampede — only one request fetches from DB while others wait. Monitor hit rate — if below 80%, TTL is too short."

4.3 Docker & Deployment

# Production Dockerfile — multi-stage build
FROM python:3.12-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
FROM python:3.12-slim AS runtime
WORKDIR /app
RUN useradd -m -u 1000 appuser  # Non-root for security
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --chown=appuser:appuser . .
USER appuser
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml — local dev
version: '3.8'
services:
  api:
    build: .
    ports: ["8000:8000"]
    environment:
      - DATABASE_URL=postgresql+asyncpg://postgres:password@db:5432/mydb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      db: { condition: service_healthy }
 
  db:
    image: postgres:15
    environment: { POSTGRES_DB: mydb, POSTGRES_USER: postgres, POSTGRES_PASSWORD: password }
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      retries: 5
 
  worker:
    build: .
    command: celery -A celery_app worker --loglevel=info
    depends_on: [redis, db]
 
  redis:
    image: redis:7-alpine

4.4 Alembic Migrations

# Best practices:
# 1. Never delete a column in the same migration as removing code
# 2. Always test downgrade() works
# 3. Use CONCURRENTLY for index creation in production
# 4. Data migrations go in separate scripts, not schema migrations
 
# Zero-downtime deployment order:
# Phase 1: Deploy migration (add nullable column)
# Phase 2: Deploy app code (reads new column, writes both)
# Phase 3: Backfill data (background job)
# Phase 4: Add NOT NULL constraint, remove old column
 
def upgrade():
    # Step 1: Add nullable (instant, no lock)
    op.add_column('users', sa.Column('phone', sa.String(20), nullable=True))
 
def downgrade():
    op.drop_column('users', 'phone')

4.5 CI/CD

# .github/workflows/ci.yml
name: CI
on: [push, pull_request]
 
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
        env: { POSTGRES_DB: testdb, POSTGRES_USER: postgres, POSTGRES_PASSWORD: password }
        options: --health-cmd pg_isready --health-interval 10s --health-retries 5
      redis:
        image: redis:7
        options: --health-cmd "redis-cli ping" --health-interval 10s
 
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: '3.12', cache: 'pip' }
      - run: pip install -r requirements.txt
      - name: Run migrations
        run: alembic upgrade head
        env: { DATABASE_URL: postgresql+asyncpg://postgres:password@localhost:5432/testdb }
      - name: Run tests
        run: pytest tests/ -v --cov=myapp --cov-report=xml
      - name: Lint
        run: ruff check . && mypy myapp/

4.6 Configuration & Secrets Management

from pydantic_settings import BaseSettings
from functools import lru_cache
 
class Settings(BaseSettings):
    DATABASE_URL: str
    JWT_SECRET_KEY: str
    REDIS_URL: str = "redis://localhost:6379/0"
    ENV: str = "development"
    DEBUG: bool = False
    LOG_LEVEL: str = "INFO"
 
    class Config:
        env_file = ".env"
 
@lru_cache()  # Read env once — singleton pattern
def get_settings() -> Settings:
    return Settings()
 
settings = get_settings()

Interview question: How do you manage secrets?

"In development, I use .env files with pydantic-settings — .env is in .gitignore. In production, secrets come from environment variables injected by infrastructure: AWS Secrets Manager, Kubernetes Secrets, or Vault. I never commit secrets. For rotation: secret versioning + zero-downtime deploy."


Part 5: Machine Coding Problems

Machine coding = 90-minute design + implement a functional backend system. Interviewers evaluate: architecture thinking, code quality, API design, error handling, scalability awareness.

How to approach:

  1. Clarify requirements for 5–10 min — don't over-engineer
  2. Define data models first — they drive everything
  3. Define API contracts before writing handlers
  4. Build working skeleton first, then add auth/caching/etc.
  5. State out loud what you'd add if you had more time

Problem 1: URL Shortener

Requirements

  • Shorten a URL, return short code
  • Redirect short code to original URL
  • Track click counts, custom aliases, TTL

Data Model

class ShortenedURL(Base):
    __tablename__ = "shortened_urls"
    id = Column(Integer, primary_key=True)
    original_url = Column(String(2048), nullable=False)
    short_code = Column(String(10), unique=True, index=True)
    click_count = Column(Integer, default=0)
    expires_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    is_active = Column(Boolean, default=True)

Core Implementation

import string
 
ALPHABET = string.ascii_letters + string.digits  # 62 chars
 
# Base62 encode DB ID — guaranteed unique, no collision
def id_to_code(id: int) -> str:
    code = []
    while id > 0:
        code.append(ALPHABET[id % 62])
        id //= 62
    return ''.join(reversed(code)) or ALPHABET[0]
 
class ShortenRequest(BaseModel):
    url: HttpUrl
    custom_alias: Optional[str] = None
    ttl_days: Optional[int] = None
 
@app.post("/shorten", response_model=ShortenResponse)
async def shorten_url(request: ShortenRequest, db=Depends(get_db)):
    url_obj = ShortenedURL(
        original_url=str(request.url),
        expires_at=datetime.utcnow() + timedelta(days=request.ttl_days)
                   if request.ttl_days else None
    )
    db.add(url_obj)
    await db.flush()  # Get auto-increment ID
    url_obj.short_code = request.custom_alias or id_to_code(url_obj.id)  # duplicate custom_alias → unique-constraint error; map to 409
    await db.commit()
    return ShortenResponse(short_url=f"https://short.ly/{url_obj.short_code}")
 
@app.get("/{short_code}")
async def redirect(short_code: str, background_tasks: BackgroundTasks, db=Depends(get_db)):
    # Check Redis cache first
    cached = await redis.get(f"url:{short_code}")
    if cached:
        background_tasks.add_task(increment_click, short_code)
        # 301 is cached aggressively by browsers — repeat visitors may skip the
        # server entirely and undercount clicks; use 302 if analytics matter
        return RedirectResponse(cached, status_code=301)
 
    url_obj = (await db.execute(
        select(ShortenedURL).where(
            ShortenedURL.short_code == short_code,
            ShortenedURL.is_active == True
        )
    )).scalar_one_or_none()
 
    if not url_obj: raise HTTPException(404, "Not found")
    if url_obj.expires_at and url_obj.expires_at < datetime.utcnow():
        raise HTTPException(410, "URL expired")
 
    await redis.set(f"url:{short_code}", url_obj.original_url, ex=3600)
    background_tasks.add_task(increment_click, short_code)
    return RedirectResponse(url_obj.original_url, status_code=301)

Scaling Discussion

Hot URL / Cache stampede:

"Popular URLs will hammer the DB on every redirect. I'd cache short_code → original_url in Redis. Cache hit rate should be 99%+ for viral URLs."

Click count accuracy vs performance:

"For viral URLs, updating click_count on every request creates write contention. I'd batch: Redis INCR per request, background job flushes to DB every 60s. Trades perfect accuracy for scalability."


Problem 2: Rate Limiter

Fixed Window vs Sliding Window

Fixed window has a boundary problem: 100 requests at 00:59 + 100 at 01:01 = 200 requests in 2 seconds. Sliding window tracks exact timestamps.

# Sliding Window (Redis ZSET)
async def is_rate_limited(user_id: str, limit: int = 100, window: int = 60) -> bool:
    key = f"rl:{user_id}"
    now = time.time()
    pipe = redis.pipeline()  # commands queue locally; nothing is sent until execute()
    pipe.zremrangebyscore(key, 0, now - window)     # drop entries outside the window
    pipe.zadd(key, {f"{now}:{uuid.uuid4()}": now})  # unique member — same-timestamp requests don't collide
    pipe.zcard(key)
    pipe.expire(key, window)
    results = await pipe.execute()
    return results[2] > limit
 
# Middleware with response headers
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    user_id = getattr(request.state, 'user_id', request.client.host)
    limited = await is_rate_limited(user_id)
    if limited:
        return JSONResponse(
            status_code=429,
            content={"error": "RATE_LIMIT_EXCEEDED"},
            headers={"Retry-After": "60"}
        )
    response = await call_next(request)
    return response

Problem 3: Task Manager API

class TaskStatus(str, Enum):
    TODO = "todo"; IN_PROGRESS = "in_progress"
    DONE = "done"; CANCELLED = "cancelled"
 
class TaskPriority(str, Enum):
    LOW = "low"; MEDIUM = "medium"; HIGH = "high"; CRITICAL = "critical"
 
@app.get("/tasks", response_model=list[TaskResponse])
async def list_tasks(
    status: Optional[TaskStatus] = None,
    assignee_id: Optional[int] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, le=100),
    db=Depends(get_db), user=Depends(get_active_user)
):
    filters = []
    if status: filters.append(Task.status == status)
    if assignee_id: filters.append(Task.assignee_id == assignee_id)
    query = select(Task)
    if filters: query = query.where(and_(*filters))
    query = query.offset((page - 1) * page_size).limit(page_size)
    return (await db.execute(query)).scalars().all()
 
@app.patch("/tasks/{task_id}", response_model=TaskResponse)
async def update_task(
    task_id: int, payload: TaskUpdate,
    background_tasks: BackgroundTasks,
    db=Depends(get_db), user=Depends(get_active_user)
):
    task = await db.get(Task, task_id)
    if not task: raise HTTPException(404)
    if task.created_by != user.id and not user.is_admin:
        raise HTTPException(403, "Not authorized")
 
    old_assignee = task.assignee_id
    for field, val in payload.dict(exclude_unset=True).items():  # exclude_unset = PATCH semantics; .model_dump() in Pydantic v2
        setattr(task, field, val)
    await db.commit()
 
    if payload.assignee_id and payload.assignee_id != old_assignee:
        background_tasks.add_task(notify_user, payload.assignee_id, task.title)
    return task

Problem 4: Notification Service

# Clean architecture: decouple logic from delivery mechanism
class NotificationChannel(str, Enum):
    EMAIL = "email"; SMS = "sms"; PUSH = "push"
 
class NotificationSender(ABC):
    @abstractmethod
    async def send(self, notification) -> bool: pass
 
class EmailSender(NotificationSender):
    async def send(self, notification) -> bool:
        template = await get_template(notification.template_id)
        rendered = template.render(**notification.context)
        try:
            await sendgrid.send_email(
                to=notification.context['email'],
                subject=rendered.subject,
                body=rendered.body
            )
            return True
        except Exception as e:
            logger.error(f"Email failed: {e}")
            return False
 
class NotificationService:
    def __init__(self):
        self.senders = {
            NotificationChannel.EMAIL: EmailSender(),
            NotificationChannel.SMS: SMSSender(),
        }
 
    async def send_bulk(self, notifications: list) -> dict:
        semaphore = asyncio.Semaphore(50)
 
        async def send_one(n):
            async with semaphore:
                return await self.senders[n.channel].send(n)
 
        results = await asyncio.gather(*[send_one(n) for n in notifications], return_exceptions=True)
        return {"sent": sum(1 for r in results if r is True),
                "failed": sum(1 for r in results if r is not True)}
 
# Celery task — async delivery with retries
@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def deliver_notification(self, notification_dict: dict):
    try:
        n = Notification(**notification_dict)
        sender = NotificationService().senders[n.channel]  # dispatch by channel
        if not asyncio.run(sender.send(n)):
            raise DeliveryError()
        update_status(notification_dict['id'], 'delivered')
    except Exception as e:
        if self.request.retries >= self.max_retries:
            update_status(notification_dict['id'], 'failed')  # only after the final attempt
        raise self.retry(exc=e)

Problem 5: Auth Service

@app.post("/auth/register", response_model=TokenResponse, status_code=201)
async def register(payload: RegisterRequest, db=Depends(get_db)):
    if await get_user_by_email(db, payload.email):
        raise HTTPException(409, "Email already registered")
    user = User(
        email=payload.email,
        name=payload.name,
        password_hash=pwd_context.hash(payload.password)
    )
    db.add(user); await db.commit()
    send_welcome_email.delay(user.id)
    return create_token_response(user)
 
@app.post("/auth/login", response_model=TokenResponse)
async def login(form=Depends(OAuth2PasswordRequestForm), db=Depends(get_db)):
    user = await get_user_by_email(db, form.username)
    # bcrypt's verify is constant-time per hash, but the missing-user branch skips
    # hashing entirely; verify a dummy hash there if timing enumeration matters
    if not user or not pwd_context.verify(form.password, user.password_hash):
        raise HTTPException(401, "Invalid credentials",
                            headers={"WWW-Authenticate": "Bearer"})
    if not user.is_active:
        raise HTTPException(403, "Account disabled")
    return create_token_response(user)
 
@app.post("/auth/password-reset/request")
async def request_reset(email: EmailStr, db=Depends(get_db)):
    user = await get_user_by_email(db, email)
    if user:  # Always return 200 — prevent email enumeration!
        send_reset_email.delay(user.email, create_reset_token(user.id))
    return {"message": "If an account exists, you'll receive a reset link"}
 
@app.post("/auth/logout")
async def logout(token=Depends(oauth2_scheme), user=Depends(get_current_user)):
    payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    jti = payload.get("jti")
    if jti:
        await redis.set(f"blocklist:{jti}", "1", ex=1800)  # TTL should match the token's remaining lifetime (30-min tokens assumed)
    return {"message": "Logged out successfully"}

Machine Coding Evaluation Criteria

| Dimension | What interviewers look for |
|---|---|
| API Design | RESTful, proper status codes, clear contracts |
| Data Modeling | Appropriate types, indexes on FK, relationships |
| Error Handling | Domain errors, HTTP mapping, meaningful messages |
| Security | Input validation, auth checks, no plaintext passwords |
| Code Quality | Separation of concerns, readable, no magic numbers |
| Scalability | Caching, async, pagination, connection pooling |
| Edge Cases | Duplicates, concurrent requests, expiry, empty inputs |

Common mistakes to avoid:

  1. Jumping to code before clarifying requirements
  2. No input validation (Pydantic is your friend)
  3. Missing 404, 403, 409 error cases
  4. No authentication on protected endpoints
  5. Calling time.sleep() in async endpoints
  6. Hardcoded credentials or config
  7. No indexes on foreign keys and frequently queried columns

Part 6: Interview Communication & Behavioral


6.1 "Tell Me About Yourself"

90 seconds, structured, ends with why you're here.

Template:

"I'm a backend engineer with [X] years of experience in [domain]. Most recently at [Company], I [one impressive thing]. My primary stack is Python — FastAPI, async SQLAlchemy, Redis. I'm looking for [what you want] because [why this company]."

Strong Example:

"I'm a backend engineer with 4 years of experience, mostly fintech. At my current company I led migration of our payment service from monolith to microservices, reducing P99 from 800ms to 120ms. I own the core auth system serving 2M users. I'm looking for a role where I can own backend architecture end-to-end, which is why I'm excited about this position."


6.2 How to Talk About Projects — STAR Framework

| Element | What to cover |
|---|---|
| Situation | System scale, team size, initial state |
| Task | Your specific ownership/responsibility |
| Action | Technical decisions and — critically — WHY |
| Result | Measurable outcome: latency, throughput, uptime, cost |

Example — caching project:

Situation: Product listing API timing out at 3–4s. 50k users, 500 req/min peak.

Task: Investigate and fix the performance issue.

Action: EXPLAIN ANALYZE revealed 15 DB queries per request (N+1 on categories). Fixed with selectinload — dropped to 800ms. Added Redis cache-aside with 5-min TTL. Added rate limiting at 100 req/min/user.

Result: P95 dropped from 3.4s to 85ms. Cache hit rate 94%. Zero timeouts for 6 months.


6.3 How to Explain Technical Decisions

Weak: "I used Redis for caching."

Strong: "I used Redis because our product catalog is read-heavy with infrequent writes — ideal cache candidate. Redis specifically because it's already in our stack, supports TTL natively, and gives O(1) gets/sets. Memcached would also work, but Redis gives us sorted sets for leaderboards if needed later."

Rule: Always justify choices, not just state them. Mention the tradeoffs you considered.


6.4 "Tell Me About a Failure"

Template:

  1. What happened (factual, take ownership)
  2. Why it happened (root cause, no excuses)
  3. Immediate action (incident response)
  4. Permanent fix (systemic prevention)

Example:

"I deployed a migration that dropped a column still being read by production — 4-minute outage. Root cause: I ran the schema change before deploying new code (wrong order). Immediately rolled back. Post-mortem: established a deployment runbook (code first, then schema), added schema compatibility checks in CI. Zero migration-related outages since."


6.5 Common Follow-up Questions & Strong Answers

"How would you handle [service going down]?"

"Add circuit breakers — after 5 failures in 10 seconds, open the circuit and fail fast with a fallback. Retries with exponential backoff for transient failures. Health checks and auto-restart in Kubernetes."

"What if the database becomes a bottleneck?"

"Identify: read-heavy or write-heavy? For reads: read replicas + Redis cache. For writes: vertical scaling first (cheaper), then sharding by user_id if needed. For both: CQRS — separate read and write paths."

"How do you debug a performance issue in production?"

"Start with metrics: which endpoint is slow? APM traces to find bottleneck — DB, external service, or computation? For DB: EXPLAIN ANALYZE, check missing indexes, check connection pool exhaustion. Use py-spy for async profiling without stopping the process."

"How do you ensure consistency in distributed systems?"

"Depends on requirement. For financial: saga pattern or 2-phase commit. For non-critical: eventual consistency with idempotent operations. Redis distributed locks for operations that must not run concurrently."


6.6 Questions to Ask Interviewers

Technical:

  • "What's the biggest technical challenge the team is facing right now?"
  • "How does the team handle on-call and incident response?"
  • "How long does a change take to reach production?"
  • "How do you handle zero-downtime database migrations?"

Team & Culture:

  • "How much ownership do engineers have over architecture decisions?"
  • "What does success look like in the first 6 months?"

6.7 Concurrency Quick Reference

| Pattern | When to use | Python tool |
|---|---|---|
| Async I/O | Many concurrent I/O operations | asyncio, FastAPI |
| Thread pool | I/O + existing sync libraries | ThreadPoolExecutor |
| Process pool | CPU-bound parallel work | ProcessPoolExecutor |
| Queue-based workers | Background jobs, retries, scheduling | Celery |
| Rate limiting | Protect downstream services | asyncio.Semaphore |
| Concurrent requests | Fan-out to multiple services | asyncio.gather |
# Decision tree
if task_is_cpu_bound:
    use(ProcessPoolExecutor)
elif task_needs_retry_and_persistence:
    use(Celery)
elif task_is_io_bound and high_concurrency:
    use(asyncio)
elif task_is_io_bound and existing_sync_library:
    use(ThreadPoolExecutor)

Part 7: Quick Reference, Production Patterns & Interview Checklist


7.1 HTTP Status Codes — Must Know

| Code | When to use |
|---|---|
| 200 OK | Successful GET, PUT, PATCH |
| 201 Created | Successful POST that created a resource |
| 204 No Content | Successful DELETE or action with no body |
| 400 Bad Request | Malformed request, missing required fields |
| 401 Unauthorized | Not authenticated (missing/invalid token) |
| 403 Forbidden | Authenticated but not authorized |
| 404 Not Found | Resource does not exist |
| 409 Conflict | Duplicate resource (email already exists) |
| 422 Unprocessable Entity | Validation failed (FastAPI/Pydantic default) |
| 429 Too Many Requests | Rate limited |
| 500 Internal Server Error | Unexpected server error |
| 503 Service Unavailable | Dependency down (DB, external API) |

7.2 Common Python Bugs — Interview Traps

# Bug 1: Mutable default argument
def f(lst=[]):   ...  # ❌
def f(lst=None): ...  # ✅
 
# Bug 2: Late binding closures
funcs = [lambda: i for i in range(3)]     # each returns 2 ❌ (late binding: i read at call time)
funcs = [lambda i=i: i for i in range(3)] # return 0, 1, 2 ✅ (default binds at definition)
 
# Bug 3: Modifying list while iterating
for x in lst:
    if cond(x): lst.remove(x)             # ❌ skips elements
lst = [x for x in lst if not cond(x)]    # ✅
 
# Bug 4: Bare except catches everything
except:            # ❌ catches KeyboardInterrupt, SystemExit
except Exception:  # ✅
 
# Bug 5: String concatenation in loops — O(n²)
result = ""
for s in strings: result += s     # ❌
result = "".join(strings)         # ✅ O(n)
 
# Bug 6: Blocking the event loop
async def f(): time.sleep(5)          # ❌ blocks entire loop
async def f(): await asyncio.sleep(5) # ✅
 
# Bug 7: Coroutine not awaited
user = get_user()        # ❌ returns coroutine object
user = await get_user()  # ✅

7.3 Big-O Quick Reference

| Operation | Data Structure | Time Complexity |
|---|---|---|
| Lookup by key | dict / set | O(1) average |
| Append | list | O(1) amortized |
| Insert at index | list | O(n) |
| Search | list | O(n) |
| heappush / heappop | heap | O(log n) |
| Sort | list.sort() | O(n log n) |
| BFS / DFS | graph | O(V + E) |
| Binary search | sorted array | O(log n) |
| Build prefix sum | array | O(n) |
| Range sum query | prefix array | O(1) |
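The last two rows work together: building the prefix array once in O(n) makes every later range-sum query O(1).

```python
from itertools import accumulate

def build_prefix(nums: list[int]) -> list[int]:
    """prefix[i] = sum of nums[:i], so prefix has len(nums) + 1 entries."""
    return [0, *accumulate(nums)]

def range_sum(prefix: list[int], lo: int, hi: int) -> int:
    """Sum of nums[lo:hi] in O(1): two lookups and a subtraction."""
    return prefix[hi] - prefix[lo]
```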

Space complexity reminders:

  • Recursion depth = O(depth) stack space
  • BFS queue = O(width of tree/graph)
  • Memoization = O(number of subproblems)

7.4 Pre-ship Checklists

Database Performance

  • EXPLAIN ANALYZE on all new queries
  • No Seq Scans on tables > 10K rows
  • Foreign keys have indexes
  • WHERE clause columns have indexes
  • No N+1 queries (use selectinload/joinedload)
  • Connection pool configured (pool_size, max_overflow)
  • Query timeout configured
  • Migrations are backwards-compatible
  • Long-running migrations use CONCURRENTLY

Security

  • No hardcoded credentials
  • Passwords hashed with bcrypt (cost factor ≥ 12)
  • SQL injection impossible (ORM/parameterized queries only)
  • Input validation on all endpoints (Pydantic)
  • Authentication on all protected endpoints
  • CORS not set to * in production
  • Rate limiting on auth endpoints
  • Password reset doesn't leak email existence (always return 200)
  • Sensitive data not logged
  • HTTPS enforced

7.5 Interview Day Checklist

DSA Interview

  • Clarify before coding (duplicates? constraints? return type?)
  • State brute force, then optimize — get buy-in before coding
  • Write clean, readable variable names
  • Test with your own example, then edge cases
  • State time AND space complexity unprompted

System Design

  • Ask about scale before designing
  • Draw high-level diagram first
  • Justify every choice ("I chose X because...")
  • Mention tradeoffs proactively
  • Ask "what component should I dive deeper on?"

Machine Coding

  • Clarify requirements for 5–10 min
  • Define data models first
  • Start with working skeleton, then add features
  • State what you'd add if you had more time

7.6 The "Senior Engineer" Mindset

What separates senior-level from mid-level answers:

| Mid-level | Senior-level |
|---|---|
| "I'd add an index." | "I'd add a composite index on (user_id, status) — equality condition first. Created CONCURRENTLY to avoid locks. I'd also check if a partial index on active users only would be smaller." |
| "I'd use async." | "Async because this endpoint makes 3 external API calls. With gather(), total time = max of 3, not sum. For the CPU-bound step, I'd offload to Celery with ProcessPoolExecutor workers." |
| "I'd cache this in Redis." | "Redis cache-aside with 5-min TTL. On miss, mutex to prevent stampede. Event-driven invalidation on write. Monitor hit rate — if below 80%, TTL is too short." |

Senior engineers think in terms of:

  • Specific numbers and thresholds
  • Edge cases and failure modes
  • Tradeoffs, not just solutions
  • What could go wrong
  • How to monitor and debug it

7.7 30-Day Study Plan

| Week | Focus |
|---|---|
| Week 1 | Core Python (decorators, generators, OOP) + Arrays + Hashmaps + Sliding Window + Stack/Queue |
| Week 2 | Heap + Binary Search + BFS/DFS + PostgreSQL (indexes, EXPLAIN ANALYZE) + SQLAlchemy |
| Week 3 | FastAPI deep dive + async patterns + Redis + JWT auth + Celery |
| Week 4 | Machine coding projects + System design + Behavioral STAR stories + Mock interviews |

Build these from scratch:

  1. URL Shortener with Redis caching + click tracking
  2. Task Manager with JWT auth + role-based access
  3. Rate limiter middleware (sliding window)
  4. Celery-based notification service with retries
  5. Paginated API with cursor pagination + EXPLAIN ANALYZE verification

7.8 Topic Priority by Interview Type

| Interview Type | High Priority | Medium Priority |
|---|---|---|
| Startup (early stage) | FastAPI, PostgreSQL, Docker, auth, deployment | Redis, Celery, testing |
| Product company (Series B+) | DSA (arrays/graphs/DP), system design, SQL optimization | FastAPI, async, Redis |
| Big tech (FAANG) | All DSA patterns, distributed system design | Python internals, concurrency |
| Fintech / Healthcare | Consistency, transactions, security, audit trails | Performance, caching |

7.9 Resources

DSA Practice:

  • LeetCode — filter by difficulty (Medium) and tags: Hash Table, Two Pointers, Sliding Window, BFS/DFS, Heap
  • Grind 75 list — focused 75 problems covering all patterns

System Design:

  • Designing Data-Intensive Applications by Kleppmann — the bible
  • ByteByteGo — visual explanations
  • High Scalability blog — real architecture stories

Python Backend:

  • FastAPI official docs
  • SQLAlchemy 2.0 docs
  • Architecture Patterns with Python (O'Reilly)

This document was created with ❤️ for Python Developers who want to crack backend interviews, by Anubhaw. Feel free to suggest improvements or connect @ ContactMe