
Race condition at AdminMiddleware #154

Open
spumer opened this issue Sep 26, 2023 · 5 comments

Comments

@spumer
Contributor

spumer commented Sep 26, 2023

Hi. We have a lot of tasks stuck in ENQUEUED status, even though they were executed and their jobs completed successfully.

I think there is a small race condition in the middleware callbacks:

  1. Scheduler emits before_enqueue
  2. Scheduler publishes the task to the queue
  3. Worker receives the task from the queue
  4. Worker runs the before-processing hook (updates task status=RUNNING)
  5. Worker completes the task
  6. Worker runs the after-processing hook (updates task status=DONE)
  7. Scheduler emits after_enqueue (updates task status=ENQUEUED)
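To make the interleaving above concrete, here is a minimal, hypothetical simulation that uses an in-memory sqlite3 table in place of the Django ORM. The task table, the lowercase status strings and the helper names are assumptions for illustration only; the point is that an unconditional create-or-update in a late after_enqueue hook overwrites DONE.

```python
import sqlite3

# Hypothetical stand-in for the Task table (not the real django_dramatiq schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task (id TEXT PRIMARY KEY, status TEXT NOT NULL)")


def create_or_update(task_id, status):
    # Unconditional write: whatever is stored is replaced by the new status.
    conn.execute(
        "INSERT OR REPLACE INTO task (id, status) VALUES (?, ?)", (task_id, status)
    )


def current_status(task_id):
    row = conn.execute("SELECT status FROM task WHERE id = ?", (task_id,)).fetchone()
    return row[0] if row else None


# Steps 4-7 in the order described above: the worker finishes the task
# before the scheduler's after_enqueue hook gets to run.
create_or_update("msg-1", "running")   # step 4: worker, before processing
create_or_update("msg-1", "done")      # step 6: worker, after processing
create_or_update("msg-1", "enqueued")  # step 7: scheduler, late after_enqueue

print(current_status("msg-1"))  # enqueued: the finished task looks queued again
```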

In logs this looks like this:
[Screenshot: log output]

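You can see that the Create and Update entries are logged in a different order, yet the last entry is an Update. Don't be misled by this: each message is logged before its SQL operation runs, so the log order does not exactly reflect the order in which the SQL updates were applied.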


So, I want to make a PR to fix that. I see two possible solutions:

  1. Optimistic locking: check the row version on update and ignore the update if the version has changed (this requires a schema change).
  2. Do not allow the status to change from DONE/RUNNING back to ENQUEUED: check the current status when updating (i.e. enforce the state flow).
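A minimal sketch of solution 1 (optimistic locking), assuming a hypothetical integer version column added to the task table; sqlite3 stands in for the real database, and none of these names come from django_dramatiq. Every successful write bumps the version, so a writer holding a stale version updates zero rows and its change is dropped instead of clobbering the newer status.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical schema: the extra "version" column is the schema change
# that optimistic locking would require.
conn.execute(
    "CREATE TABLE task (id TEXT PRIMARY KEY, status TEXT NOT NULL, version INTEGER NOT NULL)"
)
conn.execute("INSERT INTO task (id, status, version) VALUES ('msg-1', 'delayed', 0)")


def read(task_id):
    return conn.execute(
        "SELECT status, version FROM task WHERE id = ?", (task_id,)
    ).fetchone()


def update_if_unchanged(task_id, new_status, expected_version):
    # Write only if the version is still the one we read; bump it on success.
    cur = conn.execute(
        "UPDATE task SET status = ?, version = version + 1 WHERE id = ? AND version = ?",
        (new_status, task_id, expected_version),
    )
    return cur.rowcount == 1  # False: someone else wrote first, so ours is ignored


# The scheduler reads the row, then stalls...
_, scheduler_version = read("msg-1")

# ...meanwhile a worker runs the task to completion.
_, version = read("msg-1")
update_if_unchanged("msg-1", "running", version)
_, version = read("msg-1")
update_if_unchanged("msg-1", "done", version)

# The scheduler's late write carries a stale version and is rejected.
applied = update_if_unchanged("msg-1", "enqueued", scheduler_version)
print(applied, read("msg-1"))  # False ('done', 2)
```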

@amureki what do you think about that?

@spumer
Contributor Author

spumer commented Sep 27, 2023

A quick fix in our case is to use get_or_create instead of create_or_update in the after_enqueue method:

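    import logging

    import django_dramatiq.middleware


    class AdminMiddleware(django_dramatiq.middleware.AdminMiddleware):
        logger = logging.getLogger(__name__).getChild('AdminMiddleware')

        def after_enqueue(self, broker, message, delay):
            from django_dramatiq.models import Task

            self.logger.debug('Creating Task from message %r.', message.message_id)
            status = Task.STATUS_ENQUEUED
            if delay:
                status = Task.STATUS_DELAYED

            # get_or_create only inserts a missing row and never overwrites an
            # existing one, so a RUNNING/DONE status written by the worker is kept.
            Task.tasks.get_or_create(
                id=message.message_id,
                defaults={
                    'message_data': message.encode(),
                    'status': status,
                    'actor_name': message.actor_name,
                    'queue_name': message.queue_name,
                },
            )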
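A rough sqlite3 analogue of why this quick fix helps: INSERT OR IGNORE, like get_or_create, only creates a missing row and leaves an existing one untouched. The table and helper names are hypothetical stand-ins for the Task model, not django_dramatiq code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task (id TEXT PRIMARY KEY, status TEXT NOT NULL)")


def worker_set_status(task_id, status):
    # Worker-side hooks still write unconditionally (hypothetical stand-in).
    conn.execute(
        "INSERT OR REPLACE INTO task (id, status) VALUES (?, ?)", (task_id, status)
    )


def after_enqueue_get_or_create(task_id, status):
    # Insert only when the row is missing; an existing row is left as it is.
    cur = conn.execute(
        "INSERT OR IGNORE INTO task (id, status) VALUES (?, ?)", (task_id, status)
    )
    return cur.rowcount == 1  # True only if a new row was created


worker_set_status("msg-1", "running")
worker_set_status("msg-1", "done")
created = after_enqueue_get_or_create("msg-1", "enqueued")
status = conn.execute("SELECT status FROM task WHERE id = ?", ("msg-1",)).fetchone()[0]
print(created, status)  # False done
```

One consequence of this design: any status change that after_enqueue should legitimately make on an existing row (for example, moving it to DELAYED) is skipped as well, which is what the strict transition table later in this thread addresses.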

@dapanin

dapanin commented Feb 16, 2024

Can I solve it this way?

    def before_enqueue(self, broker, message, delay):
        super().before_enqueue(broker, message, delay)
        super().after_enqueue(broker, message, delay)

    def after_enqueue(self, broker, message, delay):
        pass
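A sequential sketch of the idea behind this variant, again with hypothetical stand-ins (sqlite3 for the database, queue.Queue for the broker). Assuming the row is committed before the message is published, any write by a worker that picks the message up necessarily comes after the ENQUEUED write, so it cannot be overwritten by it.

```python
import queue
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task (id TEXT PRIMARY KEY, status TEXT NOT NULL)")
broker = queue.Queue()  # hypothetical stand-in for the message broker


def set_status(task_id, status):
    conn.execute(
        "INSERT OR REPLACE INTO task (id, status) VALUES (?, ?)", (task_id, status)
    )


def enqueue(task_id):
    # before_enqueue: record ENQUEUED *before* the message becomes visible
    set_status(task_id, "enqueued")
    # publish: from this point on a worker may pick the message up
    broker.put(task_id)
    # after_enqueue: intentionally a no-op in this variant


def worker_step():
    task_id = broker.get()
    set_status(task_id, "running")
    set_status(task_id, "done")


enqueue("msg-1")
worker_step()
print(conn.execute("SELECT status FROM task WHERE id = 'msg-1'").fetchone()[0])  # done
```

One trade-off worth checking with this approach: if publishing the message fails after before_enqueue has run, a row in ENQUEUED status is left behind for a message that was never sent.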

@spumer
Contributor Author

spumer commented Apr 5, 2024

Finally, we fixed it by enforcing a strict status transition flow:

    import logging
    from functools import cached_property

    import django_dramatiq.middleware


    class AdminMiddleware(django_dramatiq.middleware.AdminMiddleware):
        logger = logging.getLogger(__name__).getChild('AdminMiddleware')

        @cached_property
        def transitions(self):
            from django_dramatiq.models import Task

            # target_status -> [source_status, ...]
            transitions = {
                Task.STATUS_ENQUEUED: [  # The task was put in the queue for processing
                    None,  # The task did not exist yet, a new one was created
                    Task.STATUS_DELAYED,  # Moved over from the DELAYED queue
                ],
                Task.STATUS_RUNNING: [  # A worker picked the task up
                    Task.STATUS_ENQUEUED,
                    Task.STATUS_DELAYED,
                ],
                Task.STATUS_FAILED: [  # The task finished with an error (e.g. an Exception)
                    Task.STATUS_RUNNING,
                ],
                Task.STATUS_DELAYED: [  # The task was delayed
                    # A task can be delayed from any final status
                    Task.STATUS_FAILED,
                    Task.STATUS_SKIPPED,
                    Task.STATUS_DONE,
                ],
            }
            return transitions

        def after_enqueue(self, broker, message, delay):
            from django_dramatiq.models import Task

            self.logger.debug('Creating Task from message %r.', message.message_id)
            target_status = Task.STATUS_ENQUEUED
            if delay:
                target_status = Task.STATUS_DELAYED

            obj, created = Task.tasks.get_or_create(
                id=message.message_id,
                defaults={
                    'message_data': message.encode(),
                    'status': target_status,
                    'actor_name': message.actor_name,
                    'queue_name': message.queue_name,
                },
            )
            if created:
                return

            source_status = obj.status
            if source_status in self.transitions[target_status]:
                self.logger.debug(
                    'Update Task status for message %r: %s -> %s',
                    message.message_id,
                    source_status,
                    target_status,
                )
                # The status filter makes this a conditional update: if a worker changed
                # the status after it was read above, no row matches and nothing is overwritten.
                Task.tasks.filter(id=message.message_id, status=source_status).update(
                    status=target_status
                )
            else:
                self.logger.debug(
                    'Incorrect Task status transition for message %r: %s -> %s',
                    message.message_id,
                    source_status,
                    target_status,
                )
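The same rule can be exercised outside Django with a small, hypothetical sqlite3 sketch: a transition table with the same shape as the one above (lowercase status strings are assumed), get-or-create via INSERT OR IGNORE, and a conditional UPDATE ... WHERE status = ? playing the role of the filter(...).update(...) call. As in the middleware above, only the after_enqueue path is checked; worker writes stay unconditional.

```python
import sqlite3

# target_status -> allowed source statuses, mirroring the table above.
# Lowercase status strings are an assumption made for this sketch.
TRANSITIONS = {
    "enqueued": {None, "delayed"},
    "running": {"enqueued", "delayed"},
    "failed": {"running"},
    "delayed": {"failed", "skipped", "done"},
}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task (id TEXT PRIMARY KEY, status TEXT NOT NULL)")


def worker_set_status(task_id, status):
    # Worker-side writes are left unconditional, as in the middleware above.
    conn.execute(
        "INSERT OR REPLACE INTO task (id, status) VALUES (?, ?)", (task_id, status)
    )


def after_enqueue(task_id, target):
    """Create the row if missing, else move it only along an allowed transition."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO task (id, status) VALUES (?, ?)", (task_id, target)
    )
    if cur.rowcount == 1:
        return True  # new row created with the target status
    (source,) = conn.execute(
        "SELECT status FROM task WHERE id = ?", (task_id,)
    ).fetchone()
    if source not in TRANSITIONS.get(target, set()):
        return False  # e.g. done -> enqueued is rejected
    # Conditional UPDATE: if the status changed after the SELECT,
    # the WHERE clause matches nothing and no write happens.
    cur = conn.execute(
        "UPDATE task SET status = ? WHERE id = ? AND status = ?",
        (target, task_id, source),
    )
    return cur.rowcount == 1


worker_set_status("msg-1", "running")
worker_set_status("msg-1", "done")
print(after_enqueue("msg-1", "enqueued"))  # False: done -> enqueued is not allowed
print(after_enqueue("msg-1", "delayed"))   # True: done -> delayed is allowed
```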

@spumer
Contributor Author

spumer commented Apr 5, 2024

@amureki if you agree with this, I will make a PR.

@amureki
Collaborator

amureki commented Apr 17, 2024

> @amureki if you agree with that i will make a PR

So far, your proposal seems logical to me. And if you already have it running and it resolves this race condition, even better.

I am happy to review the patch. 👍

Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants