I am using Celery to process background tasks in my application. Sometimes I need to run asynchronous functions inside those tasks, for which I use asyncio. In this case I also need to call an asynchronous function in a finally block. My code looks something like this:
import asyncio
import timeit
import traceback
from typing import List

# celery_app, log, SearchDefault, execute_match_search, deliver_message_service,
# NOTIFIER and ActionCodes are defined elsewhere in the project.


@celery_app.task(name="charlie-search-engine-task", retry_backoff=True, acks_late=True, max_retries=5)
def search_process(
    order_priority: str,
    subject: str,
    content: str,
    comments: str,
    attachment_names: str,
    recipients: List[str],
    user_ids: List[int],
    department_ids: List[int],
    current_user_id: int,
    temporal_search_id: int
):
    print(f"params: {order_priority}, {subject}, {content}, {comments}, {attachment_names}, {recipients}, {user_ids}, {department_ids}, {current_user_id}, {temporal_search_id}")
    try:
        # Attachments are sent to the ELK search only if all params have been
        # received, because that means the user is searching across all params.
        search_params = SearchDefault(
            order_priority=order_priority,
            subject=subject,
            content=content,
            comments=comments,
            attachment_names=attachment_names,
            recipients=recipients,
            user_ids=user_ids,
            department_ids=department_ids,
            current_user_id=current_user_id,
            temporal_search_id=temporal_search_id
        )
        log.info(
            f"[INFO]: A process of SEARCHING was started. PARAMS: {search_params.dict()}"
        )
        start = timeit.default_timer()
        # First asyncio.run call: runs the search on its own event loop.
        asyncio.run(execute_match_search(search_params=search_params))
        stop = timeit.default_timer()
        # log.info(f"Execution Time: {stop - start}")
    except Exception as e:
        log.error(
            f"Error in task for searching with temporal_search_id: {temporal_search_id}"
        )
        log.error(e)
        log.error(traceback.format_exc())
    finally:
        # Second asyncio.run call: this is where the error is raised.
        asyncio.run(deliver_message_service.emit_notification(
            notifier=NOTIFIER,
            action=ActionCodes.NO_MORE_RESULTS.value,
            room=search_params.current_user_id,
            channel="search-engine",
            info={},
            title=f"No more found for search {search_params.temporal_search_id}",
            temporal_search_id=search_params.temporal_search_id,
        ))
In the test runs I have done, execute_match_search has not raised any errors, so when it finishes, execution moves on to the finally block, where I get this error:
Event loop is closed
I suspect this is caused by calling asyncio.run twice in the same function. Why does this happen, and what options do I have to solve it?
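For context, each asyncio.run call creates a fresh event loop and closes it on exit, so anything created during the first call and still bound to that loop (a client, connection, or session, for example) could fail when reused in the second call. Whether that is what happens inside deliver_message_service is an assumption. One option that might avoid the problem is to wrap both awaits in a single coroutine and call asyncio.run once. Below is a minimal sketch of that idea; the two coroutines are hypothetical stand-ins with simplified signatures, not the real execute_match_search or emit_notification, and the Celery decorator is left out:

```python
import asyncio

# Records what ran and on which loop, purely for illustration.
events = []


async def execute_match_search(search_id: int) -> None:
    # Hypothetical stand-in for the real search coroutine.
    await asyncio.sleep(0)
    events.append(("search", search_id, id(asyncio.get_running_loop())))


async def emit_notification(search_id: int) -> None:
    # Hypothetical stand-in for deliver_message_service.emit_notification.
    await asyncio.sleep(0)
    events.append(("notify", search_id, id(asyncio.get_running_loop())))


async def search_and_notify(search_id: int) -> None:
    # Both awaits share one event loop, so the finally block
    # still runs while that loop is open.
    try:
        await execute_match_search(search_id)
    finally:
        await emit_notification(search_id)


def search_process(search_id: int) -> None:
    # Single asyncio.run call per task invocation.
    asyncio.run(search_and_notify(search_id))


if __name__ == "__main__":
    search_process(42)
    print(events)
```

With this shape the notification is still emitted when the search raises, because the try/finally lives inside the coroutine rather than around two separate asyncio.run calls.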