**Integratie Celery met Flask en Logging

Deze les behandelt het integreren van Celery, een asynchrone takenwachtrij, met een Flask webapplicatie, en hoe je logging gebruikt voor monitoring en debugging. Je leert hoe je complexe taken op de achtergrond kunt uitvoeren en hoe je problemen kunt opsporen.

Learning Objectives

  • Configureer Celery in een Flask-omgeving.
  • Definieer en voer taken uit op de achtergrond met Celery.
  • Gebruik logging om informatie over de taken en hun status vast te leggen.
  • Pas logniveaus toe om de debug-output te beheren en te begrijpen.

Lesson Content

Introductie tot Achtergrondtaken met Celery

Webapplicaties hebben vaak taken die lang duren, zoals het verwerken van grote bestanden, het verzenden van e-mails of het genereren van rapporten. Als je deze taken direct in de webserver uitvoert, blokkeert de applicatie en wordt de gebruikerservaring slecht. Celery is een krachtig open-source distributed task queue waarmee je deze taken asynchroon op de achtergrond kunt uitvoeren.

Voordelen van Celery:

  • Responsiviteit: Je webserver blijft responsief, omdat lange taken niet meer blokkeren.
  • Schaalbaarheid: Celery kan eenvoudig worden geschaald door meer workers toe te voegen om het volume van taken te verwerken.
  • Betrouwbaarheid: Taken worden persistent opgeslagen, zodat ze niet verloren gaan bij server crashes.

Installatie en Configuratie van Celery en Flask

Laten we Celery installeren en integreren in een Flask-project. Eerst installeren we de benodigde libraries via pip:

pip install Flask celery redis python-dotenv

Redis als Broker: Celery heeft een broker nodig om taken op te slaan en te beheren. We gebruiken Redis als onze broker (een snelle in-memory databas). Je moet Redis installeren op je systeem. Op Ubuntu/Debian:

sudo apt update
sudo apt install redis-server

Flask App Setup: Maak een Flask app, een celery.py bestand voor de celery configuration en een .env bestand voor de configuratie:

app.py:

import os
from flask import Flask
from dotenv import load_dotenv

load_dotenv()

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
app.config['CELERY_RESULT_BACKEND'] = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')

from . import celery

@app.route('/')
def index():
    return "Hallo, Celery wereld!"

if __name__ == '__main__':
    app.run(debug=True)

celery.py:

import os
from celery import Celery
from dotenv import load_dotenv

load_dotenv()


CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')



celery_app = Celery('app', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND, include=['app.tasks'])

# Optional configuration, see the celery documentation.
celery_app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TIMEZONE='Europe/Amsterdam',  # Set timezone to your local timezone
    CELERY_ENABLE_UTC=True, # Use UTC for consistency
)

.env:

CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0

Let op: Vergeet niet om een __init__.py bestand in de map te maken waar app.py en celery.py zich bevinden.

Taken Definiëren en Uitvoeren

Taken worden in Celery gedefinieerd als Python-functies die worden versierd met @celery_app.task. Laten we een eenvoudige taak maken die een bericht print en een tijdje wacht:

app/tasks.py: (Maak een map 'app' met een bestand 'tasks.py' in dezelfde map als app.py en celery.py)

import time
from celery import shared_task

@shared_task
def lange_taak(n):
    print(f"Taak gestart, wacht {n} seconden...")
    time.sleep(n)
    print("Taak voltooid!")
    return f"Taak voltooid na {n} seconden"

Taken Uitvoeren: Importeer de taak in app.py en roep deze aan. Let op, we hoeven de functie niet rechtstreeks aan te roepen. We roepen de delay() methode aan. Dit stuurt de taak naar de Celery broker.

app.py:

import os
from flask import Flask
from dotenv import load_dotenv
from .tasks import lange_taak

load_dotenv()

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
app.config['CELERY_RESULT_BACKEND'] = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')

from . import celery

@app.route('/')
def index():
    result = lange_taak.delay(5) # Stuur de taak naar de broker
    return f"Taak gestart met taak-ID: {result.id}"

if __name__ == '__main__':
    app.run(debug=True)

Celery Worker Opstarten: Open een terminal en navigeer naar de map met celery.py en app.py. Start de Celery worker:

celery -A app worker -l info

Nu, als je de Flask applicatie bezoekt in je browser, zul je zien dat de webpagina direct reageert. De lange taak wordt op de achtergrond uitgevoerd door de Celery worker, met een vertraging van 5 seconden.

Logging in Celery Taken

Logging is cruciaal voor het debuggen en monitoren van taken. Celery biedt loggers die je kunt gebruiken in je taken. Laten we de lange_taak modificeren om logging te gebruiken:

app/tasks.py:

import time
import logging
from celery import shared_task

logger = logging.getLogger(__name__)

@shared_task
def lange_taak(n):
    logger.info(f"Taak gestart, wacht {n} seconden...")
    time.sleep(n)
    logger.warning("Taak bijna voltooid!")
    logger.info("Taak voltooid!")
    return f"Taak voltooid na {n} seconden"

Logging Configuratie: Configureer logging in je Celery config of via de command line flags van Celery. De meest voorkomende is -l info (voor INFO logging). Je kan ook een log file definieren.

Celery Worker met Logging: Start de worker met de -l info flag:

celery -A app worker -l info

Je ziet nu info messages in de console van de worker. Wanneer je de taak uitvoert via de webapp, zul je de logging output zien in de console van de worker.

Log Levels en Debugging

Verschillende log levels (DEBUG, INFO, WARNING, ERROR, CRITICAL) helpen bij het beheren van de hoeveelheid informatie die je ziet. Gebruik DEBUG voor gedetailleerde informatie, INFO voor algemene status, WARNING voor potentiële problemen, ERROR voor fouten en CRITICAL voor ernstige problemen.

Voorbeeld: Voeg debug logging toe aan de taak:

app/tasks.py:

import time
import logging
from celery import shared_task

logger = logging.getLogger(__name__)

@shared_task
def lange_taak(n):
    logger.debug(f"DEBUG: Taak gestart, wacht {n} seconden...")
    logger.info(f"INFO: Taak gestart, wacht {n} seconden...")
    time.sleep(n)
    logger.warning("WARNING: Taak bijna voltooid!")
    logger.info("INFO: Taak voltooid!")
    return f"Taak voltooid na {n} seconden"

Als je de worker start met -l debug, zie je alle log entries. Met -l info, zul je alleen INFO, WARNING, ERROR en CRITICAL messages zien.

Let op: Pas de log levels aan om de output te filteren en te focussen op relevante informatie tijdens het debuggen.

Deep Dive

Explore advanced insights, examples, and bonus exercises to deepen understanding.

Uitgebreide Leerinhoud: Achtergrondprocessen in Flask (Enterprise Level)

Uitgebreide Leerinhoud: Achtergrondprocessen in Flask (Enterprise Level) - Dag 7

Terugblik en Context

We hebben Celery geconfigureerd, taken gedefinieerd en uitgevoerd in de achtergrond, en logging toegepast voor monitoring. Deze verdiepende sessie bouwt voort op die fundamenten en duikt dieper in geavanceerde configuraties, best practices en real-world scenario's voor enterprise-level toepassingen.

Deep Dive: Celery Configuratie en Optimalisatie

Geavanceerde Celery Configuratie

Naast de basisconfiguratie zijn er cruciale instellingen voor enterprise-level toepassingen. Denk aan:

  • Broker Redundancy: Configureer meerdere brokers (e.g., RabbitMQ clusters) voor failover en hoge beschikbaarheid. Dit is cruciaal voor uptime in cruciale systemen. Stel je voor dat een Rabobank-achtige bank een storing in hun systemen heeft. Zoiets willen we voorkomen.
  • Result Backends: Gebruik een result backend (e.g., Redis, een database) om de resultaten van lange taken op te slaan. Dit is handig voor monitoring en retry-mechanismen.
  • Rate Limits: Beperk de snelheid waarmee taken worden uitgevoerd per queue of per taak type om overbelasting te voorkomen. Denk aan het beperken van e-mail verzendingen om spam te voorkomen.
  • Retry Mechanismen: Implementeer automatisch retry's voor falende taken met exponentiële backoff. Dit verhoogt de robuustheid.

Voorbeeld: Stel je wilt een taak 'email_verzenden' limiteren tot 1000 emails per minuut. Je kunt dit in Celery configureren:

                    
                    from celery import Celery

                    app = Celery('mijn_app', broker='amqp://user:password@localhost//')
                    app.conf.task_default_queue = 'default'
                    app.conf.task_routes = {
                        'mijn_app.tasks.email_verzenden': {'queue': 'email', 'rate_limit': '1000/m'}
                    }
                    
                

Celery Monitoring en Performance

Gebruik Celery flower of andere monitoring tools (zoals Prometheus en Grafana, geïntegreerd via Celery's exporter) om de performance te monitoren, queues te bekijken, taken te inspecteren en bottlenecks te identificeren. Regelmatige monitoring is essentieel voor het optimaliseren van de performance.

Tip: Stel alerts in op belangrijke metrics zoals queue lengte, taak wachttijd en failed tasks. Dit geeft je proactief inzicht in mogelijke problemen.

Bonus Oefeningen

Oefening 1: Redundante Broker Configuratie

Configureer Celery om te werken met twee RabbitMQ-brokers (e.g., op verschillende servers). Test de failover door de primary broker te stoppen en te verifiëren dat taken nog steeds worden verwerkt.

Oefening 2: Rate Limiting Implementatie

Implementeer rate limiting op een bestaande Celery-taak (bijvoorbeeld het verzenden van een e-mail). Test de limieten door meerdere taken snel te starten.

Real-World Connections

In de echte wereld worden achtergrondprocessen met Celery op grote schaal gebruikt voor:

  • E-commerce platforms: Het verwerken van bestellingen, genereren van facturen, en verzenden van notificaties. Denk aan de back-end van Bol.com.
  • Financiële systemen: Het verwerken van transacties, genereren van rapporten, en fraudedetectie.
  • Content Management Systems (CMS): Het genereren van thumbnails, converteren van bestanden, en indexering van content. Denk aan het publiceren van een artikel op een website, de afbeeldingen worden geoptimaliseerd in de achtergrond.
  • Automatisering van Marketing: Het verzenden van e-mailcampagnes, personaliseren van content, en analyseren van gebruikersgedrag.

Enterprise-niveau implementaties vereisen vaak robuuste configuraties, monitoring en schaalbaarheid om de prestaties en betrouwbaarheid te garanderen.

Challenge Yourself

Implementeer een taak die afhankelijk is van een andere taak (chaining). Test foutafhandeling in de chained taken, inclusief retry mechanismen en logging.

Verdere Verdieping

  • Celery Documentatie: Duik diep in de officiële Celery documentatie voor geavanceerde configuratie-opties en best practices.
  • Celery Flower: Leer meer over het gebruik van Celery Flower voor monitoring en beheer van Celery workers.
  • Asynchrone Messaging Architectuur: Verken de principes van asynchrone messaging en hoe deze principes van toepassing zijn op Celery en andere takenwachtrijen.
  • Integratie met andere tools: Onderzoek integratie met Monitoring tools (Prometheus, Grafana, Datadog) en Logging tools (ELK Stack, Splunk).

Interactive Exercises

Celery Taak met Argumenten

Maak een nieuwe Celery taak die twee getallen optelt en het resultaat logt. Maak de taak oproepbaar via een route in je Flask applicatie. Test verschillende combinaties van argumenten.

Logging Niveaus Aanpassen

Wijzig de logging in je Celery taak om verschillende logniveaus (DEBUG, INFO, WARNING, ERROR) te gebruiken. Start de Celery worker met verschillende `-l` opties (bijvoorbeeld `debug`, `info`) en observeer de output. Wat is het effect van de verschillende logniveaus?

Taak Foutafhandeling

Maak een Celery taak die een divide-by-zero fout kan genereren. Voeg try-except blocks toe om deze fout te detecteren en log een ERROR message. Probeer de taak te activeren en observeer de log output.

Knowledge Check

Question 1: Welke bibliotheek wordt gebruikt als broker voor Celery in deze les?

Question 2: Welke decorator gebruik je om een functie als een Celery taak te definiëren?

Question 3: Welke log level toont alle berichten, inclusief debug statements?

Question 4: Wat doet de `delay()` methode op een Celery taak?

Question 5: Welke van de volgende statements is correct over Celery?

Practical Application

Implementeer een systeem voor het verzenden van periodieke e-mails (bijvoorbeeld nieuwsbrieven) naar een database van gebruikers. Gebruik Celery voor het asynchroon verzenden van e-mails en logging om de status van de verzendingen te volgen. Overweeg om een scheduling mechanisme (bijvoorbeeld Celery beat) te gebruiken om de e-mails periodiek te verzenden.

Key Takeaways

Next Steps

Bereid je voor op het volgende gedeelte over Celery Beat, het scheduling van taken in Celery. Onderzoek hoe je taken periodiek kunt laten uitvoeren.

Your Progress is Being Saved!

We're automatically tracking your progress. Sign up for free to keep your learning paths forever and unlock advanced features like detailed analytics and personalized recommendations.

Complete Learning Path