Waarom kiezen voor maatwerk als er kant-en-klare oplossingen zijn?
- Flexibiliteit: Pas limieten aan voor jouw specifieke gebruikssituaties en gebruikersniveaus
- Prestaties: Optimaliseer voor jouw infrastructuur en verkeerspatronen
- Controle: Stem elk aspect van hoe je API-gebruik beheert fijn af
- Leren: Krijg diepere inzichten in het gedrag en gebruikspatronen van je API
Nu we op dezelfde golflengte zitten, laten we in de details duiken!
De Bouwstenen: Rate Limiting Algoritmen
In het hart van elke rate limiting oplossing liggen algoritmen. Laten we enkele populaire verkennen en zien hoe we ze kunnen implementeren:
1. Token Bucket Algoritme
Stel je een emmer voor die met een constante snelheid met tokens wordt gevuld. Elke API-aanroep verbruikt een token. Als de emmer leeg is, wordt de aanvraag geweigerd. Eenvoudig, maar effectief!
import time
class TokenBucket:
def __init__(self, capacity, fill_rate):
self.capacity = capacity
self.fill_rate = fill_rate
self.tokens = capacity
self.last_fill = time.time()
def consume(self, tokens):
now = time.time()
time_passed = now - self.last_fill
self.tokens = min(self.capacity, self.tokens + time_passed * self.fill_rate)
self.last_fill = now
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
# Gebruik
bucket = TokenBucket(capacity=100, fill_rate=10) # 100 tokens, vult 10 per seconde bij
if bucket.consume(1):
print("Aanvraag toegestaan")
else:
print("Rate limit overschreden")
2. Leaky Bucket Algoritme
Denk aan een emmer met een klein gaatje aan de onderkant. Aanvragen vullen de emmer, en ze "lekken" eruit met een constante snelheid. Als de emmer overloopt, worden binnenkomende aanvragen gedropt.
from collections import deque
import time
class LeakyBucket:
def __init__(self, capacity, leak_rate):
self.capacity = capacity
self.leak_rate = leak_rate
self.bucket = deque()
self.last_leak = time.time()
def add(self):
now = time.time()
self._leak(now)
if len(self.bucket) < self.capacity:
self.bucket.append(now)
return True
return False
def _leak(self, now):
leak_time = (now - self.last_leak) * self.leak_rate
while self.bucket and self.bucket[0] <= now - leak_time:
self.bucket.popleft()
self.last_leak = now
# Gebruik
bucket = LeakyBucket(capacity=5, leak_rate=0.5) # 5 aanvragen, lekt 1 elke 2 seconden
if bucket.add():
print("Aanvraag toegestaan")
else:
print("Rate limit overschreden")
3. Fixed Window Counter
Deze is eenvoudig: verdeel de tijd in vaste vensters en tel de aanvragen in elk venster. Reset de teller wanneer een nieuw venster begint.
import time
class FixedWindowCounter:
def __init__(self, window_size, max_requests):
self.window_size = window_size
self.max_requests = max_requests
self.current_window = time.time() // window_size
self.request_count = 0
def allow_request(self):
current_time = time.time()
window = current_time // self.window_size
if window > self.current_window:
self.current_window = window
self.request_count = 0
if self.request_count < self.max_requests:
self.request_count += 1
return True
return False
# Gebruik
counter = FixedWindowCounter(window_size=60, max_requests=100) # 100 aanvragen per minuut
if counter.allow_request():
print("Aanvraag toegestaan")
else:
print("Rate limit overschreden")
Implementeren in een API Gateway
Nu we onze algoritmen hebben, laten we eens kijken hoe we ze kunnen integreren in een API-gateway. We gebruiken FastAPI voor dit voorbeeld, maar het concept geldt ook voor andere frameworks.
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from TokenBucket import TokenBucket
app = FastAPI()
# Voeg CORS-middleware toe
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Maak een rate limiter voor elke client
rate_limiters = {}
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
client_ip = request.client.host
if client_ip not in rate_limiters:
rate_limiters[client_ip] = TokenBucket(capacity=100, fill_rate=10)
if not rate_limiters[client_ip].consume(1):
raise HTTPException(status_code=429, detail="Rate limit overschreden")
response = await call_next(request)
return response
@app.get("/")
async def root():
return {"message": "Hallo, rate-limited wereld!"}
Deze setup creëert een aparte rate limiter voor elk client IP, waardoor 100 aanvragen worden toegestaan met een bijvulsnelheid van 10 tokens per seconde.
Geavanceerde Technieken en Overwegingen
Als je je eigen rate limiting oplossing implementeert, houd dan deze punten in gedachten:
1. Gedistribueerde Rate Limiting
Wanneer je API op meerdere servers draait, heb je een manier nodig om rate limiting data te synchroniseren. Overweeg het gebruik van een gedistribueerde cache zoals Redis:
import redis
class DistributedRateLimiter:
def __init__(self, redis_url, key_prefix, limit, window):
self.redis = redis.from_url(redis_url)
self.key_prefix = key_prefix
self.limit = limit
self.window = window
def is_allowed(self, identifier):
key = f"{self.key_prefix}:{identifier}"
current = self.redis.get(key)
if current is None:
self.redis.set(key, 1, ex=self.window)
return True
elif int(current) < self.limit:
self.redis.incr(key)
return True