Generic Message Queue implementation using AWS SQS and .NET 6 BackgroundService

Requirement

In most of the enterprise projects we develop, we have to implement cross-cutting concerns such as Audit Logs. The important constraint is that these Audit Logs (or similar cross-cutting concerns) must not degrade application performance. So how do we implement them without compromising performance?

BackgroundService

Background tasks and scheduled jobs are something you might need to use in any application, whether or not it follows the microservices architecture pattern. The difference when using a microservices architecture is that you can implement the background task in a separate process/container for hosting so you can scale it down/up based on your need.

From a generic point of view, in .NET we call these types of tasks Hosted Services, because they are services/logic that you host within your host/application/microservice. Note that in this case, the hosted service simply means a class with the background task logic.

In our sample we will configure the BackgroundService in the same Web API project for the sake of simplicity, but in a real production scenario you should consider a separate service. This way the BackgroundService can offload the Audit Log writing from the Web API. Now the challenge is: how should we send the Audit Log objects to the BackgroundService?

Message Queue

Message queues allow different parts of a system to communicate and process operations asynchronously. A message queue provides a lightweight buffer which temporarily stores messages, and endpoints that allow software components to connect to the queue in order to send and receive messages. The messages are usually small, and can be things like requests, replies, error messages, or just plain information. To send a message, a component called a producer adds a message to the queue. The message is stored on the queue until another component called a consumer retrieves the message and does something with it.
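
To make the producer and consumer roles concrete, here is a minimal in-process sketch. It uses System.Threading.Channels as a stand-in for a real queue; the class name InMemoryQueueDemo and the sample messages are hypothetical and are not part of the SQS implementation described in this post.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class InMemoryQueueDemo
{
    // Runs one producer and one consumer and returns what the consumer received.
    public static async Task<List<string>> RunAsync()
    {
        // The channel plays the role of the queue's temporary buffer.
        var queue = Channel.CreateUnbounded<string>();

        // Producer: adds messages to the queue without waiting for the consumer.
        var producer = Task.Run(async () =>
        {
            foreach (var text in new[] { "request", "reply", "error" })
            {
                await queue.Writer.WriteAsync(text);
            }
            queue.Writer.Complete(); // signal that no more messages will arrive
        });

        // Consumer: retrieves each message and does something with it.
        var received = new List<string>();
        var consumer = Task.Run(async () =>
        {
            await foreach (var text in queue.Reader.ReadAllAsync())
            {
                received.Add(text);
            }
        });

        await Task.WhenAll(producer, consumer);
        return received;
    }
}
```

A hosted queue such as SQS adds durability and lets the producer and consumer live in different processes, but the send, buffer and receive flow is the same.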

Many Message Queue implementations exist, and several cloud providers offer their own. Here we’re using AWS SQS.

Steps

Create an ASP.NET Core Web API

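Install Dependencies. The code in this post uses the AWSSDK.SQS, AWSSDK.Extensions.NETCore.Setup and Newtonsoft.Json NuGet packages.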

Create a folder Services -> Contracts and create a Generic Interface called IMessageService as follows:

    public interface IMessageService<T>
    {
        Task DeleteMessageAsync(string id);
        Task<Dictionary<string, T?>> ReceiveMessageAsync(int maxMessages = 1);
        Task SendMessage(T message);
    }

Now let’s create the SQS Message Service by implementing the above Interface.

Let’s first create the constructor, which creates the AWS SQS client. Before doing that, we need to create an IAM user with the required permissions.

Create IAM user

  • Go to the AWS console and search IAM.
  • Click on the Users panel on the left.
  • Click on the Add User button.
  • Provide a user name and select the Access key - Programmatic access checkbox and click Next: Permissions button.
  • Click on the Attach existing policies directly tab.
  • Search AmazonSQSFullAccess and select that policy. We require FullAccess because we will be creating the Queue programmatically if that Queue does not exist.
  • Click on the Next button twice and finally click on the Create User button.
  • Copy the Access key ID and Secret access key and store them in a safe place.

Configure the AWS Credentials

There are multiple ways to configure the AWS credentials. I used the AWS CLI, which can be downloaded from the AWS website. Once it’s downloaded and installed on your machine, run the command below in a Terminal or Command Prompt.

aws configure

The above command prompts for the following details and stores them under ~/.aws (the keys in the credentials file, the region in the config file). The AWS SDK reads these values when it creates the clients.

  • AWS Access Key ID
  • AWS Secret Access Key
  • Default region name

Constructor code will look like the below:

    public SqsGenericService(ILogger<SqsGenericService<T>> logger, IConfiguration configuration, IHostEnvironment env)
    {
        _logger = logger;
        // Reads the AWS profile and region settings from configuration.
        var options = configuration.GetAWSOptions();
        // This queueName will be used to create the SQS Queue for each type of object in different environments
        var queueName = $"que-{env.EnvironmentName.ToLower()}-{typeof(T).Name.ToLower()}";
        _amazonSQSClient = options.CreateServiceClient<IAmazonSQS>();
        // Blocks once at startup because a constructor cannot await.
        _queueUrl = GetQueueUrl(queueName).Result;
    }

Most of the code is self-explanatory here. The configuration.GetAWSOptions() call fetches the AWS configuration. IHostEnvironment is injected in place of IHostingEnvironment, which is marked obsolete in .NET 6.

Dynamic Queue creation for each environment and entity

The queueName variable is built by concatenating the environment name and the name of the generic entity. The GetQueueUrl() method fetches the queue URL if the queue already exists; otherwise it creates the queue.
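
The body of GetQueueUrl() is not shown in this post. One way it could work, as a sketch only: ask SQS for the URL and create the queue when SQS reports that it does not exist. The helper class SqsQueueResolver and the method name GetOrCreateQueueUrlAsync are hypothetical; the SDK calls come from the AWSSDK.SQS package.

```csharp
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

// Hypothetical helper; the post's GetQueueUrl() could use the same logic
// against its own _amazonSQSClient field.
public static class SqsQueueResolver
{
    public static async Task<string> GetOrCreateQueueUrlAsync(IAmazonSQS client, string queueName)
    {
        try
        {
            // Succeeds when the queue already exists in the configured account and region.
            var existing = await client.GetQueueUrlAsync(queueName);
            return existing.QueueUrl;
        }
        catch (QueueDoesNotExistException)
        {
            // First run in this environment: create the queue with default attributes.
            var created = await client.CreateQueueAsync(queueName);
            return created.QueueUrl;
        }
    }
}
```

SQS queue names are limited to 80 characters made up of letters, digits, hyphens and underscores, so typeof(T).Name works for plain classes such as AuditLogModel but would need sanitizing for generic types.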

The next method is SendMessage. This method will accept a Generic message object and use AWS SQS Client to push the serialized object to the Queue.

    public async Task SendMessage(T message)
    {
        // Serialize the generic object to JSON for the message body.
        var messageBody = JsonConvert.SerializeObject(message);
        await _amazonSQSClient.SendMessageAsync(new SendMessageRequest
        {
            QueueUrl = _queueUrl,
            MessageBody = messageBody
        });
        _logger.LogInformation("Message {message} sent successfully to {_queueUrl}.", message, _queueUrl);
    }

The next method is ReceiveMessageAsync. This method fetches messages from the queue and converts them to a Dictionary keyed by each message's ReceiptHandle with the deserialized message body as the value, because the consumer of this service needs the ReceiptHandle to delete the message after processing.
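
The code for ReceiveMessageAsync and DeleteMessageAsync is not included in this post, so the following is a sketch of how they might look rather than the original implementation. The class name SqsReceiveSketch is hypothetical; the field names mirror the ones used in the constructor, and the SDK calls come from AWSSDK.SQS and Newtonsoft.Json.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Newtonsoft.Json;

// Hypothetical class holding only the receive/delete half of the service.
public class SqsReceiveSketch<T>
{
    private readonly IAmazonSQS _amazonSQSClient;
    private readonly string _queueUrl;

    public SqsReceiveSketch(IAmazonSQS amazonSQSClient, string queueUrl)
    {
        _amazonSQSClient = amazonSQSClient;
        _queueUrl = queueUrl;
    }

    public async Task<Dictionary<string, T?>> ReceiveMessageAsync(int maxMessages = 1)
    {
        var response = await _amazonSQSClient.ReceiveMessageAsync(new ReceiveMessageRequest
        {
            QueueUrl = _queueUrl,
            MaxNumberOfMessages = maxMessages // SQS accepts values from 1 to 10
        });

        var result = new Dictionary<string, T?>();
        // Some SDK versions return null instead of an empty list when nothing is waiting.
        foreach (var message in response.Messages ?? new List<Message>())
        {
            // Key: receipt handle needed for deletion. Value: the deserialized body.
            result[message.ReceiptHandle] = JsonConvert.DeserializeObject<T>(message.Body);
        }
        return result;
    }

    // The id parameter is the receipt handle returned as the dictionary key above.
    public Task DeleteMessageAsync(string id) =>
        _amazonSQSClient.DeleteMessageAsync(_queueUrl, id);
}
```

A received message is only hidden from other consumers for the queue's visibility timeout, so the caller should delete it once processing succeeds; otherwise SQS will deliver it again.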

Worker Process

To implement the worker process, as mentioned earlier, I decided to use BackgroundService. Here is the complete code for the AuditLogWorker class.

    public class AuditLogWorker : BackgroundService
    {
        private readonly ILogger<AuditLogWorker> _logger;
        private readonly IMessageService<AuditLogModel> _messageClient;

        public AuditLogWorker(ILogger<AuditLogWorker> logger, IMessageService<AuditLogModel> messageClient)
        {
            _logger = logger;
            _messageClient = messageClient;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("AuditLogWorker running at: {Time}", DateTime.Now);
                var messages = await _messageClient.ReceiveMessageAsync();
                foreach (var message in messages)
                {
                    // You can write your custom logic here...
                    _logger.LogInformation("AuditLogWorker processed message {userID}, {Action}", message.Value?.UserId, message.Value?.Message);
                    // message.Key is the receipt handle returned by ReceiveMessageAsync.
                    await _messageClient.DeleteMessageAsync(message.Key);
                }
                await Task.Delay(5000, stoppingToken); // Delay can be set according to your business requirement.
            }
        }
    }
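
The worker reads UserId and Message from AuditLogModel, a class that is not shown in this post. A minimal sketch follows, assuming both properties are strings; the real model may well carry more fields, such as a timestamp.

```csharp
// Hypothetical shape of the audit log message; only the two properties
// that the worker reads are included, and their types are assumptions.
public class AuditLogModel
{
    public string UserId { get; set; } = string.Empty;
    public string Message { get; set; } = string.Empty;
}
```

Because the service serializes messages as JSON, any class with public get/set properties can be used as T in the same way.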

Modify the Program.cs to add the necessary dependencies and configure the hosted service as below:

    builder.Services.AddSingleton<IMessageService<AuditLogModel>, SqsGenericService<AuditLogModel>>();
    builder.Services.AddHostedService<AuditLogWorker>();

Create a simple REST API method to accept an object and push that item to the queue and test it. The testing method will look like the below:

    [HttpPost]
    public async Task Post([FromBody] AuditLogModel model)
    {
        await _messageClient.SendMessage(model);
        _logger.LogInformation("Message pushed to the queue successfully.");
    }

That’s it folks, I hope everybody enjoyed the blog. The entire code can be downloaded from the GitHub repo.

Happy coding!!!