Celery & Celery Beat#
Installation#
pip install celery redis
# Or for RabbitMQ
pip install celery
Setup#
# myproject/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
# Celery Beat (periodic tasks)
CELERY_BEAT_SCHEDULE = {
'send-daily-newsletter': {
'task': 'myapp.tasks.send_daily_newsletter',
'schedule': crontab(hour=8, minute=0), # 8 AM daily
},
'cleanup-old-sessions': {
'task': 'myapp.tasks.cleanup_old_sessions',
'schedule': crontab(hour=2, minute=0), # 2 AM daily
},
'update-post-stats': {
'task': 'myapp.tasks.update_post_stats',
'schedule': 60.0, # Every 60 seconds
},
}
Basic Task#
# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_email_task(subject, message, recipient):
send_mail(
subject,
message,
'from@example.com',
[recipient],
fail_silently=False,
)
return f'Email sent to {recipient}'
# Call task
from myapp.tasks import send_email_task
# Async (returns immediately)
result = send_email_task.delay('Subject', 'Message', 'user@example.com')
# Sync (waits for completion)
result = send_email_task('Subject', 'Message', 'user@example.com')
Task with Model#
# myapp/tasks.py
from celery import shared_task
from .models import Post
@shared_task
def publish_scheduled_posts():
"""Publish posts scheduled for now"""
from django.utils import timezone
posts = Post.objects.filter(
is_published=False,
scheduled_publish_at__lte=timezone.now()
)
count = posts.update(is_published=True)
return f'Published {count} posts'
# Schedule in settings.py
CELERY_BEAT_SCHEDULE = {
'publish-scheduled-posts': {
'task': 'myapp.tasks.publish_scheduled_posts',
'schedule': 300.0, # Every 5 minutes
},
}
Periodic Tasks (Celery Beat)#
Crontab Schedule#
# myapp/tasks.py
from celery import shared_task
from celery.schedules import crontab
@shared_task
def send_daily_newsletter():
"""Send daily newsletter to subscribers"""
from .models import Subscriber
subscribers = Subscriber.objects.filter(is_active=True)
for subscriber in subscribers:
send_email_task.delay(
'Daily Newsletter',
'Your daily news...',
subscriber.email
)
return f'Sent newsletter to {subscribers.count()} subscribers'
# settings.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'send-daily-newsletter': {
'task': 'myapp.tasks.send_daily_newsletter',
'schedule': crontab(hour=8, minute=0), # 8 AM daily
},
'weekly-report': {
'task': 'myapp.tasks.send_weekly_report',
'schedule': crontab(hour=9, minute=0, day_of_week=1), # Monday 9 AM
},
}
Interval Schedule#
# settings.py
from celery.schedules import timedelta
CELERY_BEAT_SCHEDULE = {
'update-cache': {
'task': 'myapp.tasks.update_cache',
'schedule': timedelta(minutes=30), # Every 30 minutes
},
'sync-external-api': {
'task': 'myapp.tasks.sync_external_api',
'schedule': timedelta(hours=1), # Every hour
},
}
Solar Schedule#
# settings.py
from celery.schedules import solar
CELERY_BEAT_SCHEDULE = {
'sunset-task': {
'task': 'myapp.tasks.sunset_task',
'schedule': solar('sunset', 40.7128, -74.0060), # NYC coordinates
},
}
Task Retry#
# myapp/tasks.py
from celery import shared_task
from celery.exceptions import Retry
@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id):
try:
# Process payment
order = Order.objects.get(id=order_id)
# ... payment logic
except Exception as exc:
# Retry after 60 seconds
raise self.retry(exc=exc, countdown=60)
Task Chaining#
# myapp/tasks.py
from celery import chain
# Execute tasks in sequence
workflow = chain(
process_image.s(image_id),
create_thumbnail.s(),
send_notification.s()
)
result = workflow()
Task Groups#
# myapp/tasks.py
from celery import group
# Execute tasks in parallel
job = group(
send_email_task.s('user1@example.com'),
send_email_task.s('user2@example.com'),
send_email_task.s('user3@example.com'),
)
result = job.apply_async()
Running Celery#
# Start Celery worker
celery -A myproject worker --loglevel=info
# Start Celery Beat (scheduler)
celery -A myproject beat --loglevel=info
# Or both in one command
celery -A myproject worker --beat --loglevel=info
# With auto-reload (development)
celery -A myproject worker --beat --loglevel=info --reload
Django Integration#
# views.py
from myapp.tasks import send_email_task
def contact_view(request):
if request.method == 'POST':
# Send email asynchronously
send_email_task.delay(
'Contact Form',
request.POST['message'],
'admin@example.com'
)
return redirect('success')
Monitoring#
# Install Flower (Celery monitoring)
pip install flower
# Run Flower
celery -A myproject flower
# Access at http://localhost:5555
Best Practices#
- ✅ Use @shared_task - Works across apps
- ✅ Idempotent tasks - Can be safely retried
- ✅ Use result backend - Store task results
- ✅ Set timeouts - Prevent hanging tasks
- ✅ Monitor tasks - Use Flower or similar
- ✅ Test locally - Use CELERY_TASK_ALWAYS_EAGER=True
# settings.py (development)
CELERY_TASK_ALWAYS_EAGER = True # Run tasks synchronously
Complete! Review Getting Started for Django basics.