Skip to content

Celery & Celery Beat#

Installation#

pip install celery redis
# Or for RabbitMQ
pip install celery

Setup#

# myproject/celery.py
import os
from celery import Celery

# Point Django at the project's settings unless DJANGO_SETTINGS_MODULE is
# already set in the environment (setdefault never overrides an existing value).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# The Celery application instance for this project.
app = Celery('myproject')
# Read configuration from Django's settings object; with namespace='CELERY'
# the Celery options are the settings whose names start with CELERY_
# (for example CELERY_BROKER_URL in settings.py).
app.config_from_object('django.conf:settings', namespace='CELERY')
# Find task modules in the project's apps (e.g. myapp/tasks.py).
app.autodiscover_tasks()

# myproject/__init__.py
# Import the Celery app here so it is created whenever the myproject
# package itself is imported.
from .celery import app as celery_app

# Expose celery_app as the package's public name.
__all__ = ('celery_app',)
# settings.py
from celery.schedules import crontab  # needed by the crontab() entries below

# Broker and result store both use the local Redis instance, database 0.
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# Only JSON is accepted and produced for task messages and results.
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

# Celery Beat (periodic tasks)
# Each entry maps a schedule name to the dotted task path and when to run it.
CELERY_BEAT_SCHEDULE = {
    'send-daily-newsletter': {
        'task': 'myapp.tasks.send_daily_newsletter',
        'schedule': crontab(hour=8, minute=0),  # 8 AM daily
    },
    'cleanup-old-sessions': {
        'task': 'myapp.tasks.cleanup_old_sessions',
        'schedule': crontab(hour=2, minute=0),  # 2 AM daily
    },
    'update-post-stats': {
        'task': 'myapp.tasks.update_post_stats',
        'schedule': 60.0,  # Every 60 seconds
    },
}

Basic Task#

# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_email_task(subject, message, recipient):
    """Send one email and report who it went to.

    Args:
        subject: Subject line of the email.
        message: Body of the email.
        recipient: Address of the single recipient.

    Returns:
        A confirmation string naming the recipient.
    """
    send_mail(
        subject=subject,
        message=message,
        from_email='from@example.com',
        recipient_list=[recipient],
        fail_silently=False,  # let delivery errors surface as exceptions
    )
    confirmation = f'Email sent to {recipient}'
    return confirmation

# Call task
from myapp.tasks import send_email_task

# Async: .delay() hands the call off and returns immediately, without
# waiting for the email to be sent.
result = send_email_task.delay('Subject', 'Message', 'user@example.com')

# Sync: calling the task like a plain function runs it in place and
# waits for completion.
result = send_email_task('Subject', 'Message', 'user@example.com')

Task with Model#

# myapp/tasks.py
from celery import shared_task
from .models import Post

@shared_task
def publish_scheduled_posts():
    """Publish every unpublished post whose scheduled time has arrived.

    Returns:
        A summary string with the number of posts that were updated.
    """
    from django.utils import timezone

    now = timezone.now()
    due_posts = Post.objects.filter(is_published=False, scheduled_publish_at__lte=now)
    # A single UPDATE flips the flag on all due posts at once.
    published = due_posts.update(is_published=True)
    return f'Published {published} posts'

# Schedule in settings.py
# NOTE: settings.py holds a single CELERY_BEAT_SCHEDULE. Add this entry to
# the existing dict; assigning the name again replaces any earlier schedule.
CELERY_BEAT_SCHEDULE = {
    'publish-scheduled-posts': {
        'task': 'myapp.tasks.publish_scheduled_posts',
        'schedule': 300.0,  # Every 5 minutes
    },
}

Periodic Tasks (Celery Beat)#

Crontab Schedule#

# myapp/tasks.py
from celery import shared_task
from celery.schedules import crontab

@shared_task
def send_daily_newsletter():
    """Queue the daily newsletter email for each active subscriber.

    Returns:
        A summary string with the number of active subscribers.
    """
    from .models import Subscriber

    subject = 'Daily Newsletter'
    body = 'Your daily news...'
    active = Subscriber.objects.filter(is_active=True)

    # One send_email_task per subscriber, so each email is its own task.
    for person in active:
        send_email_task.delay(subject, body, person.email)

    total = active.count()
    return f'Sent newsletter to {total} subscribers'

# settings.py
from celery.schedules import crontab

# NOTE(review): crontab times are presumably evaluated in CELERY_TIMEZONE
# ('UTC' in the Setup section) — confirm for your deployment.
CELERY_BEAT_SCHEDULE = {
    'send-daily-newsletter': {
        'task': 'myapp.tasks.send_daily_newsletter',
        'schedule': crontab(hour=8, minute=0),  # 8 AM daily
    },
    'weekly-report': {
        # NOTE(review): send_weekly_report is not defined in the snippets
        # shown here; it must exist in myapp/tasks.py for this entry to run.
        'task': 'myapp.tasks.send_weekly_report',
        'schedule': crontab(hour=9, minute=0, day_of_week=1),  # Monday 9 AM
    },
}

Interval Schedule#

# settings.py
# timedelta is a standard-library type; import it from datetime rather than
# relying on celery.schedules happening to import it internally.
from datetime import timedelta

# Interval schedules: each task runs repeatedly, once per given timedelta.
CELERY_BEAT_SCHEDULE = {
    'update-cache': {
        'task': 'myapp.tasks.update_cache',
        'schedule': timedelta(minutes=30),  # Every 30 minutes
    },
    'sync-external-api': {
        'task': 'myapp.tasks.sync_external_api',
        'schedule': timedelta(hours=1),  # Every hour
    },
}

Solar Schedule#

# settings.py
# NOTE(review): solar schedules presumably need the optional `ephem`
# package installed — confirm against the Celery documentation.
from celery.schedules import solar

CELERY_BEAT_SCHEDULE = {
    'sunset-task': {
        'task': 'myapp.tasks.sunset_task',
        # Arguments are the solar event name, then latitude and longitude.
        'schedule': solar('sunset', 40.7128, -74.0060),  # NYC coordinates
    },
}

Task Retry#

# myapp/tasks.py
from celery import shared_task
from celery.exceptions import Retry
# Assumes Order lives in this app's models, like Post and Subscriber in the
# other task examples — adjust the import if it is defined elsewhere.
from .models import Order


@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id):
    """Process the payment for one order, retrying on failure.

    Args:
        self: The bound task instance (bind=True), used to request a retry.
        order_id: Primary key of the Order to process.

    Raises:
        Whatever self.retry() raises: it reschedules the task 60 seconds
        later and, once max_retries (3) is exhausted, the failure propagates.
    """
    try:
        # Process payment
        order = Order.objects.get(id=order_id)
        # ... payment logic
    except Exception as exc:
        # Retry after 60 seconds.
        # NOTE: this catches every exception, including programming errors;
        # narrow it to the transient errors of your payment provider.
        raise self.retry(exc=exc, countdown=60)

Task Chaining#

# myapp/tasks.py
from celery import chain

# Execute tasks in sequence
# NOTE(review): process_image, create_thumbnail, send_notification and
# image_id are not defined in this snippet; they must already be in scope.
# NOTE(review): only the first signature is given an argument; presumably
# each later task receives the previous task's return value as its first
# argument — confirm against the Celery canvas documentation.
workflow = chain(
    process_image.s(image_id),
    create_thumbnail.s(),
    send_notification.s()
)
# Calling the chain object starts the workflow.
result = workflow()

Task Groups#

# myapp/tasks.py
from celery import group

# Execute tasks in parallel
# send_email_task takes (subject, message, recipient), so every signature
# must supply all three arguments, not just the address.
job = group(
    send_email_task.s('Subject', 'Message', 'user1@example.com'),
    send_email_task.s('Subject', 'Message', 'user2@example.com'),
    send_email_task.s('Subject', 'Message', 'user3@example.com'),
)
result = job.apply_async()

Running Celery#

# Start Celery worker
celery -A myproject worker --loglevel=info

# Start Celery Beat (scheduler)
celery -A myproject beat --loglevel=info

# Or both in one command (development only: with more than one worker,
# each embedded beat would schedule the same tasks again)
celery -A myproject worker --beat --loglevel=info

# With auto-reload (development)
# The Celery worker has no --reload flag (the old --autoreload option was
# removed in Celery 4.0); use watchdog to restart it when code changes.
pip install watchdog
watchmedo auto-restart --directory=./ --pattern='*.py' --recursive -- \
    celery -A myproject worker --beat --loglevel=info

Django Integration#

# views.py
from django.http import HttpResponseBadRequest, HttpResponseNotAllowed
from django.shortcuts import redirect

from myapp.tasks import send_email_task


def contact_view(request):
    """Queue the submitted contact-form message as an email to the admin.

    Args:
        request: The incoming Django request.

    Returns:
        A redirect to the 'success' URL after queuing the email, a 405
        response for non-POST requests, or a 400 response when the
        'message' field is absent.
    """
    # A view must always return a response; reject anything but POST
    # explicitly instead of falling through and returning None.
    if request.method != 'POST':
        return HttpResponseNotAllowed(['POST'])

    # .get() avoids an unhandled KeyError when the field is missing.
    message = request.POST.get('message')
    if message is None:
        return HttpResponseBadRequest('Missing "message" field')

    # Send email asynchronously
    send_email_task.delay('Contact Form', message, 'admin@example.com')
    return redirect('success')

Monitoring#

# Install Flower (Celery monitoring)
pip install flower

# Run Flower
celery -A myproject flower

# Access at http://localhost:5555

Best Practices#

  1. Use @shared_task - Tasks defined this way work across apps
  2. Write idempotent tasks - They can then be retried safely
  3. Use a result backend - It stores task results so you can read them later
  4. Set timeouts - They prevent hanging tasks
  5. Monitor tasks - Use Flower or a similar tool
  6. Test locally - Set CELERY_TASK_ALWAYS_EAGER = True in your development settings
# settings.py (development)
# Keep this out of production settings: it is a local development aid.
CELERY_TASK_ALWAYS_EAGER = True  # Run tasks synchronously

Complete! Review Getting Started for Django basics.