Don’t mutate objects inside Celery tasks

Recently, while writing a Celery task, I ran into an issue with mutating a dictionary (changing its values) when retries were enabled for the task. When I searched the internet, I couldn’t find any resources related to the problem, so I want to describe exactly what the problem was and how to avoid it.

TL;DR: Don’t modify mutable objects inside Celery tasks that could be retried. If you have to, first make a copy of the mutable object and make the changes to the copy.

I was writing code to store some log data in a database for analytics. The incoming data needed to be modified slightly (changing the timestamp, calculating the updated time, etc.) before being logged to the DB. I wanted to do this job asynchronously and with retries so that the logging wouldn’t fail. Since the application I was working on already had Celery set up and configured, I chose to create a task for the job. The following is an emulation of part of what the code does.
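As a rough illustration, here is a minimal sketch of the kind of task body described, written as plain Python with the Celery decorator and retry handling left out so that it runs anywhere. Only the names save_travel_details and save_to_db come from this post. The epoch value, the use of UTC, and the exact date format are assumptions made for the example.

```python
from datetime import datetime, timezone


def save_to_db(data):
    # Stand-in for the real database write; here it only prints.
    print(data)


def save_travel_details(travel_details):
    # Convert the epoch timestamp to a readable string IN PLACE.
    # UTC is an assumption so the result is the same on every machine.
    travel_details["time"] = datetime.fromtimestamp(
        travel_details["time"], tz=timezone.utc
    ).strftime("%Y-%m-%d %H:%M:%S")
    save_to_db(travel_details)
    return travel_details


details = {
    "from": "Hyderabad",
    "to": "Bangalore",
    "flight_number": "PQ126BN",
    "time": 1623510000,  # assumed epoch value for illustration
}
result = save_travel_details(details)
```

Note that after the call, the caller’s own details dictionary holds the formatted string too, because the function changed the object it was given.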

In the file, we’re trying to save travel details to a database. The Celery task save_travel_details converts the epoch time to ISO format and calls save_to_db to save the data. In the save_to_db method, we’re just printing the data to the terminal, but in production this would usually be a network call to a database, which could fail for many reasons such as network failures, transaction rollbacks, DB overloads, etc. Now if we run the file by typing python tasks.py, we see the following output (install pytest, celery, and pytest-celery if you’re getting import errors):

{'from': 'Hyderabad', 'to': 'Bangalore', 'flight_number': 'PQ126BN', 'time': '2021-06-12 15:00:00'}

So far everything seems fine. But we haven’t tested what happens when the save_to_db call fails. To emulate that scenario, I used pytest to set up the environment and run the tests. (Alternatively, I could have used a debugger to step through the code flow.) The following is the code for the tests.

test_tasks.py

For the purpose of this discussion, I won’t explain pytest, mocks, or patching. Essentially, in line 26 above, we’re making save_to_db throw an exception. Another detail is that the retry method calls the signature method internally to wrap the arguments, the task, and the task options into a single Signature object. Later task scheduling calls (such as apply_async) are made on the Signature object. Details about the signature method can be found here: https://docs.celeryproject.org/en/stable/reference/celery.html#celery.signature. We’re mocking the signature call to see whether it is being called and what arguments it is being called with.
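For readers unfamiliar with the mocking approach, here is a small standard-library sketch of the two ideas used in the tests: forcing a function to raise, and recording what a second function was called with. The names failing_save, signature_mock, and run_once are hypothetical stand-ins, not Celery APIs, and the flow is a simplification of what the real tests patch.

```python
from unittest import mock

# Hypothetical stand-in for save_to_db that always fails.
failing_save = mock.Mock(side_effect=ConnectionError("db unavailable"))

# Hypothetical stand-in for the mocked signature call.
signature_mock = mock.Mock()


def run_once(payload):
    try:
        failing_save(payload)
    except ConnectionError:
        # On failure, hand the arguments to the "retry" machinery.
        signature_mock(args=(payload,))


run_once({"time": 1623510000})

# Passes only if the recorded call matches these arguments.
signature_mock.assert_called_with(args=({"time": 1623510000},))

# The full list of recorded calls, useful for printing while debugging.
recorded_calls = signature_mock.mock_calls
```

Printing recorded_calls is the same trick as printing signature_mock.mock_calls in the test file: it shows what was actually passed when an assertion does not match.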

Now when we run the first test using pytest -s -W ignore::DeprecationWarning -k test_save_travel_details_success, we see the following output:

Everything seems to be working fine, and we would expect the second test to pass as well. When we run the second test, the following is the (shortened) output:

Well, that was not expected. When we look closely at the output of the test, we can see that the call to the signature method was made, but not with the parameters we expected. To see what arguments were passed to the signature method, comment out the signature_mock.assert_called_with line, uncomment the print(signature_mock.mock_calls) line, and re-run the test. Here’s the output:

Here we can see that the signature method was passed other parameters along with our data, such as task_id, options, etc. That is to be expected. But when we look at our data, the timestamp has been changed from the epoch timestamp to an ISO string, and that string is what gets passed to subsequent retry calls. So when the retried task tries to convert this string to an ISO string a second time, it fails with the error TypeError: an integer is required (got type str).
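The failure is easy to reproduce outside Celery. The sketch below uses a hypothetical helper, to_iso_in_place, and an assumed epoch value: it converts the timestamp in place and is then called again on the same dictionary, as a retry would. The exact wording of the TypeError message varies between Python versions.

```python
from datetime import datetime, timezone


def to_iso_in_place(details):
    # Overwrites the integer epoch with a formatted string.
    details["time"] = datetime.fromtimestamp(
        details["time"], tz=timezone.utc
    ).strftime("%Y-%m-%d %H:%M:%S")


details = {"time": 1623510000}  # assumed epoch value
to_iso_in_place(details)  # first attempt: int -> str

try:
    # "Retry" with the same, already mutated dictionary.
    to_iso_in_place(details)
    second_attempt_error = None
except TypeError as exc:
    second_attempt_error = exc
```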

This behavior is caused by the way Celery schedules tasks. When a Celery task is scheduled, its parameters are serialized and saved in the broker. When a worker is available to pick up the task, it takes the parameters from the broker and starts executing. If the task succeeds, the result is written back to the configured backend. But if the task fails with an exception, we have to call the retry method to retry the task later. We can limit the number of times a task is retried, and the current retry count is passed to the retry method as part of the task context. Either way, the parameters are removed from the broker once the worker acknowledges that it has received the task.

The retry method checks that the configured number of retries has not been exceeded. Then, instead of reusing the parameters as they were originally picked up from the broker, it schedules another task with the parameters as they currently are in memory and pushes it to the broker. So, if a mutable parameter is passed to the task and changed inside the task, the next retry gets the updated parameter instead of the original one. This can cause unwanted side effects and can be difficult to debug because of the asynchronous nature of the execution.
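The flow described above can be modelled with a toy queue. This is a deliberately simplified sketch and not Celery’s actual implementation: broker, publish, task, and worker_run_once are hypothetical names, and JSON stands in for whatever serializer is configured.

```python
import json

broker = []  # toy message queue holding serialized task arguments


def publish(args):
    # Arguments are serialized as they are at publish time.
    broker.append(json.dumps(args))


def task(details):
    # Mutate the argument in place, then fail like a broken DB call.
    details["time"] = str(details["time"]) + " (converted)"
    raise ConnectionError("db unavailable")


def worker_run_once():
    # The message leaves the queue when the worker picks it up.
    args = json.loads(broker.pop(0))
    try:
        task(*args)
    except ConnectionError:
        # The retry re-publishes the in-memory, already mutated args.
        publish(args)


publish([{"time": 1623510000}])
worker_run_once()
retried_payload = json.loads(broker[0])
```

After one failed run, the message waiting in the queue carries the mutated value, not the integer that was originally published.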

To solve the problem, let’s make a small change to the save_travel_details method. Now it should look like this:

Here, instead of directly modifying the passed-in dictionary, we first make a copy of it and apply the timestamp changes to the copy. Alternatively, we can pass only immutable parameters to the task (a best practice, but not always possible). Now let’s run the second test again and see the output.
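A minimal sketch of the copy-first approach, again as plain Python without the Celery decorator. The epoch value and the use of UTC are assumptions for the example, and copy.deepcopy is one possible choice: for a flat dictionary like this one, dict.copy() would be enough, while deepcopy also protects nested lists and dictionaries.

```python
import copy
from datetime import datetime, timezone


def save_to_db(data):
    # Stand-in for the real database write; here it only prints.
    print(data)


def save_travel_details(travel_details):
    # Work on a copy so the arguments a retry would reuse stay intact.
    details = copy.deepcopy(travel_details)
    details["time"] = datetime.fromtimestamp(
        details["time"], tz=timezone.utc
    ).strftime("%Y-%m-%d %H:%M:%S")
    save_to_db(details)
    return details


original = {
    "from": "Hyderabad",
    "to": "Bangalore",
    "flight_number": "PQ126BN",
    "time": 1623510000,  # assumed epoch value for illustration
}
saved = save_travel_details(original)
```

The caller’s dictionary still holds the integer timestamp afterwards, so calling the function again with the same argument succeeds.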

Sure enough, the timestamp was sent to the signature method as an epoch value instead of an ISO string, so it shouldn’t cause any issues in subsequent retry calls.

To conclude, don’t modify mutable objects inside Celery tasks.

Luckily, we had logging and monitoring enabled and configured on our staging servers, which let me at least guess at the problem and read up on it. I hope this story helps people who didn’t know something like this could happen. My suggestion: always use logging and monitoring to catch these kinds of issues. Though it comes at a small extra cost (in money or time), it will save a lot of time and headaches later.