Enterprise Service Bus Patterns with Azure Service Bus: Complete Guide 2025
Enterprise Service Bus (ESB) patterns provide a foundation for building scalable, decoupled microservices architectures. Azure Service Bus implements ESB capabilities through queues, topics, subscriptions, and advanced messaging features. Understanding these patterns is essential for building production-ready enterprise integrations.
This comprehensive guide covers enterprise service bus patterns with Azure Service Bus, including message routing, pub/sub patterns, topic subscriptions, dead-letter queue handling, and production deployment strategies. You'll learn how to implement ESB patterns, handle message ordering, implement saga patterns, and build resilient messaging architectures.
Understanding Enterprise Service Bus
What is an ESB?
Enterprise Service Bus provides:
- Message Routing: Route messages to appropriate services
- Transformation: Transform message formats
- Protocol Mediation: Bridge different protocols
- Service Orchestration: Coordinate multiple services
- Centralized Management: Single point of control
Azure Service Bus Features
Azure Service Bus provides:
- Queues: Point-to-point messaging
- Topics: Publish-subscribe messaging
- Sessions: Message ordering and grouping
- Dead-Letter Queues: Failed message handling
- Auto-Forwarding: Message routing
Queue Patterns
Basic Queue Usage
Implement basic queue pattern:
using System.Text.Json;
using Azure.Messaging.ServiceBus;
public class QueueMessageProducer
{
private readonly ServiceBusClient _client;
private readonly ServiceBusSender _sender;
public QueueMessageProducer(string connectionString, string queueName)
{
_client = new ServiceBusClient(connectionString);
_sender = _client.CreateSender(queueName);
}
public async Task SendMessageAsync<T>(T message)
{
var json = JsonSerializer.Serialize(message);
var serviceBusMessage = new ServiceBusMessage(json)
{
MessageId = Guid.NewGuid().ToString(),
ContentType = "application/json"
};
await _sender.SendMessageAsync(serviceBusMessage);
}
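public async Task SendBatchAsync<T>(IEnumerable<T> messages)
{
// Not declared with "using": the variable is reassigned whenever a batch fills up
ServiceBusMessageBatch batch = await _sender.CreateMessageBatchAsync();
try
{
foreach (var message in messages)
{
var json = JsonSerializer.Serialize(message);
var serviceBusMessage = new ServiceBusMessage(json);
if (batch.TryAddMessage(serviceBusMessage))
{
continue;
}
// Batch is full: send it and retry the message in a fresh batch
if (batch.Count > 0)
{
await _sender.SendMessagesAsync(batch);
}
batch.Dispose();
batch = await _sender.CreateMessageBatchAsync();
if (!batch.TryAddMessage(serviceBusMessage))
{
throw new InvalidOperationException("Message is too large to fit in an empty batch.");
}
}
if (batch.Count > 0)
{
await _sender.SendMessagesAsync(batch);
}
}
finally
{
batch.Dispose();
}
}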
}
Queue Consumer
Implement queue consumer:
public class QueueMessageConsumer
{
private readonly ServiceBusClient _client;
private readonly ServiceBusProcessor _processor;
private readonly ILogger<QueueMessageConsumer> _logger;
public QueueMessageConsumer(
string connectionString,
string queueName,
ILogger<QueueMessageConsumer> logger)
{
_client = new ServiceBusClient(connectionString);
_processor = _client.CreateProcessor(queueName, new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 5,
AutoCompleteMessages = false
});
_logger = logger;
}
public async Task StartProcessingAsync(Func<ServiceBusReceivedMessage, Task> messageHandler)
{
_processor.ProcessMessageAsync += async args =>
{
try
{
await messageHandler(args.Message);
await args.CompleteMessageAsync(args.Message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message: {MessageId}",
args.Message.MessageId);
// Abandon releases the lock; the broker redelivers until MaxDeliveryCount is reached, then dead-letters
await args.AbandonMessageAsync(args.Message);
}
};
_processor.ProcessErrorAsync += args =>
{
_logger.LogError(args.Exception, "Processor error");
return Task.CompletedTask;
};
// Await the start so connection and configuration failures surface to the caller
await _processor.StartProcessingAsync();
}
}
Topic and Subscription Patterns
Publish-Subscribe Pattern
Implement pub/sub with topics:
public class TopicPublisher
{
private readonly ServiceBusClient _client;
private readonly ServiceBusSender _sender;
public TopicPublisher(string connectionString, string topicName)
{
_client = new ServiceBusClient(connectionString);
_sender = _client.CreateSender(topicName);
}
public async Task PublishAsync<T>(T message, string messageType)
{
var json = JsonSerializer.Serialize(message);
var serviceBusMessage = new ServiceBusMessage(json)
{
MessageId = Guid.NewGuid().ToString(),
Subject = messageType,
ContentType = "application/json"
};
await _sender.SendMessageAsync(serviceBusMessage);
}
}
Subscription Consumer
Consume from subscriptions:
public class SubscriptionConsumer
{
private readonly ServiceBusClient _client;
private readonly ServiceBusProcessor _processor;
public SubscriptionConsumer(
string connectionString,
string topicName,
string subscriptionName)
{
_client = new ServiceBusClient(connectionString);
_processor = _client.CreateProcessor(topicName, subscriptionName);
}
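public async Task StartProcessingAsync(Func<ServiceBusReceivedMessage, Task> handler)
{
_processor.ProcessMessageAsync += async args =>
{
await handler(args.Message);
await args.CompleteMessageAsync(args.Message);
};
// The processor requires an error handler; StartProcessingAsync throws without one
_processor.ProcessErrorAsync += args =>
{
Console.Error.WriteLine($"Subscription processor error: {args.Exception}");
return Task.CompletedTask;
};
await _processor.StartProcessingAsync();
}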
}
Subscription Filters
Use filters for message routing:
public class SubscriptionFilterManager
{
private readonly ServiceBusAdministrationClient _adminClient;
public async Task CreateFilteredSubscriptionAsync(
string topicName,
string subscriptionName,
string filterExpression)
{
var subscriptionOptions = new CreateSubscriptionOptions(topicName, subscriptionName)
{
DefaultMessageTimeToLive = TimeSpan.FromDays(7),
MaxDeliveryCount = 10
};
var ruleOptions = new CreateRuleOptions("MessageTypeFilter")
{
Filter = new SqlRuleFilter(filterExpression)
};
// Pass the rule at creation time so it replaces the default match-all rule.
// Adding it afterwards would leave $Default in place, and the subscription would still receive every message.
await _adminClient.CreateSubscriptionAsync(subscriptionOptions, ruleOptions);
}
// Example: Filter by message type
public async Task CreateMessageTypeSubscriptionAsync(
string topicName,
string subscriptionName,
string messageType)
{
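// Subject is a system property (named Label on the broker), so the SQL filter needs the sys. prefix.
// Single quotes are doubled to keep the value a valid SQL string literal.
var escaped = messageType.Replace("'", "''");
var filter = $"sys.Label = '{escaped}'";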
await CreateFilteredSubscriptionAsync(topicName, subscriptionName, filter);
}
}
Message Routing Patterns
Content-Based Routing
Route messages based on content:
public class ContentBasedRouter
{
private readonly Dictionary<string, ServiceBusSender> _routers;
public ContentBasedRouter(ServiceBusClient client, Dictionary<string, string> routes)
{
_routers = routes.ToDictionary(
r => r.Key,
r => client.CreateSender(r.Value));
}
public async Task RouteMessageAsync(ServiceBusReceivedMessage message)
{
var messageType = message.Subject;
// Subject can be null, and Dictionary.TryGetValue throws on a null key
if (messageType is not null && _routers.TryGetValue(messageType, out var sender))
{
await sender.SendMessageAsync(new ServiceBusMessage(message));
}
else
{
// Route to default queue
await _routers["default"].SendMessageAsync(new ServiceBusMessage(message));
}
}
}
Auto-Forwarding
Configure auto-forwarding:
public class AutoForwardingConfig
{
private readonly ServiceBusAdministrationClient _adminClient;
public async Task ConfigureAutoForwardAsync(
string sourceQueue,
string targetQueue)
{
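// QueueProperties cannot be constructed directly: fetch the existing queue, change it, then update
QueueProperties queue = await _adminClient.GetQueueAsync(sourceQueue);
queue.ForwardTo = targetQueue;
await _adminClient.UpdateQueueAsync(queue);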
}
}
Dead-Letter Queue Patterns
Dead-Letter Handling
Handle dead-letter messages:
public class DeadLetterProcessor
{
private readonly ServiceBusClient _client;
private readonly ServiceBusProcessor _dlqProcessor;
private readonly ILogger<DeadLetterProcessor> _logger;
public DeadLetterProcessor(
string connectionString,
string queueName,
ILogger<DeadLetterProcessor> logger)
{
_client = new ServiceBusClient(connectionString);
_dlqProcessor = _client.CreateProcessor(
queueName,
new ServiceBusProcessorOptions
{
SubQueue = SubQueue.DeadLetter
});
_logger = logger;
}
public async Task StartProcessingAsync(Func<ServiceBusReceivedMessage, Task> handler)
{
_dlqProcessor.ProcessMessageAsync += async args =>
{
var reason = args.Message.DeadLetterReason;
var description = args.Message.DeadLetterErrorDescription;
_logger.LogWarning(
"Processing dead-letter message: {MessageId}, Reason: {Reason}, Description: {Description}",
args.Message.MessageId, reason, description);
await handler(args.Message);
await args.CompleteMessageAsync(args.Message);
};
// The processor requires an error handler; StartProcessingAsync throws without one
_dlqProcessor.ProcessErrorAsync += args =>
{
_logger.LogError(args.Exception, "Dead-letter processor error");
return Task.CompletedTask;
};
await _dlqProcessor.StartProcessingAsync();
}
}
Dead-Letter Replay
Replay dead-letter messages:
public class DeadLetterReplayService
{
private readonly ServiceBusClient _client;
public DeadLetterReplayService(ServiceBusClient client)
{
_client = client;
}
public async Task ReplayDeadLetterMessagesAsync(
string sourceQueue,
string targetQueue,
int maxMessages = 100)
{
await using var receiver = _client.CreateReceiver(sourceQueue, new ServiceBusReceiverOptions
{
SubQueue = SubQueue.DeadLetter
});
await using var sender = _client.CreateSender(targetQueue);
// Bound the wait so an empty dead-letter queue does not block the call indefinitely
var messages = await receiver.ReceiveMessagesAsync(maxMessages, TimeSpan.FromSeconds(5));
foreach (var message in messages)
{
// Copy the body and properties of the dead-lettered message into a new message
var newMessage = new ServiceBusMessage(message)
{
MessageId = Guid.NewGuid().ToString()
};
await sender.SendMessageAsync(newMessage);
// Complete only after the resend succeeds, so a failure leaves the message in the dead-letter queue
await receiver.CompleteMessageAsync(message);
}
}
}
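Not every dead-lettered message is worth replaying. The following is a minimal sketch of a per-message decision based on `DeadLetterReason`. `ReplayPolicy` and `ReplayDecision` are hypothetical names introduced for illustration and are not part of the Azure SDK. `MaxDeliveryCountExceeded` and `TTLExpiredException` are reason strings the broker sets itself; treating expired messages as stale is an assumption that may not hold for every workload.

```csharp
using System;

// Hypothetical helper for illustration; not part of Azure.Messaging.ServiceBus.
public enum ReplayDecision { Replay, Discard, ManualReview }

public static class ReplayPolicy
{
    // Decide what to do with a dead-lettered message based on its DeadLetterReason.
    public static ReplayDecision Decide(string deadLetterReason)
    {
        return deadLetterReason switch
        {
            // The handler kept failing; replay once the root cause is fixed.
            "MaxDeliveryCountExceeded" => ReplayDecision.Replay,
            // The message expired before it was consumed; assumed stale in this sketch.
            "TTLExpiredException" => ReplayDecision.Discard,
            // Missing or application-defined reasons need a human decision.
            _ => ReplayDecision.ManualReview
        };
    }
}
```

A replay loop could call `ReplayPolicy.Decide(message.DeadLetterReason)` before resending, and leave `ManualReview` messages in the dead-letter queue.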
Message Ordering
Session-Based Ordering
Use sessions for message ordering:
public class SessionBasedProducer
{
private readonly ServiceBusClient _client;
private readonly ServiceBusSender _sender;
public async Task SendOrderedMessageAsync<T>(
T message,
string sessionId)
{
var json = JsonSerializer.Serialize(message);
var serviceBusMessage = new ServiceBusMessage(json)
{
SessionId = sessionId
};
await _sender.SendMessageAsync(serviceBusMessage);
}
}
public class SessionBasedConsumer
{
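private readonly ServiceBusClient _client;
private readonly ServiceBusSessionProcessor _processor;
private readonly ILogger<SessionBasedConsumer> _logger;
public SessionBasedConsumer(
string connectionString,
string queueName,
ILogger<SessionBasedConsumer> logger)
{
_client = new ServiceBusClient(connectionString);
_processor = _client.CreateSessionProcessor(queueName, new ServiceBusSessionProcessorOptions
{
MaxConcurrentSessions = 5
});
_logger = logger;
}
public async Task StartProcessingAsync(Func<ServiceBusReceivedMessage, Task> handler)
{
_processor.ProcessMessageAsync += async args =>
{
// Messages that share a SessionId are delivered to this handler one at a time, in order
await handler(args.Message);
await args.CompleteMessageAsync(args.Message);
};
// The session processor exposes lifecycle hooks through SessionInitializingAsync and SessionClosingAsync
_processor.SessionInitializingAsync += args =>
{
_logger.LogInformation("Session started: {SessionId}", args.SessionId);
return Task.CompletedTask;
};
_processor.SessionClosingAsync += args =>
{
_logger.LogInformation("Session closed: {SessionId}", args.SessionId);
return Task.CompletedTask;
};
_processor.ProcessErrorAsync += args =>
{
_logger.LogError(args.Exception, "Session processor error");
return Task.CompletedTask;
};
await _processor.StartProcessingAsync();
}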
}
Saga Pattern
Implementing Sagas
Implement saga pattern for distributed transactions:
public class OrderSaga
{
private readonly ServiceBusClient _client;
private readonly IOrderService _orderService;
private readonly IPaymentService _paymentService;
private readonly IInventoryService _inventoryService;
public async Task ProcessOrderAsync(OrderCreatedEvent orderEvent)
{
var sagaId = Guid.NewGuid().ToString();
// Step 1: Reserve inventory
var inventoryReserved = await _inventoryService.ReserveInventoryAsync(
orderEvent.OrderId, orderEvent.Items);
if (!inventoryReserved)
{
await CompensateOrderAsync(orderEvent.OrderId, "Inventory reservation failed");
return;
}
// Step 2: Process payment
var paymentProcessed = await _paymentService.ProcessPaymentAsync(
orderEvent.OrderId, orderEvent.Amount);
if (!paymentProcessed)
{
// Undo the steps that already succeeded, most recent first
await CompensateInventoryAsync(orderEvent.OrderId);
await CompensateOrderAsync(orderEvent.OrderId, "Payment processing failed");
return;
}
// Step 3: Confirm order
await _orderService.ConfirmOrderAsync(orderEvent.OrderId);
}
private async Task CompensateInventoryAsync(string orderId)
{
await _inventoryService.ReleaseInventoryAsync(orderId);
}
private async Task CompensateOrderAsync(string orderId, string reason)
{
// Cancel by order ID; the saga ID only correlates this run and does not identify the order
await _orderService.CancelOrderAsync(orderId, reason);
}
}
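The hand-written compensation calls above grow quickly as steps are added. One way to generalize them is to record each completed step and unwind in reverse order on failure. This is a minimal in-process sketch under that assumption; `SagaStep` and `SagaRunner` are hypothetical names, and a production saga would also persist its progress so it can resume after a crash.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical types for illustration; not part of any Azure SDK.
public sealed class SagaStep
{
    public SagaStep(string name, Func<Task<bool>> execute, Func<Task> compensate)
    {
        Name = name;
        Execute = execute;
        Compensate = compensate;
    }

    public string Name { get; }
    public Func<Task<bool>> Execute { get; }
    public Func<Task> Compensate { get; }
}

public static class SagaRunner
{
    // Runs the steps in order. On the first failure, compensates the steps that
    // already succeeded, most recent first, and returns the failed step's name.
    // Returns null when every step succeeds.
    public static async Task<string> RunAsync(IEnumerable<SagaStep> steps)
    {
        var completed = new Stack<SagaStep>();
        foreach (var step in steps)
        {
            if (await step.Execute())
            {
                completed.Push(step);
                continue;
            }

            while (completed.Count > 0)
            {
                await completed.Pop().Compensate();
            }
            return step.Name;
        }
        return null;
    }
}
```

With this shape, the order saga becomes a list of three steps (reserve inventory, process payment, confirm order), each paired with its own undo action.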
Best Practices
1. Use Appropriate Messaging Pattern
- Queues: Point-to-point, guaranteed delivery
- Topics: Pub/sub, multiple consumers
- Sessions: Message ordering required
2. Handle Dead-Letter Messages
- Monitor DLQ regularly
- Implement replay mechanism
- Fix root causes
- Alert on DLQ growth
3. Implement Idempotency
public async Task ProcessMessageIdempotentlyAsync(
ServiceBusReceivedMessage message)
{
var messageId = message.MessageId;
if (await _processedMessages.ContainsAsync(messageId))
{
_logger.LogInformation("Message already processed: {MessageId}", messageId);
return;
}
await ProcessMessageAsync(message);
await _processedMessages.AddAsync(messageId);
}
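The check-then-add sequence above leaves a small window in which two concurrent deliveries of the same message can both pass the check. A minimal sketch of an atomic alternative follows. `InMemoryProcessedMessageStore` is a hypothetical, in-memory stand-in for `_processedMessages`; a real deployment would likely need a durable store shared by all consumer instances.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory store for illustration only. State is lost on restart
// and is not shared between processes.
public sealed class InMemoryProcessedMessageStore
{
    private readonly ConcurrentDictionary<string, DateTimeOffset> _seen =
        new ConcurrentDictionary<string, DateTimeOffset>();

    // Atomically records the message ID. Returns true only for the first
    // caller, so two concurrent deliveries cannot both pass the check.
    public bool TryClaim(string messageId)
    {
        return _seen.TryAdd(messageId, DateTimeOffset.UtcNow);
    }

    // Releases a claim so the message can be retried after a failed attempt.
    public bool Release(string messageId)
    {
        return _seen.TryRemove(messageId, out _);
    }
}
```

A handler would call `TryClaim` before processing, skip the message when it returns false, and call `Release` if processing throws so that a redelivery can try again.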
4. Monitor Message Processing
- Track processing time
- Monitor error rates
- Alert on DLQ growth
- Track throughput
5. Use Managed Identity
Authenticate with Managed Identity:
var credential = new DefaultAzureCredential();
var client = new ServiceBusClient(
"your-namespace.servicebus.windows.net",
credential);
Advanced ESB Patterns
Message Routing Patterns
Implement intelligent message routing:
public class MessageRouter
{
private readonly Dictionary<string, Func<ServiceBusMessage, string>> _routingRules;
public async Task RouteMessageAsync(ServiceBusMessage message)
{
var route = DetermineRoute(message);
switch (route.Type)
{
case RouteType.Queue:
await SendToQueueAsync(route.Destination, message);
break;
case RouteType.Topic:
await SendToTopicAsync(route.Destination, message);
break;
case RouteType.Multiple:
await SendToMultipleDestinationsAsync(route.Destinations, message);
break;
}
}
private Route DetermineRoute(ServiceBusMessage message)
{
// Route based on message properties
if (message.ApplicationProperties.TryGetValue("Priority", out var priority))
{
return priority.ToString() == "High"
? new Route { Type = RouteType.Queue, Destination = "high-priority-queue" }
: new Route { Type = RouteType.Queue, Destination = "standard-queue" };
}
// Route based on message type
if (message.ApplicationProperties.TryGetValue("MessageType", out var messageType))
{
return new Route
{
Type = RouteType.Topic,
Destination = $"topic-{messageType}"
};
}
return new Route { Type = RouteType.Queue, Destination = "default-queue" };
}
}
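The router above relies on `Route` and `RouteType`, which the snippet does not define. A minimal sketch of what they might look like follows, together with a standalone version of the routing rules that can be unit tested without a Service Bus connection. The type shapes and the `RouteRules` class are assumptions made for illustration; the queue and topic names are the ones used above.

```csharp
using System;
using System.Collections.Generic;

// Assumed shapes for the types referenced by MessageRouter.
public enum RouteType { Queue, Topic, Multiple }

public sealed class Route
{
    public RouteType Type { get; set; }
    public string Destination { get; set; }
    public IReadOnlyList<string> Destinations { get; set; } = Array.Empty<string>();
}

public static class RouteRules
{
    // Same rules as DetermineRoute, expressed over a plain property dictionary
    // (ServiceBusMessage.ApplicationProperties is an IDictionary<string, object>).
    public static Route DetermineRoute(IDictionary<string, object> properties)
    {
        // Priority wins over message type when both are present.
        if (properties.TryGetValue("Priority", out var priority))
        {
            var isHigh = string.Equals(priority?.ToString(), "High", StringComparison.Ordinal);
            return new Route
            {
                Type = RouteType.Queue,
                Destination = isHigh ? "high-priority-queue" : "standard-queue"
            };
        }

        if (properties.TryGetValue("MessageType", out var messageType) && messageType != null)
        {
            return new Route { Type = RouteType.Topic, Destination = $"topic-{messageType}" };
        }

        return new Route { Type = RouteType.Queue, Destination = "default-queue" };
    }
}
```

Keeping the rules free of SDK types makes the routing table easy to test in isolation; the router then only translates a `Route` into a send call.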
Message Transformation
Transform messages during routing:
public class MessageTransformer
{
public async Task<ServiceBusMessage> TransformMessageAsync(
ServiceBusMessage source,
TransformationRule rule)
{
var transformed = new ServiceBusMessage(source.Body);
// Transform body
if (rule.TransformBody)
{
transformed.Body = await TransformBodyAsync(source.Body, rule);
}
// Transform properties
if (rule.TransformProperties)
{
foreach (var prop in rule.PropertyMappings)
{
if (source.ApplicationProperties.TryGetValue(prop.Source, out var value))
{
transformed.ApplicationProperties[prop.Target] =
await TransformPropertyAsync(value, prop.Transformation);
}
}
}
return transformed;
}
}
Real-World Scenarios
Scenario 1: High-Volume Message Processing
Handle millions of messages per day:
public class HighVolumeMessageProcessor
{
private readonly ServiceBusClient _client;
private readonly int _maxConcurrency;
public async Task ProcessMessagesAsync(string queueName)
{
var processor = _client.CreateProcessor(queueName, new ServiceBusProcessorOptions
{
MaxConcurrentCalls = _maxConcurrency,
PrefetchCount = 100,
AutoCompleteMessages = false
});
processor.ProcessMessageAsync += async args =>
{
try
{
await ProcessMessageAsync(args.Message);
await args.CompleteMessageAsync(args.Message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process message");
await args.AbandonMessageAsync(args.Message);
}
};
processor.ProcessErrorAsync += args =>
{
_logger.LogError(args.Exception, "Processor error");
return Task.CompletedTask;
};
await processor.StartProcessingAsync();
}
}
Scenario 2: Multi-Region Message Replication
Replicate messages across regions:
public class MultiRegionMessageReplicator
{
private readonly List<ServiceBusClient> _regionalClients;
public async Task ReplicateMessageAsync(
ServiceBusMessage message,
string sourceRegion)
{
var tasks = _regionalClients
.Where(c => c.FullyQualifiedNamespace != sourceRegion)
.Select(client => SendToRegionAsync(client, message));
await Task.WhenAll(tasks);
}
private async Task SendToRegionAsync(
ServiceBusClient client,
ServiceBusMessage message)
{
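// Dispose the sender after use; for sustained traffic, cache one sender per region instead
await using var sender = client.CreateSender("replicated-queue");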
await sender.SendMessageAsync(message);
}
}
Troubleshooting ESB Issues
Common Problems
Problem 1: Message Loss
Symptoms:
- Messages not received
- Missing messages in queue
Solution:
public class MessageLossDetector
{
public async Task DetectMessageLossAsync(string queueName)
{
var queueProperties = await GetQueuePropertiesAsync(queueName);
var activeMessages = queueProperties.ActiveMessageCount;
var deadLetterMessages = queueProperties.DeadLetterMessageCount;
// Check for unusual patterns
if (deadLetterMessages > activeMessages * 0.1) // More than 10% in DLQ
{
await AlertAsync("High dead-letter rate detected");
}
// Monitor message flow
var sentCount = await GetSentMessageCountAsync(queueName);
var receivedCount = await GetReceivedMessageCountAsync(queueName);
if (sentCount - receivedCount > sentCount * 0.05) // More than 5% difference
{
await AlertAsync("Potential message loss detected");
}
}
}
Problem 2: Performance Degradation
Symptoms:
- Slow message processing
- High latency
Solution:
public class PerformanceOptimizer
{
public async Task OptimizePerformanceAsync(string queueName)
{
// Increase prefetch
var processorOptions = new ServiceBusProcessorOptions
{
PrefetchCount = 200, // Increase from default
MaxConcurrentCalls = Environment.ProcessorCount * 2
};
// Use batch operations
var receiver = await CreateBatchReceiverAsync(queueName);
var messages = await receiver.ReceiveMessagesAsync(maxMessages: 100);
// Process in parallel
var tasks = messages.Select(m => ProcessMessageAsync(m));
await Task.WhenAll(tasks);
}
}
Performance Optimization
Message Batching
Batch messages for efficiency:
public class BatchMessageSender
{
private readonly ServiceBusSender _sender;
private readonly List<ServiceBusMessage> _batch = new();
private readonly int _batchSize;
public async Task SendMessageAsync(ServiceBusMessage message)
{
_batch.Add(message);
if (_batch.Count >= _batchSize)
{
await FlushBatchAsync();
}
}
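private async Task FlushBatchAsync()
{
if (_batch.Count == 0)
{
return;
}
// Not declared with "using": the variable is reassigned whenever a batch fills up
ServiceBusMessageBatch messageBatch = await _sender.CreateMessageBatchAsync();
try
{
foreach (var message in _batch)
{
if (messageBatch.TryAddMessage(message))
{
continue;
}
// Batch is full: send it and retry the message in a fresh batch
if (messageBatch.Count > 0)
{
await _sender.SendMessagesAsync(messageBatch);
}
messageBatch.Dispose();
messageBatch = await _sender.CreateMessageBatchAsync();
if (!messageBatch.TryAddMessage(message))
{
throw new InvalidOperationException("Message is too large to fit in an empty batch.");
}
}
if (messageBatch.Count > 0)
{
await _sender.SendMessagesAsync(messageBatch);
}
_batch.Clear();
}
finally
{
messageBatch.Dispose();
}
}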
}
Connection Pooling
Optimize Service Bus connections:
public class OptimizedServiceBusClient
{
public static ServiceBusClient CreateOptimizedClient(string connectionString)
{
return new ServiceBusClient(connectionString, new ServiceBusClientOptions
{
TransportType = ServiceBusTransportType.AmqpTcp,
RetryOptions = new ServiceBusRetryOptions
{
Mode = ServiceBusRetryMode.Exponential,
MaxRetries = 3,
Delay = TimeSpan.FromSeconds(1),
MaxDelay = TimeSpan.FromSeconds(30)
}
});
}
}
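To build intuition for the retry settings above, the sketch below computes a capped exponential delay per attempt. It is a simplified model and an assumption for illustration only: the SDK's actual algorithm is internal and also applies jitter, so real delays will differ. `BackoffSketch` is a hypothetical name.

```csharp
using System;

// Simplified model of capped exponential backoff; not the SDK's exact algorithm.
public static class BackoffSketch
{
    // Delay before retry number `attempt` (1-based): baseDelay * 2^(attempt - 1),
    // capped at maxDelay.
    public static TimeSpan DelayForAttempt(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(attempt));
        }

        double factor = Math.Pow(2, attempt - 1);
        double milliseconds = Math.Min(
            baseDelay.TotalMilliseconds * factor,
            maxDelay.TotalMilliseconds);
        return TimeSpan.FromMilliseconds(milliseconds);
    }
}
```

Under this model, a 1-second base delay and three retries wait roughly 1, 2, and 4 seconds, so the 30-second cap only matters if `MaxRetries` is raised.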
Extended FAQ
Q: How do I handle message ordering with topics?
A: Use sessions. Enable sessions on the subscription (RequiresSession), then set the same SessionId on messages that must stay in order:
var sender = client.CreateSender("topic");
var message = new ServiceBusMessage("body")
{
SessionId = "session-1" // Messages with the same session ID are delivered in order
};
await sender.SendMessageAsync(message);
Q: Can I implement request-reply pattern with Service Bus?
A: Yes, use correlation:
// Request
var request = new ServiceBusMessage("request")
{
MessageId = Guid.NewGuid().ToString(),
ReplyTo = "reply-queue",
ReplyToSessionId = "session-1"
};
await sender.SendMessageAsync(request);
// Reply
var receiver = await client.AcceptSessionAsync("reply-queue", "session-1");
var reply = await receiver.ReceiveMessageAsync();
Case Studies
Case Study 1: Enterprise Order Processing
Challenge: An e-commerce platform needed to process 1M+ orders daily with guaranteed delivery.
Solution: Implemented saga pattern with Service Bus:
public class OrderSagaOrchestrator
{
public async Task ProcessOrderAsync(Order order)
{
// Step 1: Validate order
await SendToQueueAsync("validate-order", order);
// Step 2: Reserve inventory
await SendToQueueAsync("reserve-inventory", order);
// Step 3: Process payment
await SendToQueueAsync("process-payment", order);
// Step 4: Fulfill order
await SendToQueueAsync("fulfill-order", order);
}
public async Task CompensateOrderAsync(Order order, string failedStep)
{
// Compensate based on failed step
switch (failedStep)
{
case "process-payment":
await SendToQueueAsync("release-inventory", order);
break;
case "fulfill-order":
await SendToQueueAsync("refund-payment", order);
await SendToQueueAsync("release-inventory", order);
break;
}
}
}
Results:
- 99.99% message delivery
- Zero order loss
- Sub-second processing time
Migration Guide
Migrating from Legacy Message Queue
Step 1: Analyze existing queue:
public class QueueMigrationAnalyzer
{
public async Task<MigrationPlan> AnalyzeQueueAsync(string legacyQueue)
{
var messages = await GetQueueMessagesAsync(legacyQueue);
return new MigrationPlan
{
TotalMessages = messages.Count,
MessageTypes = messages.Select(m => m.Type).Distinct().ToList(),
EstimatedMigrationTime = EstimateMigrationTime(messages.Count),
RequiredResources = CalculateResources(messages)
};
}
}
Step 2: Migrate messages:
public async Task MigrateQueueAsync(string legacyQueue, string serviceBusQueue)
{
var legacyMessages = await GetLegacyMessagesAsync(legacyQueue);
var serviceBusClient = new ServiceBusClient(connectionString);
var sender = serviceBusClient.CreateSender(serviceBusQueue);
foreach (var legacyMessage in legacyMessages)
{
var serviceBusMessage = ConvertToServiceBusMessage(legacyMessage);
await sender.SendMessageAsync(serviceBusMessage);
// Mark as migrated
await MarkAsMigratedAsync(legacyMessage.Id);
}
}
Conclusion
Enterprise Service Bus patterns with Azure Service Bus provide a robust foundation for building scalable, decoupled microservices. By implementing proper message routing, pub/sub patterns, dead-letter handling, and saga patterns, you can build resilient enterprise integrations.
Key Takeaways:
- Choose the right pattern - Queues vs Topics vs Sessions
- Handle dead-letters - Monitor and replay DLQ messages
- Implement idempotency - Ensure safe retries
- Use sessions - For message ordering
- Monitor messaging - Track performance and errors
- Use Managed Identity - Secure authentication
- Message routing - Intelligent routing patterns
- Message transformation - Transform during routing
- Performance optimization - Batching and pooling
- Migration planning - From legacy systems
Next Steps:
- Design messaging architecture
- Implement message producers
- Set up consumers
- Configure dead-letter handling
- Set up monitoring
- Optimize performance
- Plan migration from legacy systems
Service Bus Best Practices
Implementation Checklist
- Choose the right messaging pattern (queue/topic/session)
- Configure dead-letter handling
- Implement idempotency checks
- Set up message ordering (sessions)
- Configure retry policies
- Enable monitoring and alerting
- Test message processing
- Document messaging architecture
- Review message patterns regularly
- Optimize performance
Production Deployment
Before deploying messaging:
- Test all message patterns
- Verify dead-letter handling
- Test idempotency
- Validate message ordering
- Set up monitoring
- Load test the system
- Document procedures
- Review security settings
Service Bus Implementation Guide
Step-by-Step Setup
1. Create Service Bus Namespace
- Create namespace
- Configure pricing tier
- Set up networking
- Configure authentication
2. Create Queues/Topics
- Create queues for point-to-point
- Create topics for pub/sub
- Configure subscriptions
- Set up dead-letter queues
3. Implement Producers
- Create message senders
- Implement message formatting
- Add error handling
- Set up retry logic
4. Implement Consumers
- Create message processors
- Handle message processing
- Implement error handling
- Set up dead-letter handling
5. Set Up Monitoring
- Track message counts
- Monitor dead-letter queues
- Alert on errors
- Generate reports
Service Bus Patterns
Queue Producer
Send messages to queue:
public class QueueProducer
{
private readonly ServiceBusSender _sender;
public QueueProducer(ServiceBusClient client, string queueName)
{
_sender = client.CreateSender(queueName);
}
public async Task SendMessageAsync<T>(T message)
{
var serviceBusMessage = new ServiceBusMessage(JsonSerializer.Serialize(message))
{
MessageId = Guid.NewGuid().ToString(),
Subject = typeof(T).Name
};
await _sender.SendMessageAsync(serviceBusMessage);
}
}
Topic Subscriber
Subscribe to topic:
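public class TopicSubscriber
{
private readonly ServiceBusProcessor _processor;
public TopicSubscriber(ServiceBusClient client, string topicName, string subscriptionName)
{
_processor = client.CreateProcessor(topicName, subscriptionName);
_processor.ProcessMessageAsync += OnMessageAsync;
_processor.ProcessErrorAsync += OnErrorAsync;
}
public Task StartAsync() => _processor.StartProcessingAsync();
private async Task OnMessageAsync(ProcessMessageEventArgs args)
{
// MyMessage is a placeholder for the application's message type
var message = JsonSerializer.Deserialize<MyMessage>(args.Message.Body.ToString());
await HandleAsync(message);
await args.CompleteMessageAsync(args.Message);
}
private Task OnErrorAsync(ProcessErrorEventArgs args)
{
Console.Error.WriteLine($"Processor error: {args.Exception}");
return Task.CompletedTask;
}
// Application-specific handling goes here
private Task HandleAsync(MyMessage message) => Task.CompletedTask;
}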
Service Bus Advanced Patterns
Message Correlation
Correlate related messages:
public class MessageCorrelator
{
public async Task SendCorrelatedMessageAsync<T>(T message, string correlationId)
{
var serviceBusMessage = new ServiceBusMessage(JsonSerializer.Serialize(message))
{
MessageId = Guid.NewGuid().ToString(),
CorrelationId = correlationId,
Subject = typeof(T).Name
};
await _sender.SendMessageAsync(serviceBusMessage);
}
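public async Task<List<ServiceBusReceivedMessage>> GetCorrelatedMessagesAsync(
string correlationId)
{
var messages = new List<ServiceBusReceivedMessage>();
await using var receiver = _client.CreateReceiver(_queueName);
while (true)
{
// Peek rather than receive: peeking takes no lock, so non-matching messages
// keep their delivery count and are never pushed toward the dead-letter queue.
// Successive peeks on the same receiver continue from the last peeked message.
var page = await receiver.PeekMessagesAsync(maxMessages: 100);
if (page.Count == 0)
{
break;
}
messages.AddRange(page.Where(m => m.CorrelationId == correlationId));
}
// Peeked messages stay in the queue and cannot be completed; use sessions to consume a correlated group
return messages;
}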
}
Message Scheduling
Schedule messages for future delivery:
public class MessageScheduler
{
public async Task ScheduleMessageAsync<T>(
T message,
DateTime scheduledEnqueueTime)
{
var serviceBusMessage = new ServiceBusMessage(JsonSerializer.Serialize(message))
{
MessageId = Guid.NewGuid().ToString(),
ScheduledEnqueueTime = scheduledEnqueueTime
};
await _sender.SendMessageAsync(serviceBusMessage);
}
}
Service Bus Monitoring
Message Flow Monitoring
Monitor message flow:
public class MessageFlowMonitor
{
public async Task<FlowMetrics> GetFlowMetricsAsync(string queueName, TimeSpan period)
{
var sent = await GetSentMessageCountAsync(queueName, period);
var received = await GetReceivedMessageCountAsync(queueName, period);
var deadLetter = await GetDeadLetterCountAsync(queueName, period);
return new FlowMetrics
{
QueueName = queueName,
MessagesSent = sent,
MessagesReceived = received,
MessagesInDeadLetter = deadLetter,
Throughput = received / period.TotalSeconds,
// Guard against an empty period: dividing by zero would yield NaN or Infinity
SuccessRate = sent > 0 ? (double)received / sent : 0
};
}
}
Message Processing Monitoring
Monitor message processing:
public class MessageProcessingMonitor
{
public async Task<ProcessingMetrics> GetProcessingMetricsAsync(
string queueName,
TimeSpan period)
{
var messages = await GetProcessedMessagesAsync(queueName, period);
return new ProcessingMetrics
{
QueueName = queueName,
TotalProcessed = messages.Count,
AverageProcessingTime = messages.Average(m => m.ProcessingTime),
P95ProcessingTime = CalculatePercentile(messages.Select(m => m.ProcessingTime), 0.95),
ErrorRate = (double)messages.Count(m => !m.Success) / messages.Count
};
}
}
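`CalculatePercentile` is used above but not defined. A minimal sketch using the nearest-rank method follows. It assumes processing times are recorded as `double` values (for example, milliseconds), which matches the use of `Average` above; `PercentileSketch` is a hypothetical name, and other percentile definitions (such as linear interpolation) give slightly different results.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PercentileSketch
{
    // Nearest-rank percentile: the smallest sample such that at least
    // percentile * N of the samples are less than or equal to it.
    // percentile is a fraction in (0, 1], e.g. 0.95 for P95.
    public static double CalculatePercentile(IEnumerable<double> samples, double percentile)
    {
        if (percentile <= 0 || percentile > 1)
        {
            throw new ArgumentOutOfRangeException(nameof(percentile));
        }

        var sorted = samples.OrderBy(s => s).ToArray();
        if (sorted.Length == 0)
        {
            return 0; // No samples in the period; this sketch reports zero.
        }

        int rank = (int)Math.Ceiling(percentile * sorted.Length);
        return sorted[rank - 1];
    }
}
```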
Service Bus Performance Tuning
Throughput Optimization
Optimize message throughput:
public class ThroughputOptimizer
{
public async Task OptimizeThroughputAsync(string queueName)
{
// Increase prefetch count
var processorOptions = new ServiceBusProcessorOptions
{
PrefetchCount = 200, // Increase from default
MaxConcurrentCalls = Environment.ProcessorCount * 2
};
// Use batch operations
var receiver = await CreateBatchReceiverAsync(queueName);
var messages = await receiver.ReceiveMessagesAsync(maxMessages: 100);
// Process in parallel
var tasks = messages.Select(m => ProcessMessageAsync(m));
await Task.WhenAll(tasks);
}
}
Service Bus Troubleshooting
Common Issues
Troubleshoot Service Bus issues:
public class ServiceBusTroubleshooter
{
public async Task<DiagnosticsResult> DiagnoseAsync(string queueName)
{
var diagnostics = new DiagnosticsResult();
// Check queue exists
diagnostics.QueueExists = await CheckQueueExistsAsync(queueName);
// Check message count
diagnostics.MessageCount = await GetMessageCountAsync(queueName);
// Check dead-letter count
diagnostics.DeadLetterCount = await GetDeadLetterCountAsync(queueName);
// Check connection
diagnostics.ConnectionWorking = await TestConnectionAsync();
return diagnostics;
}
}
Service Bus Advanced Scenarios
High-Volume Processing
Handle high-volume message processing:
public class HighVolumeProcessor
{
private readonly ServiceBusClient _client;
private readonly int _batchSize;
public async Task ProcessHighVolumeAsync(string queueName)
{
var processor = _client.CreateProcessor(queueName, new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 100,
PrefetchCount = 500,
AutoCompleteMessages = false
});
processor.ProcessMessageAsync += async args =>
{
try
{
await ProcessMessageBatchAsync(args.Message);
await args.CompleteMessageAsync(args.Message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process message");
await args.AbandonMessageAsync(args.Message);
}
};
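// The processor requires an error handler; StartProcessingAsync throws without one
processor.ProcessErrorAsync += args =>
{
_logger.LogError(args.Exception, "Processor error");
return Task.CompletedTask;
};
await processor.StartProcessingAsync();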
}
}
Message Priority Handling
Handle message priorities:
public class PriorityMessageHandler
{
public async Task ProcessPriorityMessageAsync(ServiceBusMessage message)
{
var priority = message.ApplicationProperties.TryGetValue("Priority", out var p)
? (Priority)Enum.Parse(typeof(Priority), p.ToString())
: Priority.Normal;
switch (priority)
{
case Priority.High:
await ProcessHighPriorityAsync(message);
break;
case Priority.Normal:
await ProcessNormalPriorityAsync(message);
break;
case Priority.Low:
await ProcessLowPriorityAsync(message);
break;
}
}
}
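`Enum.Parse` in the handler above throws if a producer sends an unexpected priority value, which would fail the message on every delivery. A minimal sketch of a more tolerant parse follows. The `Priority` enum is assumed to have the three members used above, and `PriorityParser` is a hypothetical name; falling back to `Normal` is a design choice, not a requirement.

```csharp
using System;
using System.Collections.Generic;

// Assumed definition of the Priority enum referenced above.
public enum Priority { Low, Normal, High }

public static class PriorityParser
{
    // Reads the "Priority" application property and falls back to Normal when
    // the property is missing, null, or not a defined Priority name.
    public static Priority FromProperties(IDictionary<string, object> properties)
    {
        if (properties.TryGetValue("Priority", out var raw)
            && Enum.TryParse<Priority>(raw?.ToString(), ignoreCase: true, out var parsed)
            && Enum.IsDefined(typeof(Priority), parsed))
        {
            return parsed;
        }

        return Priority.Normal;
    }
}
```

The `Enum.IsDefined` check matters because `Enum.TryParse` accepts any numeric string, including values that do not correspond to a named member.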
Service Bus Monitoring and Diagnostics
Message Processing Monitoring
Monitor message processing:
public class MessageProcessingMonitor
{
public async Task<ProcessingMetrics> GetProcessingMetricsAsync(
string queueName,
TimeSpan period)
{
var metrics = await GetMetricsDataAsync(queueName, period);
return new ProcessingMetrics
{
QueueName = queueName,
Period = period,
TotalMessages = metrics.Sum(m => m.MessageCount),
ProcessedMessages = metrics.Count(m => m.Processed),
FailedMessages = metrics.Count(m => !m.Processed),
AverageProcessingTime = metrics.Average(m => m.ProcessingTime),
P95ProcessingTime = CalculatePercentile(
metrics.Select(m => m.ProcessingTime), 0.95)
};
}
}
Dead Letter Queue Management
Manage dead letter queues:
public class DeadLetterQueueManager
{
public async Task<DeadLetterReport> GetDeadLetterReportAsync(string queueName)
{
var deadLetters = await GetDeadLetterMessagesAsync(queueName);
return new DeadLetterReport
{
QueueName = queueName,
TotalDeadLetters = deadLetters.Count,
ByReason = deadLetters
.GroupBy(dl => dl.Reason)
.Select(g => new DeadLetterByReason
{
Reason = g.Key,
Count = g.Count()
})
.ToList()
};
}
}
For more messaging guidance, explore our Reliable Queues with Azure Service Bus or .NET Microservices Communication Patterns.