آموزش کار با celery در پایتون

April 2021


همه افراد جامعه پایتون حداقل یک بار در مورد celery چیزی شنیده اند و شاید حتی قبلاً با آن کار کرده باشند. اساساً ، این یک ابزار مفید است که به شما کمک می کند تا کدهای به تعویق افتاده یا اختصاصی را در یک فرآیند جداگانه یا حتی در یک رایانه یا سرور جداگانه اجرا کنید. این باعث صرفه جویی در وقت و تلاش در بسیاری از سطوح می شود.

 

مقدمه ای بر راهنمای Python سلری

celery با اجرای بخشی از عملکرد به عنوان کارهای به تعویق افتاده یا در همان سرور سایر کارها یا در سرور دیگری ، بار عملکرد را کاهش می دهد. معمولاً توسعه دهندگان از آن برای ارسال ایمیل استفاده می کنند. با این وجود کرفس چیزهای بیشتری برای ارائه دارد. در این مقاله ، من برخی از اصول celery، و همچنین چند مورد از بهترین روش های پایتون-سلری را به شما نشان خواهم داد.

 

مقدمات سلری

اگر قبلاً با سلری کار کرده اید ، از این فصل صرف نظر کنید. اما اگر سلری برای شما جدید است ، در اینجا خواهید آموخت که چگونه سلری را در پروژه خود فعال کنید و در یک آموزش جداگانه در مورد استفاده از سلری با جنگو شرکت کنید. در واقع ، شما باید یک نمونه Celery ایجاد کنید و از آن برای علامت گذاری توابع پایتون به عنوان وظایف استفاده کنید.

 

بهتر است نمونه را در یک فایل جداگانه ایجاد کنید ، زیرا لازم است Celery را به همان روشی که با WSGI در جنگو کار می کند ، اجرا کنید. به عنوان مثال ، اگر دو نمونه Flask و Celery را در یک فایل در برنامه Flask ایجاد کرده و آن را اجرا کنید ، دو نمونه خواهید داشت اما فقط از یک مورد استفاده می کنید. هنگام اجرای سلری نیز همینطور است.

 

نمونه های اولیه سلری پایتون

همانطور که قبلاً اشاره کردم ، مورد اصلی استفاده از سلری ارسال ایمیل است. من از این مثال برای نشان دادن اصول استفاده از سلری استفاده خواهم کرد. در اینجا یک آموزش سریع Celery Python آورده شده است:

from django.conf import settings
from django.core.mail import send_mail
from django.template import Engine, Context

from myproject.celery import app


def render_template(template, context):
    engine = Engine.get_default()

    tmpl = engine.get_template(template)

    return tmpl.render(Context(context))
    

@app.task
def send_mail_task(recipients, subject, template, context):
    send_mail(
        subject=subject,
        message=render_template(f'{template}.txt', context),
        from_email=settings.DEFAULT_FROM_EMAIL,
        recipient_list=recipients,
        fail_silently=False,
        html_message=render_template(f'{template}.html', context)
    )

این کد از Django استفاده می کند ، زیرا چارچوب اصلی ما برای برنامه های وب است. با استفاده از سلری، زمان پاسخ به مشتری را کاهش می دهیم ، زیرا فرایند ارسال را از کد اصلی مسئول پاسخ دادن جدا می کنیم.

ساده ترین راه برای اجرای این کار فراخوانی روش تاخیر عملکرد است که توسط دکوریتور app.task ارائه شده است.

send_mail_task.delay(('noreply@example.com', ), 'Celery cookbook test', 'test', {})

نه تنها این - سلری مزایای بیشتری دارد. به عنوان مثال ، ما می توانیم دوباره در صورت عدم موفقیت کد را اجرا کنیم:

@celery_app.task(bind=True, default_retry_delay=10 * 60)
def send_mail_task(self, recipients, subject, template, context):
    message = render_template(f'{template}.txt', context)
    html_message = render_template(f'{template}.html', context)
    try:
        send_mail(
            subject=subject,
            message=message,
            from_email=settings.DEFAULT_FROM_EMAIL,
            recipient_list=recipients,
            fail_silently=False,
            html_message=html_message
        )
    except smtplib.SMTPException as ex:
        self.retry(exc=ex)

در صورت عدم موفقیت در ارسال ، این کار پس از ده دقیقه دوباره راه اندازی می شود. همچنین ، می توانید تعداد تلاش های مجدد را تنظیم کنید.

برخی از شما ممکن است تعجب کند که چرا من ارائه الگو را خارج از تماس send_mail منتقل کردم. به این دلیل است که ما فراخوانی send_mail را به try/catch بسته بندی می کنیم و بهتر است تا آنجا که ممکن است کد کمتری در try/catch داشته باشیم.

 

سلری برای کاربران پیشرفته

 

وظایف برنامه ریزی شده سلری جنگو

سلری امکان اجرای وظایف توسط برنامه ریزانی مانند crontab در لینوکس را فراهم می کند.

اول از همه ، اگر می خواهید از کارهای دوره ای استفاده کنید ، باید کارگر سلری را با پرچم beat اجرا کنید ، در غیر این صورت سلری برنامه ریز را نادیده می گیرد. قدم بعدی شما ایجاد پیکربندی است که می گوید چه کاری باید انجام شود و چه زمانی. در اینجا یک مثال آورده شده است:

from celery.schedules import crontab


CELERY_BEAT_SCHEDULE = {
    'monday-statistics-email': {
        'task': 'myproject.apps.statistics.tasks.monday_email',
        'schedule': crontab(day_of_week=1, hour=7),
    },
}

* اگر از جنگو استفاده نمی کنید ، باید از celery_app.conf.beat_schedule به جای CELERY_BEAT_SCHEDULE استفاده کنید.

 

آنچه در این پیکربندی داریم تنها یک وظیفه است که هر دوشنبه ساعت 7 صبح اجرا می شود. کلید ریشه یک نام یا یک cranjob است نه یک کار.

شما می توانید آرگومان هایی را به وظایف اضافه کنید و انتخاب کنید چه کاری باید انجام شود در صورتی که همان کار در زمان های مختلف با استدلال های مختلف اجرا شود. روش crontab از نحو crontab سیستم پشتیبانی می کند - مانند crontab (دقیقه = '* / 15') - برای اجرای کار هر 15 دقیقه.

 

به تعویق انداختن اجرای کار در سلری

همچنین می توانید قبل از اجرا ، در صف Python Celery وظایف را تعیین کنید. (به عنوان مثال ، هنگامی که پس از اقدام به ارسال اعلان نیاز دارید.) برای انجام این کار ، از روش apply_async با استدلال eta یا شمارش معکوس استفاده کنید.

etc انجام وظیفه در زمان دقیق
countdown کار را در N ثانیه اجرا کنید

در مثال اول ، ایمیل در 15 دقیقه ارسال می شود ، در حالی که در نمونه دوم در ساعت 7 صبح در تاریخ 20 مه ارسال می شود. بیایید بررسی کنیم که کد آن به چه صورت است:

from datetime import datetime


send_mail_task.apply_async(
    (('noreply@example.com', ), 'Celery cookbook test', 'test', {}),
    countdown=15 * 60
)

send_mail_task.apply_async(
    (('noreply@example.com', ), 'Celery cookbook test', 'test', {}),
    eta=datetime(2019, 5, 20, 7, 0)

 

تنظیم صف های سلری پایتون

سلری زمانی توزیع می شود که چندین کارگر در سرورهای مختلف داشته باشید و از یک پیام برای برنامه ریزی کار استفاده می کنند. می توانید یک صف اضافی برای کار / کارگر خود پیکربندی کنید. به عنوان مثال ، ارسال ایمیل قسمت مهمی از سیستم شماست و نمی خواهید هیچ کار دیگری روی ارسال تأثیر بگذارد. سپس می توانید یک صف جدید اضافه کنید ، بگذارید آن را ایمیل بنامیم و از این صف برای ارسال ایمیل استفاده کنیم.

 

CELERY_TASK_ROUTES = {
    'myproject.apps.mail.tasks.send_mail_task': {'queue': 'mail', },
}

* اگر از جنگو استفاده نمی کنید ، به جای CELERY_TASK_ROUTES از celery_app.conf.task_routes استفاده کنید

دو کارگر کرفس جداگانه را برای صف پیش فرض و صف جدید اجرا کنید:

celery -A myproject worker -l info -Q celery
celery -A myproject worker -l info -Q mail

خط اول کارگر را برای صف پیش فرض به نام celery اجرا می کند و خط دوم کارگر را برای صف mail اجرا می کند. می توانید از اولین کارگر بدون استدلال -Q استفاده کنید ، سپس این کارگر از همه صف های پیکربندی شده استفاده می کند.

 

وظایف طولانی مدت سلری Python

گاهی اوقات ، برای گذر از سوابق پایگاه داده و انجام برخی عملیات ها ، مجبورم با وظایفی که نوشته شده است کنار بیایم. اغلب اوقات ، توسعه دهندگان در مورد رشد داده ها فراموش می کنند ، که می تواند منجر به طولانی شدن زمان کار طولانی شود. همیشه بهتر است کارهایی از این قبیل را به روشی بنویسید که امکان کار با تکه های داده را فراهم کند. ساده ترین راه افزودن جبران و محدود کردن پارامترها به یک کار است. به شما این امکان را می دهد که اندازه تکه را نشانگر ، و مکان نما را برای بدست آوردن یک تکه داده جدید نشان دهید.

@celery_app.task
def send_good_morning_mail_task(offset=0, limit=100):
    users = User.objects.filter(is_active=True).order_by('id')[offset:offset + limit]
    for user in users:
        send_good_morning_mail(user)

    if len(users) >= limit:
        send_good_morning_mail_task.delay(offset + limit, limit)

این یک مثال بسیار ساده از چگونگی اجرای یک وظیفه مانند این است. در پایان کار ، بررسی می کنیم که چه تعداد کاربر را در پایگاه داده پیدا کرده ایم. اگر این عدد برابر با حد باشد ، احتمالاً باید کاربران جدیدی را برای پردازش داشته باشیم. بنابراین ما کار را با جبران جدید دوباره اجرا می کنیم. اگر تعداد کاربر کمتر از حد مجاز باشد ، به این معنی است که آخرین قطعه است و دیگر لازم نیست ادامه دهیم. مراقب باشید ، هرچند: این پیاده سازی وظیفه باید هربار ترتیب یکسانی برای سوابق داشته باشد.

 

سلری: گرفتن نتایج کار

بیشتر توسعه دهندگان نتایجی را که پس از اجرای کار به دست می آورند ثبت نمی کنند. تصور کنید که می توانید به محض دریافت درخواست کاربر ، بخشی از کد را بردارید ، آن را به یک کار اختصاص دهید و این کار را به طور مستقل اجرا کنید. وقتی به نتایج کار نیاز داریم ، یا نتایج را بلافاصله دریافت می کنیم (اگر کار تکمیل شده باشد) ، یا منتظر می مانیم تا کامل شود. سپس نتیجه را به پاسخ کلی وارد می کنیم. با استفاده از این روش می توانید زمان پاسخ را کاهش دهید که برای کاربران و رتبه سایت شما بسیار مناسب است.

 

ما برای اجرای عملیات همزمان از این ویژگی استفاده می کنیم. در یکی از پروژه های ما ، داده های کاربر زیادی و ارائه دهندگان خدمات زیادی داریم. برای یافتن بهترین ارائه دهنده خدمات ، محاسبات و بررسی های سنگینی را انجام می دهیم. برای انجام سریعتر این کار ، با هر ارائه دهنده خدمات وظایفی را برای کاربر ایجاد می کنیم ، آنها را اجرا می کنیم و نتایج را جمع آوری می کنیم تا به کاربر نشان دهیم. انجام آن با گروه های کاری کرفس بسیار آسان است.

 

from celery import group

@celery_app.task
def calculate_service_provider_task(user_id, provider_id):
    user = User.objects.get(pk=user_id)
    provider = ServiceProvider.objects.get(pk=provider_id)

    return calculate_service_provider(user, provider)


@celery_app.task
def find_best_service_provider_for_user(user_id):
    user = User.objects.get(pk=user_id)
    providers = ServiceProvider.objects.related_to_user(user)

    calc_group = group([
        calculate_service_provider_task.s(user.pk, provider.pk)
        for provider in providers
    ]).apply_async()

    return calc_group

اول ، چرا ما حتی دو کار را اجرا می کنیم؟ ما از وظیفه دوم برای تشکیل گروه های وظیفه محاسبه ، راه اندازی و برگرداندن آنها استفاده می کنیم. علاوه بر این ، وظیفه دوم جایی است که می توانید فیلتراسیون پروژه را تعیین کنید - مانند ارائه دهندگان خدماتی که باید برای یک کاربر مشخص محاسبه شود. تمام اینها را می توان در حالی انجام داد که کرفس کارهای دیگری انجام می دهد. وقتی گروه وظیفه برمی گردد ، نتیجه اولین کار در واقع محاسبه ای است که ما به آن علاقه داریم.

 

در اینجا مثالی از نحوه استفاده از این روش در کد آورده شده است:

def view(request):
    find_job = find_best_service_provider_for_user.delay(request.user.pk)

    # do other stuff

    calculations_results = find_job.get().join()

    # process calculations_results and send response

در اینجا ، ما محاسبات را در اسرع وقت انجام می دهیم ، منتظر نتایج در انتهای روش هستیم ، سپس پاسخ را آماده می کنیم و برای کاربر ارسال می کنیم.

 

نکات مفید

داده های کوچک

من احتمالاً قبلاً ذکر کرده ام که من از شناسه های رکورد پایگاه داده به عنوان آرگومان کار به جای اشیا full کامل استفاده می کنم این روش خوبی برای کاهش اندازه صف پیام است. اما آنچه مهمتر این است که هنگام اجرای یک کار ، داده های موجود در پایگاه داده قابل تغییر هستند. و هنگامی که فقط شناسه داشته باشید ، داده های تازه ای دریافت خواهید کرد در مقایسه با داده های منسوخ شده هنگام انتقال اشیا.

 

معاملات

گاهی اوقات ، هنگامی که یک کار اجرا شده نمی تواند شی ای را در پایگاه داده پیدا کند ، ممکن است مشکلاتی ایجاد شود. چرا این اتفاق می افتد؟ به عنوان مثال ، در جنگو ، می خواهید پس از ثبت نام کاربر ، مانند ارسال ایمیل سلام و تنظیمات جنگو ، همه درخواست ها را در یک معامله انجام دهید. با این وجود ، در کرفس ، کارها سریع انجام می شوند ، قبل از اینکه معامله حتی تمام شود. بنابراین اگر هنگام کار در Django از کرفس استفاده می کنید ، ممکن است مشاهده کنید که کاربر هنوز در پایگاه داده وجود ندارد.

 

برای مقابله با این موضوع ، می توانید "پیاده سازی معامله وظیفه" Google را انجام دهید. به طور کلی ، این یک روش اعمال شده_async بازنویسی شده در کلاس است ، یک کلاس است که یک کار را در سیگنال تراکنش.

 

نتیجه

همانطور که می بینید ، سلری کاربردهای بسیار بیشتری از ارسال ایمیل دارد. با استفاده از فرآیند اصلی می توانید همزمان کارهای مختلفی را اجرا کنید و در حالی که وظیفه خود را انجام می دهید ، سلری کارهای کوچکتر را که در دست دارید انجام می دهد. می توانید صف تنظیم کنید ، با تکه های داده در کارهای طولانی مدت کار کنید و زمانهایی را برای انجام وظایف خود تنظیم کنید. این به شما امکان می دهد که برای پیشرفت کار خود بهتر برنامه ریزی کنید ، زمان توسعه را با کارآیی بیشتر برنامه ریزی کنید و وقت گرانبهای خود را صرف کارهای بزرگتر کنید در حالی که گروه های کاری کرفس جادوی خود را انجام می دهند.

مقالات مرتبط

deploying django projects

add custom buttons to django change form

django abstract models

convert django form errors to farsi