Years ago, we implemented Service Broker, and a year or so later added something like this (https://techcommunity.microsoft.com/t5/sql-server-blog/reusing-dialogs-with-a-dialog-pool/ba-p/383461) to improve performance. Now that volume has increased, (100+ sources calling the stored procedure, upwards of 1 million messages an hour) we are seeing where it will just slowly grind to a halt and the sql service needs to be restarted. If everything is working fine it can process 800k messages an hour, but during the slowdown it can drop down to 50k/hour.
The problem is somewhere in DialogPool and blocking/waits around accessing it, but I've forgotten enough about SB over the years that I no longer know where to start, to fix it (the "150 trick" comes to mind; we never implemented it, but its premise was to only use every 150th conversation, so that it would span pages; however, I don't remember where I'd do that in here, if you even would). The activated stored procedure is running 30 threads on a 32-core box.
I'm trying to figure out if there's a way to get this to not require multiple-times-a-day restarts. Microsoft seemed at a loss. My next step would be to change DialogPool into an in-memory table.
The initial stored procedure is this:
SET ANSI_NULLS ON
GO
CREATE PROCEDURE [svcBroker].[BundleParse_SvcBroker_INS] (@Message XML)
AS
SET NOCOUNT ON
DECLARE @fromService sysname,
@toService sysname,
@onContract sysname,
@messageType NVARCHAR(128),
@messageBody NVARCHAR(MAX)
SET @fromService = 'DynamicParser\\my_InitiatorService'
SET @toService = 'DynamicParser\\my_TargetService'
SET @onContract = 'svcBroker_ods_claim_contract'
SET @messageType = 'svcBroker_ods_claim_request'
SET @messageBody = CONVERT(NVARCHAR(MAX), @message)
EXEC svcBroker.usp_send @fromService, @toService, @onContract, @messageType, @messageBody
GO
The dialogpool table:
(
[FromService] [sys].[sysname] NOT NULL,
[ToService] [sys].[sysname] NOT NULL,
[OnContract] [sys].[sysname] NOT NULL,
[Handle] [uniqueidentifier] NOT NULL,
[OwnerSPID] [int] NOT NULL,
[CreationTime] [datetime] NOT NULL,
[SendCount] [bigint] NOT NULL
) ON [PRIMARY]
GO
ALTER TABLE [svcBroker].[DialogPool] ADD CONSTRAINT [UQ__DialogPo__FE5BB31A4CC05EF3] UNIQUE NONCLUSTERED ([Handle]) ON [PRIMARY]
GO
usp_send:
SET QUOTED_IDENTIFIER ON
SET ANSI_NULLS ON
GO
CREATE PROCEDURE [svcBroker].[usp_send] (
@fromService SYSNAME,
@toService SYSNAME,
@onContract SYSNAME,
@messageType SYSNAME,
@messageBody NVARCHAR(MAX))
AS
BEGIN
SET NOCOUNT ON;
DECLARE @dialogHandle UNIQUEIDENTIFIER;
DECLARE @sendCount BIGINT;
DECLARE @counter INT;
DECLARE @error INT;
SELECT @counter = 1;
--select 'svcBroker.usp_send ', @fromService, @toService, @onContract, @messageType, @messageBody
BEGIN TRANSACTION;
-- Will need a loop to retry in case the dialog is
-- in a state that does not allow transmission
--
WHILE (1=1)
BEGIN
-- Claim a dialog from the dialog pool.
-- A new one will be created if none are available.
--
EXEC svcBroker.usp_get_dialog @fromService, @toService, @onContract, @dialogHandle OUTPUT, @sendCount OUTPUT;
-- Attempt to SEND on the dialog
--
IF (@messageBody IS NOT NULL)
BEGIN
-- If the @messageBody is not null it must be sent explicitly
SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType (@messageBody);
END
ELSE
BEGIN
-- Messages with no body must *not* specify the body,
-- cannot send a NULL value argument
SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType;
END
SELECT @error = @@ERROR;
IF @error = 0
BEGIN
-- Successful send, increment count and exit the loop --
SET @sendCount = @sendCount + 1;
BREAK;
END
SELECT @counter = @counter+1;
IF @counter > 10
BEGIN
-- We failed 10 times in a row, something must be broken --
RAISERROR('Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG;
BREAK;
END
-- Delete the associated dialog from the table and try again --
EXEC svcBroker.usp_delete_dialog @dialogHandle;
SELECT @dialogHandle = NULL;
END
-- "Criterion" for dialog pool removal is send count > 1000.
-- Modify to suit application.
-- When deleting also inform the target to end the dialog.
IF @sendCount > 1000
BEGIN
EXEC svcBroker.usp_delete_dialog @dialogHandle ;
SEND ON CONVERSATION @dialogHandle MESSAGE TYPE [svcBroker_EndOfStream];
END
ELSE
BEGIN
-- Free the dialog.
EXEC svcBroker.usp_free_dialog @dialogHandle, @sendCount;
END
COMMIT
END;
GO
our usp_get_dialog is:
SET ANSI_NULLS ON
GO
CREATE PROCEDURE [svcBroker].[usp_get_dialog] (
@fromService SYSNAME,
@toService SYSNAME,
@onContract SYSNAME,
@dialogHandle UNIQUEIDENTIFIER OUTPUT,
@sendCount BIGINT OUTPUT)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @dialog TABLE (
FromService SYSNAME NOT NULL,
ToService SYSNAME NOT NULL,
OnContract SYSNAME NOT NULL,
Handle UNIQUEIDENTIFIER NOT NULL,
OwnerSPID INT NOT NULL,
CreationTime DATETIME NOT NULL,
SendCount BIGINT NOT NULL);
-- Try to claim an unused dialog in [DialogPool]
-- READPAST option avoids blocking on locked dialogs.
SET TRANSACTION ISOLATION LEVEL READ COMMITTED
BEGIN TRANSACTION;
DELETE @dialog;
UPDATE TOP(1) svcBroker.DialogPool WITH(READPAST)
SET OwnerSPID = @@SPID
OUTPUT INSERTED.* INTO @dialog
WHERE FromService = @fromService
AND ToService = @toService
AND OnContract = @OnContract
AND OwnerSPID = -1;
IF @@ROWCOUNT > 0
BEGIN
SET @dialogHandle = (SELECT Handle FROM @dialog);
SET @sendCount = (SELECT SendCount FROM @dialog);
END
ELSE
BEGIN
-- No free dialogs: need to create a new one
BEGIN DIALOG CONVERSATION @dialogHandle
FROM SERVICE @fromService
TO SERVICE @toService
ON CONTRACT @onContract
WITH ENCRYPTION = OFF;
INSERT INTO svcBroker.DialogPool (
FromService, ToService, OnContract, Handle, OwnerSPID, CreationTime, SendCount)
VALUES (@fromService, @toService, @onContract, @dialogHandle, @@SPID, GETDATE(), 0);
SET @sendCount = 0;
END
COMMIT
END;
GO
and usp_delete_dialog looks like this
@dialogHandle UNIQUEIDENTIFIER)
AS
BEGIN
SET NOCOUNT ON;
BEGIN TRANSACTION;
DELETE svcBroker.DialogPool WHERE Handle = @dialogHandle;
COMMIT
END;
GO
and finally, our activated stored procedure:
SET QUOTED_IDENTIFIER ON
SET ANSI_NULLS ON
GO
CREATE PROCEDURE [svcBroker].[ODS_TargetQueue_Receive]
AS
BEGIN
set nocount on
-- Variable table for received messages.
DECLARE @receive_table TABLE(
queuing_order BIGINT,
conversation_handle UNIQUEIDENTIFIER,
message_type_name SYSNAME,
message_body xml);
-- Cursor for received message table.
DECLARE message_cursor CURSOR LOCAL FORWARD_ONLY READ_ONLY
FOR SELECT
conversation_handle,
message_type_name,
message_body
FROM @receive_table ORDER BY queuing_order;
DECLARE @conversation_handle UNIQUEIDENTIFIER;
DECLARE @message_type SYSNAME;
DECLARE @message_body xml;
-- Error variables.
DECLARE @error_number INT;
DECLARE @error_message VARCHAR(4000);
DECLARE @error_severity INT;
DECLARE @error_state INT;
DECLARE @error_procedure SYSNAME;
DECLARE @error_line INT;
DECLARE @error_dialog VARCHAR(50);
BEGIN TRY
WHILE (1 = 1)
BEGIN
BEGIN TRANSACTION;
-- Receive all available messages into the table.
-- Wait 5 seconds for messages.
WAITFOR (
RECEIVE TOP (1000)
[queuing_order],
[conversation_handle],
[message_type_name],
convert(xml, [message_body])
FROM svcBroker.ODS_TargetQueue
INTO @receive_table
), TIMEOUT 2000;
IF @@ROWCOUNT = 0
BEGIN
COMMIT;
BREAK;
END
ELSE
BEGIN
OPEN message_cursor;
WHILE (1=1)
BEGIN
FETCH NEXT FROM message_cursor
INTO @conversation_handle,
@message_type,
@message_body;
IF (@@FETCH_STATUS != 0) BREAK;
-- Process a message.
-- If an exception occurs, catch and attempt to recover.
BEGIN TRY
IF @message_type = 'svcBroker_ods_claim_request'
BEGIN
exec ParseMessages @message_body
END
ELSE IF @message_type in ('svcBroker_EndOfStream', 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
BEGIN
-- initiator is signaling end of message stream: end the dialog
END CONVERSATION @conversation_handle;
END
ELSE IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
-- If the message_type indicates that the message is an error,
-- raise the error and end the conversation.
WITH XMLNAMESPACES ('http://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)
SELECT
@error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),
@error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');
SET @error_dialog = CAST(@conversation_handle AS VARCHAR(50));
RAISERROR('Error in dialog %s: %s (%i)', 16, 1, @error_dialog, @error_message, @error_number);
END CONVERSATION @conversation_handle;
END
END TRY
BEGIN CATCH
SET @error_number = ERROR_NUMBER();
SET @error_message = ERROR_MESSAGE();
SET @error_severity = ERROR_SEVERITY();
SET @error_state = ERROR_STATE();
SET @error_procedure = ERROR_PROCEDURE();
SET @error_line = ERROR_LINE();
IF XACT_STATE() = -1
BEGIN
-- The transaction is doomed. Only rollback possible.
-- This could disable the queue if done 5 times consecutively!
ROLLBACK TRANSACTION;
-- Record the error.
BEGIN TRANSACTION;
INSERT INTO svcBroker.target_processing_errors (
error_conversation,[error_number],[error_message],[error_severity],
[error_state],[error_procedure],[error_line],[doomed_transaction],
[message_body])
VALUES (NULL, @error_number, @error_message,@error_severity,
@error_state, @error_procedure, @error_line, 1, @message_body);
COMMIT;
-- For this level of error, it is best to exit the proc
-- and give the queue monitor control.
-- Breaking to the outer catch will accomplish this.
RAISERROR ('Message processing error', 16, 1);
END
ELSE IF XACT_STATE() = 1
BEGIN
-- Record error and continue processing messages.
-- Failing message could also be put aside for later processing here.
-- Otherwise it will be discarded.
INSERT INTO svcBroker.target_processing_errors (
error_conversation,[error_number],[error_message],[error_severity],
[error_state],[error_procedure],[error_line],[doomed_transaction],
[message_body])
VALUES (NULL, @error_number, @error_message,@error_severity,
@error_state, @error_procedure, @error_line, 0, @message_body);
END
END CATCH
END
CLOSE message_cursor;
DELETE @receive_table;
END
COMMIT;
END
END TRY
BEGIN CATCH
-- Process the error and exit the proc to give the queue monitor control
SET @error_number = ERROR_NUMBER();
SET @error_message = ERROR_MESSAGE();
SET @error_severity = ERROR_SEVERITY();
SET @error_state = ERROR_STATE();
SET @error_procedure = ERROR_PROCEDURE();
SET @error_line = ERROR_LINE();
IF XACT_STATE() = -1
BEGIN
-- The transaction is doomed. Only rollback possible.
-- This could disable the queue if done 5 times consecutively!
ROLLBACK TRANSACTION;
-- Record the error.
BEGIN TRANSACTION;
INSERT INTO svcBroker.target_processing_errors (
error_conversation,[error_number],[error_message],[error_severity],
[error_state],[error_procedure],[error_line],[doomed_transaction],
[message_body])
VALUES(NULL, @error_number, @error_message,@error_severity, @error_state, @error_procedure, @error_line, 1, @message_body);
COMMIT;
END
ELSE IF XACT_STATE() = 1
BEGIN
-- Record error and commit transaction.
-- Here you could also save anything else you want before exiting.
INSERT INTO svcBroker.target_processing_errors (
error_conversation,[error_number],[error_message],[error_severity],
[error_state],[error_procedure],[error_line],[doomed_transaction],
[message_body])
VALUES(NULL, @error_number, @error_message, @error_severity, @error_state, @error_procedure, @error_line, 0, @message_body);
COMMIT;
END
END CATCH
END;
GO
The stored procedure ParseMessages is purely doing work on the pulled message - it shouldn't have any Service Broker code.