Deze les behandelt het integreren van Celery, een asynchrone takenwachtrij, met een Flask webapplicatie, en hoe je logging gebruikt voor monitoring en debugging. Je leert hoe je complexe taken op de achtergrond kunt uitvoeren en hoe je problemen kunt opsporen.
Webapplicaties hebben vaak taken die lang duren, zoals het verwerken van grote bestanden, het verzenden van e-mails of het genereren van rapporten. Als je deze taken direct in de webserver uitvoert, blokkeert de applicatie en wordt de gebruikerservaring slecht. Celery is een krachtig open-source distributed task queue waarmee je deze taken asynchroon op de achtergrond kunt uitvoeren.
Voordelen van Celery:
Laten we Celery installeren en integreren in een Flask-project. Eerst installeren we de benodigde libraries via pip:
pip install Flask celery redis python-dotenv
Redis als Broker: Celery heeft een broker nodig om taken op te slaan en te beheren. We gebruiken Redis als onze broker (een snelle in-memory databas). Je moet Redis installeren op je systeem. Op Ubuntu/Debian:
sudo apt update
sudo apt install redis-server
Flask App Setup: Maak een Flask app, een celery.py
bestand voor de celery configuration en een .env
bestand voor de configuratie:
app.py
:
import os
from flask import Flask
from dotenv import load_dotenv
load_dotenv()
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
app.config['CELERY_RESULT_BACKEND'] = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
from . import celery
@app.route('/')
def index():
return "Hallo, Celery wereld!"
if __name__ == '__main__':
app.run(debug=True)
celery.py
:
import os
from celery import Celery
from dotenv import load_dotenv
load_dotenv()
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
celery_app = Celery('app', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND, include=['app.tasks'])
# Optional configuration, see the celery documentation.
celery_app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TIMEZONE='Europe/Amsterdam', # Set timezone to your local timezone
CELERY_ENABLE_UTC=True, # Use UTC for consistency
)
.env
:
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
Let op: Vergeet niet om een __init__.py
bestand in de map te maken waar app.py
en celery.py
zich bevinden.
Taken worden in Celery gedefinieerd als Python-functies die worden versierd met @celery_app.task
. Laten we een eenvoudige taak maken die een bericht print en een tijdje wacht:
app/tasks.py
: (Maak een map 'app' met een bestand 'tasks.py' in dezelfde map als app.py en celery.py)
import time
from celery import shared_task
@shared_task
def lange_taak(n):
print(f"Taak gestart, wacht {n} seconden...")
time.sleep(n)
print("Taak voltooid!")
return f"Taak voltooid na {n} seconden"
Taken Uitvoeren: Importeer de taak in app.py
en roep deze aan. Let op, we hoeven de functie niet rechtstreeks aan te roepen. We roepen de delay()
methode aan. Dit stuurt de taak naar de Celery broker.
app.py
:
import os
from flask import Flask
from dotenv import load_dotenv
from .tasks import lange_taak
load_dotenv()
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
app.config['CELERY_RESULT_BACKEND'] = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
from . import celery
@app.route('/')
def index():
result = lange_taak.delay(5) # Stuur de taak naar de broker
return f"Taak gestart met taak-ID: {result.id}"
if __name__ == '__main__':
app.run(debug=True)
Celery Worker Opstarten: Open een terminal en navigeer naar de map met celery.py
en app.py
. Start de Celery worker:
celery -A app worker -l info
Nu, als je de Flask applicatie bezoekt in je browser, zul je zien dat de webpagina direct reageert. De lange taak wordt op de achtergrond uitgevoerd door de Celery worker, met een vertraging van 5 seconden.
Logging is cruciaal voor het debuggen en monitoren van taken. Celery biedt loggers die je kunt gebruiken in je taken. Laten we de lange_taak modificeren om logging te gebruiken:
app/tasks.py
:
import time
import logging
from celery import shared_task
logger = logging.getLogger(__name__)
@shared_task
def lange_taak(n):
logger.info(f"Taak gestart, wacht {n} seconden...")
time.sleep(n)
logger.warning("Taak bijna voltooid!")
logger.info("Taak voltooid!")
return f"Taak voltooid na {n} seconden"
Logging Configuratie: Configureer logging in je Celery config of via de command line flags van Celery. De meest voorkomende is -l info
(voor INFO logging). Je kan ook een log file definieren.
Celery Worker met Logging: Start de worker met de -l info
flag:
celery -A app worker -l info
Je ziet nu info messages in de console van de worker. Wanneer je de taak uitvoert via de webapp, zul je de logging output zien in de console van de worker.
Verschillende log levels (DEBUG, INFO, WARNING, ERROR, CRITICAL) helpen bij het beheren van de hoeveelheid informatie die je ziet. Gebruik DEBUG voor gedetailleerde informatie, INFO voor algemene status, WARNING voor potentiële problemen, ERROR voor fouten en CRITICAL voor ernstige problemen.
Voorbeeld: Voeg debug logging toe aan de taak:
app/tasks.py
:
import time
import logging
from celery import shared_task
logger = logging.getLogger(__name__)
@shared_task
def lange_taak(n):
logger.debug(f"DEBUG: Taak gestart, wacht {n} seconden...")
logger.info(f"INFO: Taak gestart, wacht {n} seconden...")
time.sleep(n)
logger.warning("WARNING: Taak bijna voltooid!")
logger.info("INFO: Taak voltooid!")
return f"Taak voltooid na {n} seconden"
Als je de worker start met -l debug
, zie je alle log entries. Met -l info
, zul je alleen INFO, WARNING, ERROR en CRITICAL messages zien.
Let op: Pas de log levels aan om de output te filteren en te focussen op relevante informatie tijdens het debuggen.
Explore advanced insights, examples, and bonus exercises to deepen understanding.
We hebben Celery geconfigureerd, taken gedefinieerd en uitgevoerd in de achtergrond, en logging toegepast voor monitoring. Deze verdiepende sessie bouwt voort op die fundamenten en duikt dieper in geavanceerde configuraties, best practices en real-world scenario's voor enterprise-level toepassingen.
Naast de basisconfiguratie zijn er cruciale instellingen voor enterprise-level toepassingen. Denk aan:
Voorbeeld: Stel je wilt een taak 'email_verzenden' limiteren tot 1000 emails per minuut. Je kunt dit in Celery configureren:
from celery import Celery
app = Celery('mijn_app', broker='amqp://user:password@localhost//')
app.conf.task_default_queue = 'default'
app.conf.task_routes = {
'mijn_app.tasks.email_verzenden': {'queue': 'email', 'rate_limit': '1000/m'}
}
Gebruik Celery flower of andere monitoring tools (zoals Prometheus en Grafana, geïntegreerd via Celery's exporter) om de performance te monitoren, queues te bekijken, taken te inspecteren en bottlenecks te identificeren. Regelmatige monitoring is essentieel voor het optimaliseren van de performance.
Tip: Stel alerts in op belangrijke metrics zoals queue lengte, taak wachttijd en failed tasks. Dit geeft je proactief inzicht in mogelijke problemen.
Configureer Celery om te werken met twee RabbitMQ-brokers (e.g., op verschillende servers). Test de failover door de primary broker te stoppen en te verifiëren dat taken nog steeds worden verwerkt.
Implementeer rate limiting op een bestaande Celery-taak (bijvoorbeeld het verzenden van een e-mail). Test de limieten door meerdere taken snel te starten.
In de echte wereld worden achtergrondprocessen met Celery op grote schaal gebruikt voor:
Enterprise-niveau implementaties vereisen vaak robuuste configuraties, monitoring en schaalbaarheid om de prestaties en betrouwbaarheid te garanderen.
Implementeer een taak die afhankelijk is van een andere taak (chaining). Test foutafhandeling in de chained taken, inclusief retry mechanismen en logging.
Maak een nieuwe Celery taak die twee getallen optelt en het resultaat logt. Maak de taak oproepbaar via een route in je Flask applicatie. Test verschillende combinaties van argumenten.
Wijzig de logging in je Celery taak om verschillende logniveaus (DEBUG, INFO, WARNING, ERROR) te gebruiken. Start de Celery worker met verschillende `-l` opties (bijvoorbeeld `debug`, `info`) en observeer de output. Wat is het effect van de verschillende logniveaus?
Maak een Celery taak die een divide-by-zero fout kan genereren. Voeg try-except blocks toe om deze fout te detecteren en log een ERROR message. Probeer de taak te activeren en observeer de log output.
Implementeer een systeem voor het verzenden van periodieke e-mails (bijvoorbeeld nieuwsbrieven) naar een database van gebruikers. Gebruik Celery voor het asynchroon verzenden van e-mails en logging om de status van de verzendingen te volgen. Overweeg om een scheduling mechanisme (bijvoorbeeld Celery beat) te gebruiken om de e-mails periodiek te verzenden.
Bereid je voor op het volgende gedeelte over Celery Beat, het scheduling van taken in Celery. Onderzoek hoe je taken periodiek kunt laten uitvoeren.
We're automatically tracking your progress. Sign up for free to keep your learning paths forever and unlock advanced features like detailed analytics and personalized recommendations.