اجرای مجدد تسک های ناموفق در سلری

امیرحسین بیگدلو 5 ماه قبل

در زمان کار با سلری، مدیریت تسک هایی که اجرای آنها ناموفق بوده بسیار مهم است. به طور کلی در دو حالت ممکن است یک تسک سلری با شکست مواجه شود:

  • خطا در زمان اتصال به broker یا صف
  • ایجاد شدن یک exception در کد

 

خوشبختانه، Celery ابزارها و گزینه‌های مناسبی را در اختیار ما قرار میدهد تا بتوانیم تسک هایی که با مشکل مواجه شده اند را دوباره اجرا کنیم. در این مقاله با چند روش پرکاربرد آشنا میشوید تا بتوانید یه شکل اتوماتیک تسک های شکست خورده را دوباره اجرا کنید.

 

 

 #  تلاش مجدد برای اتصال به بروکر با celery

اولین مشکلی که داریم اتصال به message broker است. در این حالت نمیتوانیم هیچ تسکی ارسال کنیم. این مشکل با فعال کردن retry=True در پیام و همچنین مشخص کردن یک retry-policy برای تعریف نحوه اجرای مجدد حل می شود.

 

توجه داشته باشید که این را می توان هم در task.apply_async و هم در celery.send_task اعمال کرد. در اینجا یک نمونه از این کد وجود دارد:

from tasks.celery import app

app.send_task(
    "foo.task",
    retry=True,
    retry_policy=dict(
        max_retries=3,
        interval_start=3,
        interval_step=1,
        interval_max=6
    )
)

 

در این قطعه کد مشخص کرده ایم که اگر اتصال قطع شود و نتوانیم پیام را به صف ارسال کنیم، سعی می کنیم 3 بار دیگر امتحان کنیم. اولین تلاش مجدد در interval_start ثانیه به معنای 3 ثانیه اتفاق می افتد. سپس هر شکست اضافی برای interval_step مقدار 1 ثانیه دیگر منتظر می ماند تا زمانی که مجدداً پیام را ارسال کند.

 

 

 #  اجرای مجدد تسک های ناموفق در سلری

نوع بعدی مشکلی که ممکن است به اجرای مجدد تسک ها نیاز داشته باشید، زمانی است که وظایف با شکست مواجه می‌شوند. توجه داشته باشید که این سناریو کاملاً متفاوت از سناریوی اول است. در حالت قبلی، ما موفق به ارسال پیام نمیشدیم اما در این حالت ما موفق به اجرای کامل تسک نمی شویم.

 

دلیل اینکه ممکن است یک تسک شکست بخورد معمولاً این است که یک مشکلی برای کارگرهای سلری اتفاق افتاده و یا یک exception ایجاد شده است. شاید اشکالی در کد وجود داشته باشد، برخی از سرویس‌ها به پایان رسیده یا تقاضای آن خیلی زیاد است.

 

فرض کنید کنید یک تسک سلری همانند کد زیر داریم:

@shared_task
def task_process_notification():
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()

    requests.post('https://httpbin.org/delay/5')

 

اگر اجرای این تسک شکست بخورد چند راه حل دارید:

 

 

 1  استفاده از بلوک try/except

می‌توانیم از بلوک try/except برای گرفتن استثنا و اجرای مجدد تسک استفاده کنیم:

@shared_task(bind=True)
def task_process_notification(self):
    try:
        if not random.choice([0, 1]):
            # mimic random error
            raise Exception()

        requests.post('https://httpbin.org/delay/5')
    except Exception as e:
        logger.error('exception raised, it would be retry after 5 seconds')
        raise self.retry(exc=e, countdown=5)

 

از آنجایی که در این تسک مقدار bind برابر با True است اولین آرگومانی که این فانکشن میگیرد(self) به خود تسک اشاره میکند. به همین دلیل میتوانید از self.try برای اجرا مجدد تسک استفاده کنید.

 

دقت کنید که حتما باید self.try را همراه با raise اجرا کنید.

 

با آرگومان countdown مشخص کردیم که 5 ثانیه بعد تسک دوباره اجرا شود.

 

 

 2  استفاده از دکوریتور retry

سلری نسخه 4 به طور رسمی از retry پشتیبانی کرد پس میتوانید با یک دکوریتور بعد از ایجاد شدن یک استثنا تسک را مجددا اجرا کنید:

@shared_task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 7, 'countdown': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()

    requests.post('https://httpbin.org/delay/5')

 

با autoretry_for میتوانید لیست یا تاپلی از استثناهایی که میخواهید در زمان ایجاد شدن، تسک دوباره اجرا شود را مشخص کنید.

 

با دیکشنری retry_kwargs میتوانید آپشن های اضافی را برای اجرا مجدد تسک مشخص کنید. در کد بالا با استفاده از countdown بعد از 5 ثانیه تسک دوباره اجرا میشود. با max_retries مشخص کردیم که سلری تا 7 بار میتوانید یک تسک را اجرا کند و بعد از آن یک استثنا ایجاد کند.

مطالب مشابه



مونگارد