CQRS and Mediator Design Patterns in .Net 6

Introduction

CQRS

CQRS stands for Command and Query Responsibility Segregation, a design pattern that separates read and update operations for a data store. Implementing CQRS in your application can maximize its performance, scalability, and security. The flexibility created by migrating to CQRS allows a system to better evolve over time and prevents update commands from causing merge conflicts at the domain level. I’ve posted an article explaining how a CQRS pattern can be used to scale the MySQL database horizontally

Mediator

Mediator design pattern is one of the important and widely used behavioral design patterns. Mediator enables decoupling of objects by introducing a layer in between so that the interaction between objects happens via the layer. If the objects interact with each other directly, the system components are tightly-coupled with each other that makes higher maintainability cost and not hard to extend. Mediator pattern focuses on providing a mediator between objects for communication and help in implementing loose-coupling between objects.

Problem

In traditional architecture, we use the same database for query and update operations. That is simple and works well for basic CRUD operations. In more complex applications, however, this approach can become unmanageable. Then you start refactoring your code and try to separate read and update calls probably by implementing the CQRS pattern. In the DotNet world developers use one service to manage everything related to one specific entity, but if you implement CQRS pattern then you need multiple services for queries and commands and if you inject all these dependencies using Dependency Injection, then there will be lot of services in a simple Controller.

Solution

Mediator pattern can help you resolve the above problem. One mediator class or library can call the required commands and query services based on the input models. So you just need to inject one Interface and that interface will manage further dependencies. We use Dependency Injection to make our application loosely coupled, but the Mediator pattern will make this further de-coupled and simplified.

Package

The MediatR Nuget package can be used to implement the Mediator pattern in .Net. You can use the below commands to install the required packages

    dotnet add package MediatR    dotnet add package MediatR.Extensions.Microsoft.DependencyInjection

We will be using Dapper Micro ORM in this application to do the database operations. I’ll be explaining about the Dapper in a separate article. Install Dapper by running the below command

    dotnet add package Dapper

MediatR mainly uses two interface to implement the Mediator pattern

  • IRequest<T>
  • IRequestHandler<T, U>

More details can be found here

How

I’m going to define a folder structure and naming convention as follows but feel free to use your own conventions.

  • Commands For all commands(CUD Operations), these are POCO classes implements IRequest<T>
  • CommandHandlers All the business logic to execute the commands
  • Queries Same as commands but only for Read operations
  • QueryHandlers All the business logic to execute the queries

I’ve created two DbContexts called ToDoContextRead and ToDoContextWrite both pointing to the same database, but in a Production scenario you can use separate database connection strings for both DbContexts. More on that topic is mentioned here.

ToDoContextRead will look like the below

public class ToDoContextRead{    private readonly string _connectionString;    public ToDoContextRead(IConfiguration configuration)    {        _connectionString = configuration.GetConnectionString("SqlConnectionRead");    }    public IDbConnection CreateConnection()        => new SqlConnection(_connectionString);}

I’ve also created two Repositories for reading and writing to the databases

ToDoRepositoryRead will look like the below:

public class ToDoRepositoryRead : IToDoRepositoryRead{    private readonly ToDoContextRead _context;    public ToDoRepositoryRead(ToDoContextRead context)    {        _context = context;    }    public async Task<ToDo> GetToDoById(Guid id)    {        var query = "SELECT * FROM ToDos where id=@id";        var param = new { id };        using var connection = _context.CreateConnection();        var todo = await connection.QueryFirstOrDefaultAsync<ToDo>(query, param);        return todo;    }    public async Task<IEnumerable<ToDo>> GetToDos()    {        var query = "SELECT * FROM ToDos";        using var connection = _context.CreateConnection();        var todos = await connection.QueryAsync<ToDo>(query);        return todos.ToList();    }}

ToDoRepositoryWrite code is as follows:

public class ToDoRepositoryWrite : IToDoRepositoryWrite{    private readonly ToDoContextWrite _context;    public ToDoRepositoryWrite(ToDoContextWrite context)    {        _context = context;    }    public async Task<int> DeleteToDoById(Guid id)    {        var query = "DELETE FROM ToDos where id=@id";        var param = new { id };        using var connection = _context.CreateConnection();        return await connection.ExecuteAsync(query, param);    }    public async Task<ToDo> GetToDoById(Guid id)    {        var query = "SELECT * FROM ToDos where id=@id";        var param = new { id };        using var connection = _context.CreateConnection();        var todo = await connection.QueryFirstOrDefaultAsync<ToDo>(query, param);        return todo;    }    public async Task<IEnumerable<ToDo>> GetToDos()    {        var query = "SELECT * FROM ToDos";        using var connection = _context.CreateConnection();        var todos = await connection.QueryAsync<ToDo>(query);        return todos.ToList();    }    public async Task<int> SaveToDo(ToDo toDo)    {        var query = @"INSERT INTO ToDos                    (Id, Title, Description, Created, IsCompleted)                    VALUES (@Id, @Title, @Description, @Created, @IsCompleted);";        toDo.Created = DateTime.Now;        using var connection = _context.CreateConnection();        return await connection.ExecuteAsync(query, toDo);    }    public async Task<int> UpdateToDo(ToDo toDo)    {        var query = @"UPDATE ToDos SET                    Title=@Title, Description=@Description, Created=@Created, IsCompleted=@IsCompleted                    WHERE Id=@Id";        toDo.Created = DateTime.Now;        using var connection = _context.CreateConnection();        return await connection.ExecuteAsync(query, toDo);    }}

Commands and Queries

Now the important step is to create the Commands and Queries. Commands and Queries are simple DTOs or POCO classes, but in order to work with the MediatR library, we need to implement an Interface called IRequest<T>. A sample Command is shown below. All other commands and queries can be found in the Github Repo.

public class CreateToDoCommand : IRequest<ToDo>{    public string? Title { get; set; }    public string? Description { get; set; }    public CreateToDoCommand(string? title, string? description)    {        Title = title;        Description = description;    }    public CreateToDoCommand()    {    }}

In the above example, the ToDo class in the IRequest Interface is the return type. For each command or query there should be a Handler defined. Here is the Handler for CreateToDoCommand:

public class CreateToDoCommandHandler : IRequestHandler<CreateToDoCommand, ToDo>{    private readonly IToDoRepositoryWrite _toDoRepositoryWrite;    public CreateToDoCommandHandler(IToDoRepositoryWrite toDoRepositoryWrite)    {        _toDoRepositoryWrite = toDoRepositoryWrite;    }    public async Task<ToDo> Handle(CreateToDoCommand request, CancellationToken cancellationToken)    {        var todo = new ToDo        {            Created = DateTime.Now,            Description = request.Description,            Title = request.Title,            Id = Guid.NewGuid(),            IsCompleted = false        };        var result = await _toDoRepositoryWrite.SaveToDo(todo);        if (result > 0)        {            return todo;        }        else        {            throw new ArgumentException("Unable to save the ToDo");        }    }}

Similar way you can define all your commands, queries and handlers. Once that part is ready then you need to configure the Mediator service in the Program.cs file as follows:

    builder.Services.AddMediatR(typeof(ToDoContextRead).GetTypeInfo().Assembly);

In the above line ToDoContextRead is used just for getting the assembly, and this line will do all the magic to binding the commands and queries to the handlers.

Now you can inject the Mediator into your controller as follows:

private readonly IMediator _mediator;public ToDosController(IMediator mediator){    _mediator = mediator;}

Now you can call any handlers by simply sending the commands as follows

var todos = await _mediator.Send(new GetToDoDetailQuery { Id = id });

Complete code sample can be found at https://github.com/kannan-kiwitech/CqrsMediatorSampleApi.

Happy coding!

How to Scale an AWS RDS MySQL Database Horizontally?

What is scalability

The scalability of an application is the measure of the number of client requests it can simultaneously handle. When a hardware resource runs out and can no longer handle requests, it is counted as the limit of scalability. When this limit of the resource is reached, the application can no longer handle additional requests. To efficiently handle additional requests, administrators should scale the infrastructure by adding more resources such as RAM, CPU, storage, network devices, etc. Horizontal and vertical scaling are the two methods implemented by administrators for capacity planning.

What is Horizontal Scaling?

Horizontal scaling is an approach of adding more devices to the infrastructure to increase the capacity and efficiently handle increasing traffic demands. As the name says, horizontal scaling is about expanding the capacity horizontally by adding extra servers. The load and processing power are shared among multiple servers within a system using a load balancer. It is also called scaling out.

What is Vertical Scaling?

Vertical scaling is a type of scalability wherein more computing and processing power is added to a machine to increase its performance. Also called scale-up, vertical scaling allows you to increase the machine’s capacity while maintaining resources within the same logical unit. The processor, memory, storage, and network capacity are increased in this approach.

Scalability Issues of RDBMS (Specific to MySQL)

As we discussed earlier, vertical scalability has some hardware upper limits. Vertical scaling also requires some downtime. We cannot afford both in the database world. So we need to look into horizontal scalability options. In a database world, horizontal scaling is usually based on the partitioning of data (each partition only contains part of the data). Partitioning requires more effort and thought process in the development and design phase. That is a separate process and we’re not discussing that here.

Scaling MySQL Using Read Replicas

The read replica feature allows you to replicate data from MySQL server to one or more read-only servers. Replicas are updated asynchronously using the MySQL engine’s native binary log file position-based replication technology.

In this case, we will create a Master-Slave architecture and route all the write queries on the Master instance and all the read queries on the slave instance which are replicated from the Master. We can have multiple Slave instances running at one and scale our read operations horizontally. But the Master can only be scale Vertically. In most of the cases databases are read heavy, so this approach will work in most of the use cases.

Step 1: Application Development Considerations

While developing the application, we should follow the CQRS design pattern. CQRS stands for Command and Query Responsibility Segregation, a pattern that separates read and update operations for a data store. Implementing CQRS in your application can maximize its performance, scalability, and security. The flexibility created by migrating to CQRS allows a system to better evolve over time and prevents update commands from causing merge conflicts at the domain level. In short our application will have two connection strings, one for read operations and the another one for update operations.

As we have a single Master write node we can use that connection string for update operations and we will have multiple read nodes(slaves), so we need to setup a load balancer for that which will equally distribute the load among multiple read nodes.

Step 2: Setup Load Balancer for Read Only Nodes

We can set up Amazon Route 53 weighted record sets to distribute requests across your read replicas. Within a Route 53 hosted zone, create individual record sets for each DNS endpoint associated with your read replicas. Then, give them the same weight, and direct requests to the sub domain/endpoint of the record set.

How to Create Read Replicas

Assuming that you already have a MySQL RDS in your AWS account. Follow the below steps to create a read replica, and repeat the steps to create multiple replicas if required. In order to evaluate the load balancing feature, we should create at least 2 replicas.

  • Type rds in AWS Console search box and select RDS
  • Select Databases from the left panel
  • Select the database you want to create read replica on
  • Click on the Actions menu and select Create read replica as shown in the below screenshot.

  • Then select the Db Instance class
  • Select Publicly Accessible to Yes
  • Select the VPC Security Groups(You can select the same security group of your master node)
  • Enter the Db Instance Indentifier and then click ‘Create Read Replica`

Read replica will be created in a few minutes. Repeat the above steps to create one more node.

Create DNS Based Load Balancer

To create a DNS based load balancer, you have to set up a hosted zone in Route 53. Follow the below steps to create a hosted zone and record set.

  • Type route 53 in AWS Console search box and select Route 53 from the result.
  • Click Create Hosted Zone

  • Enter the Domain name, Description is optional
  • Select the Public hosted zone in Type option
  • Click Create hosted zone

  • Now we need to create Records in the newly created hosted zone
  • Select Create Record
  • Enter a subdomain name in the Name field
  • Select CNAME as Type
  • For Value enter the endpoint DNS name of the first read replica
  • For TTL value, set a value that is appropriate for your needs
  • For Routing Policy, choose Weighted
  • In the Weight field, enter a value. Be sure to use the same value for each replica’s record set
  • Provide an Id for the Record set
  • Repeat the steps to create records for all the replicas. Keep the same name (subdomain) for all the records.

Now your records would look like the below screenshot:

Update the NS records Entries in the Custom Name Server

Now copy the all four NS record values. You have to go to your Domain Registrar’s portal (godaddy, google domains etc.) and update the custom name server values there. In case of Google Domains, that will look like below. The changes may take some time to reflect.

Once your NS records are properly updated, you will be able to use the newly created subdomain as your read only database host name. You can use the same credentials of your master database to access the read only load balanced instances

Now you can configure your Master hostname for CUD operations and load balanced hostname for Read operations.

AWS Serverless and DynamoDb Single Table Design using .Net 6 – Part 2

Introduction

This is a continuation of the previous article AWS Serverless and DynamoDb Single Table Design using .Net 6 – Part 1. In this part we’re going to create a sample Serverless application using DynamoDb and deploy that on AWS Lambda.

Tools

Configure

Configure the AWS Toolkit using the link. While creating the IAM user make sure to attach the below policies

AWS Policies
Image: 1 

We’ll be using this user for creating the serverless application and deploying the same from Visual Studio or dotnet tools command line interface.

Why Serverless

Serverless solutions offer technologies for running code, managing data, and integrating applications, all without managing servers. Serverless technologies feature automatic scaling, built-in high availability, and a pay-for-use billing model to increase agility and optimize costs. These technologies also eliminate infrastructure management tasks like capacity provisioning and patching, so you can focus on writing code that serves your customers. Some of the popular serverless solutions are AWS Lambda and Azure Functions.

Development

AWS Toolkit for Visual Studio provides many built-in templates for creating AWS based serverless applications quickly.

Create a new project from Visual Studio and type ‘serverless’ in the search box and select `AWS Serverless Application (.NET Core – C#).

Image: 2

Enter the project name and continue.

Image: 3

Then select the ASP.NET Core Web API blueprint from the selection and click Finish

Image: 4

Once the project is ready in Visual Studio, you can see a file called serverless.template. This is AWS CloudFormation Serverless Application Model template file for declaring your Serverless functions and other AWS resources. Make sure to add two policies( AWSLambda_FullAccess and AmazonDynamoDBFullAccess) as shown below: These permissions are required for the Lambda to Read and Write to DynamoDb

Image: 5

Then add the below Nuget packages:

AWSSDK.DynamoDBv2Newtonsoft.JsonSwashbuckle.AspNetCore.SwaggerGenSwashbuckle.AspNetCore.SwaggerUI

Create an Interface called IEmployeeDb to define the methods

public interface IEmployeeDb{   Task<IEnumerable<EmployeeModel>> GetAllReporteesAsync(string empCode);   Task<EmployeeModel> GetEmployeeAsync(string empCode);   Task SaveAsync(EmployeeModel model);   Task SaveBatchAsync(List<EmployeeModel> models);}

Create a class to implement the IEmployeeDb interface. The constructor would look like the below:

public EmployeeDb(ILogger<EmployeeDb> logger, IWebHostEnvironment configuration){   //Comment out the below four line if you're not using the DynamoDb local instance.   if (configuration.IsDevelopment())   {      _clientConfig.ServiceURL = "http://localhost:8000";   }   _client = new AmazonDynamoDBClient(_clientConfig);   _context = new DynamoDBContext(_client);   _logger = logger;}

We configured the ServiceURL to point the localhost in case we’re using DynamoDb local instance. We also initialized the AmazonDynamoDBClient and DynamoDBContext. We’ll be mainly using the Highlevel API called DynamoDBContext for reading and writing data from DynamoDb.

The below methods are responsible for writing/saving the data:

public async Task SaveAsync(EmployeeModel model){   await SaveInDbAsync(GetUserModelForSave(PrepareEmpModel(model)));   await SaveInDbAsync(GetReporteeModelForSave(PrepareEmpModel(model)));}private async Task SaveInDbAsync(EmployeeModel model){   await _context.SaveAsync(model);   _logger.LogInformation("Saved {} successfully!", model.EmployeeCode);}private EmployeeModel PrepareEmpModel(EmployeeModel model){   model.EmployeeCode = model.EmployeeCode?.ToUpper();   model.ReportingManagerCode = model.ReportingManagerCode?.ToUpper();   return model;}

When saving a record, this method will actually insert two objects, one for user type and the other for reportee type. We discussed the reason and logic for creating two entries in the previous part.

In the below method we implemented the logic for fetching the employee by EmployeeCode:

public async Task<EmployeeModel> GetEmployeeAsync(string empCode){   var result = await _context.LoadAsync<EmployeeModel>(empCode.ToUpper(), empCode.ToUpper());   if (result != null)      result.ReportingManagerCode = ""; //ReportingManagerCode was same as EmployeeCode, so just remove it   return result;}

Next method will cover the logic for fetching the reportees by EmployeeCode:

public async Task<IEnumerable<EmployeeModel>> GetAllReporteesAsync(string empCode){   var config = new DynamoDBOperationConfig   {      QueryFilter = new List<ScanCondition> {         new ScanCondition("Type", ScanOperator.Equal, "Reportee"),         new ScanCondition("LastWorkingDate", ScanOperator.IsNull)      }   };   var result = await _context.QueryAsync<EmployeeModel>(empCode.ToUpper(), config).GetRemainingAsync();   return PrepareReporteeReturnModel(result); //swap the EmployeeCode and  ReportingManagerCode and return}

All the other code fragments and complete solution can be downloaded from the GitHub repository.

Once you complete the development you need to create a DynamoDb table in your AWS account. There are many ways to create a service in AWS. You can use CLI, Console, SDK or even Visual Studio Toolkit. Below is the CLI command for creating the table and setting up the pk and sk.

aws dynamodb create-table --table-name employees --attribute-definitions AttributeName=EmployeeCode,AttributeType=S         AttributeName=ReportingManagerCode,AttributeType=S --key-schema AttributeName=EmployeeCode,KeyType=HASH AttributeName=ReportingManagerCode,KeyType=RANGE --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1 --table-class STANDARD

Now you can deploy the serverless application either using Visual Studio or dotnet tools. To deploy using Visual Studio, right click on the project and select the Publish to AWS Lambda button.

To deploy using dotnet tools you need to follow the below steps in the command line.

dotnet tool install -g Amazon.Lambda.Toolscd "AWSServerlessDynamoDb/AWSServerlessDynamoDb" #or whatever the folderdotnet lambda deploy-serverless

After successful deployment, you will get a Lambda endpoint(ApiURL)  as below:

Image: 6

You can access your SwaggerUI by adding /swagger in the above url and you can test the APIs.

Complete source code can be found here.

Happy coding!!

AWS Serverless and DynamoDb Single Table Design using .Net 6 – Part 1

Introduction

When developing a high performance scalable application everybody tends to use the below technologies.

  • Serverless Functions or Lambdas
  • Cloud managed NoSQL databases like DynamoDb or CosmosDb
  • Database design strategies like Single Table Design

In this article we’ll cover about Single Table Design. Next part we’ll create a Serverless application using .Net 6 and DynamoDb.

Use Case

Recently we worked on a Social Networking platform and we used Single Table Design in that project. That use case is very complex and overwhelming for a beginner, so let’s consider an imaginary use case (this may not exactly fit the Single Table Design). But let’s consider an Employee REST API which will help us to design a basic Single Table Design. Here are the features of the API:

  • User will be able to add a Employee
  • User will be able to fetch the Employee details with the EmployeeCode
  • User will be able to fetch the immediate Reportees (for the sake of simplicity) of the Employee/Manager

Schema

EmployeeCodeEmailIdFirstNameLastNameReportingManagerCode

In the RDBMS world EmployeeCode will be the Primary Key and ReportingManagerCode will be the foreign key pointing to the same table using self join.

Single Table Design

In RDBMS, we use multiple tables in a database, and that tables may be interrelated with foreign keys and we tend to normalize the tables up to a certain level and avoid duplicate storage as far as we can. In the NoSQL world(especially in DynamoDb), there are no foreign keys and joins(and there is a reason for that), and do not care about duplicacy. In Single Table Design, we put all the entities(eg: Post, User, Comment, Follower etc.) in a single table and may use the ‘Type’ attribute to identify the entities.

Why

In a Read Heavy database, many(millions of) users will be accessing the different content at the same time. So you have to fetch the data as fast as possible. If you want to return the data quickly you have to minimize the database requests for a single API call. In RDBMS even though we’re making a single call most of the queries will have complex joins and involve multiple tables, as the data size increases these queries take more time.

If you have to fetch the Posts of all the users who I’m following, then in SQL-based databases you have to join ‘Users’, ‘Followers’, ‘Posts’, ‘Comments’ etc. If you store the entities in separate DynamoDb tables then you’ve to make multiple calls from your backend to DynamoDb and do some JSON manipulations and return that to the Frontend. We cannot afford that many Db calls from the backend, so we need to get all the data in a single Db request.

How

In DynamoDb within each table, you must have a partition key, which is a string, numeric, or binary value. This key is a hash value used to locate items in constant time regardless of table size. It is conceptually different to an ID or primary key field in a SQL-based database and does not relate to data in other tables. When there is only a partition key, these values must be unique across items in a table.

Each table can optionally have a sort key. This allows you to search and sort within items that match a given primary key. While you must search on exact single values in the partition key, you can pattern search on sort keys. It’s common to use a numeric sort key with timestamps to find items within a date range, or use string search operators to find data in hierarchical relationships.

With only partition keys and sort keys, this limits the possible types of query without duplicating data in a table(even though there is no harm in duplicating the data as storage cost is very less, but modifying the multiple copies is another headache). To solve this issue, DynamoDB also offers two types of indexes ie: Local secondary indexes (LSIs) and Global secondary indexes (GSIs). We can discuss these topics in a separate session.

Single Table Design is not ‘Agile’, you have to identify the Data Access Patterns in the beginning of the project, otherwise you may identify a use case later which may require an entire redesign of the data structure. So let’s identify our access patterns first.

Data Access Patterns

Our patterns are the GET API responses we discussed earlier. In our case it’s simple as of now.

  • User will be able to fetch the Employee details with the EmployeeCode
  • User will be able to fetch the immediate Reportees of the Employee/Manager
User will be able to add an Employees

Let’s create the table as follows. Partition Key is EmployeeCode, and Sort Key is ReportingManagerCode

Table: 1
Table: 1

Now things look simple, we can get the entity based on pk, let’s evaluate the next access pattern and come back here if required.

User will be able to fetch the immediate Reportees of the Employee/Manager

Suppose we need to fetch all the direct reportees of user 11. We can see if we can query using the sort key then we could have fetched all the reportees of 11 in a single query, but here your challenges start. You cannot query a DynamoDb table without providing a pk equals statement. So if you query pk=11 then you’ll get only one record.

Now the next step is to evaluate whether we can use LSI or GSIs to solve the problem or not. LSI is just another sort key and in query we need to provide pk then LSI won’t work here. If we use GSI then you can create one more pk and sk, but then you can’t use the main pk or sk.

Next option is to duplicate the data, let’s think about that. In the above table structure one record was self-sufficient(it had both EmployeeCode and ReportingManagerCode), but in the below format.we separated the entity. We also added a type attribute to identify the type of entity.

Table: 2
Table: 2

In user entities both pk and sk are the same (ie EmployeeCode). We duplicated the same entities and swapped the pk and sk and assigned the type as ‘reportee’. Now let’s evaluate the query. If we ran a query as pk=11 we will get three records

Table: 3
Table: 3

One record is for the Manager and multiple records for the reportees, we can filter out the user type by filter expressions if required. So the second access pattern has been solved, but we’ve a problem in the first access pattern now. Our first query was pk=11, but now that will return 3 records so we need to fix that. We can use pk=11 and sk=11. Solved!

Conclusions

The use case we discussed here was very basic one, but still we had to take care of multiple things and we also had to duplicate the data. Duplicating the data will further complicate things like updation and deletions etc. You may need to implement Message Queues like SQS and BackgroundService to solve that issue.

In the next article, we will cover the actual practical implementation with code samples.

Generic Message Queue implementation using AWS SQS and .Net 6 BackgroundService

Requirement

Most of the Enterprise projects we develop, we have to implement some cross-cutting concerns like Audit Logs. The important factor is that application performance should not impact because of these Audit Logs (or similar cross-cutting concerns). So how to implement this without compromising the performance?

BackgroundService

Background tasks and scheduled jobs are something you might need to use in any application, whether or not it follows the microservices architecture pattern. The difference when using a microservices architecture is that you can implement the background task in a separate process/container for hosting so you can scale it down/up based on your need.

From a generic point of view, in .NET we called these type of tasks Hosted Services, because they are services/logic that you host within your host/application/microservice. Note that in this case, the hosted service simply means a class with the background task logic.

In our sample we will be configuring the BackgroundService in the same Web API project for the sake of simplicity, but in the real production scenario you should consider a separate service. So BackgroundService can offload the Audit Log writing mechanism from the Web API. Now the challenge is how should we send the Audit Log objects to BackgroundService?

Message Queue

Message queues allow different parts of a system to communicate and process operations asynchronously. A message queue provides a lightweight buffer which temporarily stores messages, and endpoints that allow software components to connect to the queue in order to send and receive messages. The messages are usually small, and can be things like requests, replies, error messages, or just plain information. To send a message, a component called a producer adds a message to the queue. The message is stored on the queue until another component called a consumer retrieves the message and does something with it.

There are different implementations of Message Queues exists, multiple cloud providers provide their own implementations. Here we’re using AWS SQS

Steps

Create an ASP.NET Core Web API

Install Dependencies

Create a folder Services -> Contracts and create a Generic Interface called IMessageService as follows:

public interface IMessageService<T>{    Task DeleteMessageAsync(string id);    Task<Dictionary<string, T?>> ReceiveMessageAsync(int maxMessages = 1);    Task SendMessage(T message);}

Now let’s create the SQS Message Service by implementing the above Interface

Let’s first create the Constructor and this will create AWS SQS Client. Before doing that we need to create an IAM user with required permissions.

Create IAM user

  • Go to the AWS console and search IAM.
  • Click on the Users panel on the left.
  • Click on the Add User button.
  • Provide a user name and select the Access key - Programmatic access checkbox and click Next: Permissions button.
  • Click on Attach existing policies directly tab
  • Search AmazonSQSFullAccess and select that policy. We would require FullAccess because we will be creating the Queue programmatically if that Queue does not exist.
  • Click on Next button twice and finally click on Create User button.
  • Copy the Access key ID and Secret key ID and store in a safe place.

Configure the AWS Credentials

There are multiple ways to configure the AWS credentials. I used AWS CLI, which can be downloaded from here. Once it’s downloaded and installed in your machine follow the below command in Terminal or Command Prompt.

aws configure

The above command will access the following details from user and store in the ~/.aws/config file. AWS SDK will fetch these credentials and create the clients.

AWS Access Key IDAWS Secret Access KeyDefault region name

Constructor code will look like the below:

public SqsGenericService(ILogger<SqsGenericService<T>> logger, IConfiguration configuration, IHostingEnvironment env){    _logger = logger;    var options = c;    //This queueName will be used to create the SQS Queue for each type of object in different environments    var queueName = $"que-{env.EnvironmentName.ToLower()}-{typeof(T).Name.ToLower()}";    _amazonSQSClient = options.CreateServiceClient<IAmazonSQS>();    _queueUrl = GetQueueUrl(queueName).Result;}

Most of  the code is self explanatory here. The configuration.GetAWSOptions() will fetch the AWS configurations.

Dynamic Queue creation for each environments and entities

queueName variable will be created concatenating the environment name and the name of the generic entity. GetQueueUrl() method will fetch the queue url if it already exists or else it will create a queue.

The next method is SendMessage. This method will accept a Generic message object and use AWS SQS Client to push the serialized object to the Queue.

public async Task SendMessage(T message){    var messageBody = JsonConvert.SerializeObject(message);    await _amazonSQSClient.SendMessageAsync(new SendMessageRequest    {       QueueUrl = _queueUrl,       MessageBody = messageBody    });    _logger.LogInformation("Message {message} send successfully to {_queueUrl}.", message, _queueUrl);}

The next method is ReceiveMessageAsync, this method will fetch the messages from queue and convert that to a Dictionary of MessageReceiptHandle and MessageBody as the consumer of this service would require RecieptHandle to delete the message after processing.

Worker Process

To implement the worker process, as mentioned earlier, decided to use BackgroundService. Here is the complete code for the AuditLogWorker class.

 public class AuditLogWorker : BackgroundService {     private readonly ILogger<AuditLogWorker> _logger;     private readonly IMessageService<AuditLogModel> _messageClient;     public AuditLogWorker(ILogger<AuditLogWorker> logger, IMessageService<AuditLogModel> messageClient)     {         _logger = logger;         _messageClient = messageClient;     }     protected override async Task ExecuteAsync(CancellationToken stoppingToken)     {         while (!stoppingToken.IsCancellationRequested)         {             _logger.LogInformation("AuditLogWorker running at: {Time}", DateTime.Now);             var messages = await _messageClient.ReceiveMessageAsync();             foreach (var message in messages)             {                 //You can write your custom logic here...                 _logger.LogInformation("AuditLogWorker processed message {userID}, {Action}", message.Value?.UserId, message.Value?.Message);                 await _messageClient.DeleteMessageAsync(message.Key);             }         await Task.Delay(5000, stoppingToken); //Delay can be set according to your business requirement.         }     }}

Modify the Program.cs to add the necessary dependencies and configure the hosted service as below:

builder.Services.AddSingleton<IMessageService<AuditLogModel>, SqsGenericService<AuditLogModel>>();builder.Services.AddHostedService<AuditLogWorker>();

Create a simple REST API method to accept an object and push that item to the queue and test it. The testing method will look like the below:

 [HttpPost] public async Task Post([FromBody] AuditLogModel model) {     await _messageClient.SendMessage(model);     _logger.LogInformation("Message pushed to the queue successfully."); }

That’s it folks, I hope everybody enjoyed the blog. The entire code can be downloaded from the GitHub repo

Happy coding!!!