From 2ebc4e27a469ac4b76ae0253882bfee94b4ed084 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Voron?= Date: Tue, 20 Jan 2026 14:26:35 +0100 Subject: [PATCH] fix(dramatiq): cleanup isolated scope and transaction when message is skipped The Dramatiq middleware was missing the [`after_skip_message` hook](https://dramatiq.io/reference.html#dramatiq.Middleware.after_skip_message), which is triggered in place of [`after_process_message`](https://dramatiq.io/reference.html#dramatiq.Middleware.after_skip_message) when [`SkipMessage`](https://dramatiq.io/reference.html#dramatiq.middleware.SkipMessage) is raised. Thus, the isolated scope and transaction that are opened in [`before_process_message`](https://dramatiq.io/reference.html#dramatiq.Middleware.before_process_message) were never closed; leading to a memory leak particularly visible in a context where lot of messages are skipped. The fix is simply to assign `after_skip_message` to `after_process_message`, so the teardown logic is triggered. This pattern is used in the [Dramatiq code base](https://github.com/Bogdanp/dramatiq/blob/143aaa228521606806a87daefff2d21f88607e70/dramatiq/middleware/time_limit.py#L97). --- sentry_sdk/integrations/dramatiq.py | 2 ++ tests/integrations/dramatiq/test_dramatiq.py | 30 ++++++++++++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/dramatiq.py b/sentry_sdk/integrations/dramatiq.py index ae87de7525..f954d4fb98 100644 --- a/sentry_sdk/integrations/dramatiq.py +++ b/sentry_sdk/integrations/dramatiq.py @@ -188,6 +188,8 @@ def after_process_message( transaction.__exit__(type(exception), exception, None) scope_manager.__exit__(type(exception), exception, None) + after_skip_message = after_process_message + def _make_message_event_processor( message: "Message[R]", integration: "DramatiqIntegration" diff --git a/tests/integrations/dramatiq/test_dramatiq.py b/tests/integrations/dramatiq/test_dramatiq.py index 3860ee61d9..a9d3966839 100644 --- a/tests/integrations/dramatiq/test_dramatiq.py +++ b/tests/integrations/dramatiq/test_dramatiq.py @@ -1,15 +1,16 @@ -import pytest import uuid import dramatiq +import pytest from dramatiq.brokers.stub import StubBroker +from dramatiq.middleware import Middleware, SkipMessage import sentry_sdk -from sentry_sdk.tracing import TransactionSource from sentry_sdk import start_transaction from sentry_sdk.consts import SPANSTATUS from sentry_sdk.integrations.dramatiq import DramatiqIntegration from sentry_sdk.integrations.logging import ignore_logger +from sentry_sdk.tracing import Transaction, TransactionSource ignore_logger("dramatiq.worker.WorkerThread") @@ -386,3 +387,28 @@ def dummy_actor(): worker.join() assert events == [] + + +@pytest.mark.parametrize("broker", [1.0], indirect=True) +def test_that_skip_message_cleans_up_scope_and_transaction( + broker, worker, capture_events +): + transactions: list[Transaction] = [] + + class SkipMessageMiddleware(Middleware): + def before_process_message(self, broker, message): + transactions.append(sentry_sdk.get_current_scope().transaction) + raise SkipMessage() + + broker.add_middleware(SkipMessageMiddleware()) + + @dramatiq.actor(max_retries=0) + def skipped_actor(): ... + + skipped_actor.send() + + broker.join(skipped_actor.queue_name) + worker.join() + + (transaction,) = transactions + assert transaction.timestamp is not None