Single Page Application — With Worker Process

What is a Worker Process

A worker process is a type of computer process that runs in the background and performs tasks as assigned by a main process. It is typically used to offload tasks from a main process, allowing the main process to continue with other operations while the worker process handles the task in the background.

This article is part of a collection of web architecture pattern references:

Single Page Application Frontend
Backend for SPA and Mobile App (Managed Services)
Backend for SPA and Mobile App (Serverless)
Multi Page Application (Integrated Web)
Single Page Application with Worker Process

How it works

In cloud computing, worker processes can be implemented using messaging or queues. The main process adds tasks to a queue, and worker processes monitor the queue for new tasks. When a worker process finds a new task in the queue, it takes the task and performs the work. Tasks are picked up roughly in the order in which they were received (strict ordering depends on the queue type, for example an SQS FIFO queue), and multiple worker processes can operate in parallel to handle a large number of tasks.
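The flow described above can be sketched in C# with an in-memory channel standing in for the queue. This is only an illustration under that assumption: a real deployment would use a durable service such as SQS, and the names QueueWorkerDemo and RunAsync are made up for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class QueueWorkerDemo
{
    // Enqueues taskCount tasks and lets workerCount workers drain the queue in parallel.
    // Returns the ids of all processed tasks, sorted ascending.
    public static async Task<int[]> RunAsync(int taskCount, int workerCount)
    {
        // Stand-in for the message queue (assumption: in-memory, single process).
        var queue = Channel.CreateUnbounded<int>();
        var processed = new ConcurrentBag<int>();

        // Each worker waits for new tasks and handles whichever one it receives next.
        var workers = Enumerable.Range(0, workerCount).Select(async _ =>
        {
            await foreach (var taskId in queue.Reader.ReadAllAsync())
            {
                await Task.Delay(10); // simulate the actual work
                processed.Add(taskId);
            }
        }).ToArray();

        // The main process only enqueues tasks; it does not wait for each one to finish.
        for (var i = 1; i <= taskCount; i++)
        {
            await queue.Writer.WriteAsync(i);
        }
        queue.Writer.Complete(); // no more tasks: lets the workers exit their loops

        await Task.WhenAll(workers);
        return processed.OrderBy(id => id).ToArray();
    }
}
```

Calling QueueWorkerDemo.RunAsync(20, 3) processes every task exactly once, but the order in which the three workers finish individual tasks is not fixed, which is why the result is sorted before it is returned.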

Examples of messaging systems used for worker process implementation in the cloud include RabbitMQ, Apache Kafka, and Amazon Simple Queue Service (SQS). The messaging system provides a way for the main process to communicate with the worker processes, allowing tasks to be dispatched and results to be returned.

Components

The main components of a worker process can include:

  • Task queue: A task queue is used to store the tasks that need to be performed. The worker processes monitor the queue for new tasks.

  • Task dispatcher: The task dispatcher is responsible for allocating tasks to the worker processes. It ensures that tasks are dispatched to the worker processes in a fair and efficient manner.

  • Worker processes: The worker processes are the actual processes that perform the tasks. They receive tasks from the task queue, perform the work, and return the results.

  • Result storage: A result storage mechanism is used to store the results of the tasks performed by the worker processes. This can be a database, a file system, or another type of data storage.

  • Error handling: A worker process should have error handling mechanisms in place to deal with unexpected errors that may occur during task processing. This can include retrying failed tasks or logging the error for later review.

  • Monitoring and reporting: A monitoring and reporting system is used to monitor the performance of the worker processes and provide feedback to the main process. This can include information about the number of tasks processed, the processing time for each task, and any errors that may have occurred.
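As a rough illustration of the error handling component, the sketch below retries a failing task a limited number of times and collects the tasks that never succeed so they can be logged or reviewed later. The class name, the retry count and the linear backoff are assumptions made for this example, not part of any specific library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class RetryingWorker
{
    // Runs handler for each task, trying up to maxAttempts times per task.
    // Returns the tasks that still failed after the last attempt.
    public static async Task<List<string>> ProcessAllAsync(
        IEnumerable<string> tasks, Func<string, Task> handler, int maxAttempts = 3)
    {
        var failed = new List<string>();
        foreach (var task in tasks)
        {
            for (var attempt = 1; ; attempt++)
            {
                try
                {
                    await handler(task);
                    break; // success, move on to the next task
                }
                catch (Exception ex) when (attempt < maxAttempts)
                {
                    // Attempts remain: log and wait a little longer each time.
                    Console.WriteLine($"Attempt {attempt} failed for '{task}': {ex.Message}");
                    await Task.Delay(TimeSpan.FromMilliseconds(50 * attempt));
                }
                catch (Exception ex)
                {
                    // Last attempt failed: record the task for later review.
                    Console.WriteLine($"Giving up on '{task}': {ex.Message}");
                    failed.Add(task);
                    break;
                }
            }
        }
        return failed;
    }
}
```

With a managed queue such as SQS, the equivalent of the returned list is usually a dead-letter queue, so that failed messages are kept outside the worker process.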

Architecture

There are multiple architecture patterns for implementing a worker process, and the right one depends on the use case.

Use Case 1

An example use case can be sending newsletters to all subscribed users at 10 AM.

  • The application has to execute long-running jobs at specific intervals.
  • The application should not use any compute resources when it is not executing a job.
  • Jobs should be triggered by external or internal events, or by schedulers.

In this use case we can use Amazon EventBridge Scheduler to trigger the jobs at specific intervals. Amazon ECS tasks can execute the jobs; once a job is done, its task ends and the compute resources are de-allocated. We don’t need any messaging or queue service in this case (that is covered in the next use case).
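A job of this kind is typically written as a run-to-completion program: it does its work once and then exits. The following is a minimal sketch of that shape; NewsletterJob, the subscriber list and the sendAsync delegate are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class NewsletterJob
{
    // Sends the newsletter to every subscriber once, then returns an exit code:
    // 0 when every send succeeded, 1 when at least one failed.
    public static async Task<int> RunAsync(
        IEnumerable<string> subscribers, Func<string, Task> sendAsync)
    {
        var failures = 0;
        foreach (var email in subscribers)
        {
            try
            {
                await sendAsync(email);
            }
            catch (Exception ex)
            {
                // One failed recipient should not stop the rest of the run.
                failures++;
                Console.Error.WriteLine($"Failed to send to {email}: {ex.Message}");
            }
        }

        Console.WriteLine($"Newsletter job finished with {failures} failure(s).");
        return failures == 0 ? 0 : 1;
    }
}
```

The program's entry point would return the value of RunAsync as the process exit code. When the process exits, the container stops, the ECS task ends, and no compute is consumed until the scheduler starts the next run.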

The following diagram explains the architecture of the worker process along with other important components.

Single Page Application with Worker Process (Use Case 1)

Use Case 2

An example use case can be sending registration welcome messages to users after their successful registration on the platform.

  • After the registration process completes, a message object containing the userId is pushed to a queue (e.g. SQS).
  • A Lambda function configured with the queue as its trigger is invoked and sends the email to the user.
  • The database is updated if required.
  • As Lambda is a serverless component, the cost is calculated from the number of executions and their duration.
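The message handling that such a Lambda function performs can be sketched independently of the Lambda runtime. In the sketch below, the message bodies are assumed to be JSON such as {"userId":"42"}; RegistrationMessage, WelcomeMessageHandler and the sendWelcomeEmailAsync delegate are hypothetical names. In a real function the bodies would come from the Body property of each record in the SQSEvent (from the Amazon.Lambda.SQSEvents package).

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical message shape: the article only says the message carries a userId.
public record RegistrationMessage(string UserId);

public static class WelcomeMessageHandler
{
    // Web defaults make property matching case-insensitive ("userId" -> UserId).
    private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);

    // Handles one batch of queue message bodies and returns the bodies that failed,
    // so the caller can decide whether they should be retried.
    public static async Task<List<string>> HandleBatchAsync(
        IEnumerable<string> bodies, Func<string, Task> sendWelcomeEmailAsync)
    {
        var failed = new List<string>();
        foreach (var body in bodies)
        {
            try
            {
                var message = JsonSerializer.Deserialize<RegistrationMessage>(body, JsonOptions);
                if (string.IsNullOrEmpty(message?.UserId))
                {
                    throw new InvalidOperationException("Message has no userId.");
                }
                await sendWelcomeEmailAsync(message.UserId);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"Could not process message: {ex.Message}");
                failed.Add(body);
            }
        }
        return failed;
    }
}
```

Keeping this logic in a plain class makes it easy to unit test; the Lambda entry point then only has to map the incoming records to bodies and pass them in.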

The following diagram explains the architecture of the worker process along with other important components.

Single Page Application with Worker Process (Use Case 2)

Generic Message Queue implementation using AWS SQS and .NET 6 BackgroundService

Requirement

In most of the enterprise projects we develop, we have to implement cross-cutting concerns such as audit logs. The important requirement is that application performance must not suffer because of these audit logs (or similar cross-cutting concerns). So how do we implement this without compromising performance?

BackgroundService

Background tasks and scheduled jobs are something you might need to use in any application, whether or not it follows the microservices architecture pattern. The difference when using a microservices architecture is that you can implement the background task in a separate process/container for hosting so you can scale it down/up based on your need.

From a generic point of view, in .NET these types of tasks are called Hosted Services, because they are services/logic that you host within your host/application/microservice. Note that in this case, the hosted service simply means a class with the background task logic.

In our sample we will configure the BackgroundService in the same Web API project for the sake of simplicity, but in a real production scenario you should consider a separate service. The BackgroundService can then offload the audit log writing from the Web API. The challenge now is: how do we send the audit log objects to the BackgroundService?

Message Queue

Message queues allow different parts of a system to communicate and process operations asynchronously. A message queue provides a lightweight buffer which temporarily stores messages, and endpoints that allow software components to connect to the queue in order to send and receive messages. The messages are usually small, and can be things like requests, replies, error messages, or just plain information. To send a message, a component called a producer adds a message to the queue. The message is stored on the queue until another component called a consumer retrieves the message and does something with it.

Several message queue implementations exist, and the major cloud providers offer their own. Here we’re using AWS SQS.

Steps

Create an ASP.NET Core Web API

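Install Dependencies

The code in this article relies on the following NuGet packages: AWSSDK.SQS (the SQS client), AWSSDK.Extensions.NETCore.Setup (for configuration.GetAWSOptions() and CreateServiceClient) and Newtonsoft.Json (for JsonConvert).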

Create a folder Services -> Contracts and create a Generic Interface called IMessageService as follows:

    public interface IMessageService<T>
    {
        Task DeleteMessageAsync(string id);
        Task<Dictionary<string, T?>> ReceiveMessageAsync(int maxMessages = 1);
        Task SendMessage(T message);
    }
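Because the interface is generic and hides the queue technology, it can also be backed by something other than SQS. As an illustration, here is a hypothetical in-memory implementation (InMemoryMessageService is not part of the original code) that can be handy in unit tests or local runs without AWS access. The interface is repeated so that the sketch compiles on its own.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Repeated from the article so that the sketch is self-contained.
public interface IMessageService<T>
{
    Task DeleteMessageAsync(string id);
    Task<Dictionary<string, T?>> ReceiveMessageAsync(int maxMessages = 1);
    Task SendMessage(T message);
}

// Hypothetical in-memory stand-in for the SQS-backed service.
public class InMemoryMessageService<T> : IMessageService<T>
{
    private readonly ConcurrentQueue<(string Id, T Message)> _pending = new();
    private readonly ConcurrentDictionary<string, T> _inFlight = new();

    // Number of messages that were received but not yet deleted.
    public int InFlightCount => _inFlight.Count;

    public Task SendMessage(T message)
    {
        // The generated id plays the role of the SQS receipt handle.
        _pending.Enqueue((Guid.NewGuid().ToString(), message));
        return Task.CompletedTask;
    }

    public Task<Dictionary<string, T?>> ReceiveMessageAsync(int maxMessages = 1)
    {
        var result = new Dictionary<string, T?>();
        while (result.Count < maxMessages && _pending.TryDequeue(out var item))
        {
            _inFlight[item.Id] = item.Message; // kept until the consumer deletes it
            result[item.Id] = item.Message;
        }
        return Task.FromResult(result);
    }

    public Task DeleteMessageAsync(string id)
    {
        _inFlight.TryRemove(id, out _);
        return Task.CompletedTask;
    }
}
```

Unlike SQS, this sketch never makes an undeleted message visible again, so it is only suitable for tests and demos.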

Now let’s create the SQS Message Service by implementing the above Interface

Let’s first create the constructor, which creates the AWS SQS client. Before doing that, we need an IAM user with the required permissions.

Create IAM user

  • Go to the AWS console and search IAM.
  • Click on the Users panel on the left.
  • Click on the Add User button.
  • Provide a user name, select the Access key - Programmatic access checkbox, and click the Next: Permissions button.
  • Click on the Attach existing policies directly tab.
  • Search for AmazonSQSFullAccess and select that policy. We need full access because the queue is created programmatically if it does not exist.
  • Click the Next button twice, and finally click the Create User button.
  • Copy the Access key ID and Secret access key and store them in a safe place.

Configure the AWS Credentials

There are multiple ways to configure the AWS credentials. I used the AWS CLI, which can be downloaded from the AWS CLI download page. Once it is downloaded and installed on your machine, run the command below in Terminal or Command Prompt.

aws configure

The above command prompts for the following details. The access keys are stored in the ~/.aws/credentials file and the region in the ~/.aws/config file; the AWS SDK reads these files when it creates its clients.

  • AWS Access Key ID
  • AWS Secret Access Key
  • Default region name

Constructor code will look like the below:

    public SqsGenericService(ILogger<SqsGenericService<T>> logger, IConfiguration configuration, IHostEnvironment env)
    {
        _logger = logger;

        // Reads the AWS settings (profile, region) from the application configuration.
        var options = configuration.GetAWSOptions();

        // This queueName will be used to create the SQS queue for each type of object in different environments.
        var queueName = $"que-{env.EnvironmentName.ToLower()}-{typeof(T).Name.ToLower()}";

        _amazonSQSClient = options.CreateServiceClient<IAmazonSQS>();

        // Blocks on the async call because a constructor cannot await.
        _queueUrl = GetQueueUrl(queueName).Result;
    }

Most of the code here is self-explanatory. configuration.GetAWSOptions() fetches the AWS configuration.

Dynamic queue creation for each environment and entity

The queueName variable is built by concatenating the environment name and the name of the generic entity type. The GetQueueUrl() method fetches the queue URL if the queue already exists; otherwise it creates the queue.
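The body of GetQueueUrl() is not shown in this article. One possible shape, assuming the AWS SDK for .NET (the AWSSDK.SQS package), is sketched below as a standalone helper. The class and method names are made up for this example and the original implementation may differ.

```csharp
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

public static class SqsQueueHelper
{
    // Returns the URL of the named queue, creating the queue first if it does not exist.
    public static async Task<string> GetOrCreateQueueUrlAsync(IAmazonSQS client, string queueName)
    {
        try
        {
            var existing = await client.GetQueueUrlAsync(queueName);
            return existing.QueueUrl;
        }
        catch (QueueDoesNotExistException)
        {
            // Creating the queue is the reason the IAM user needs more than read access.
            var created = await client.CreateQueueAsync(queueName);
            return created.QueueUrl;
        }
    }
}
```

This sketch needs real AWS credentials and network access to run, so treat it as a starting point rather than a tested implementation.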

The next method is SendMessage. This method will accept a Generic message object and use AWS SQS Client to push the serialized object to the Queue.

    public async Task SendMessage(T message)
    {
        // Serialize the object to JSON (Newtonsoft.Json) and push it to the queue.
        var messageBody = JsonConvert.SerializeObject(message);
        await _amazonSQSClient.SendMessageAsync(new SendMessageRequest
        {
            QueueUrl = _queueUrl,
            MessageBody = messageBody
        });
        _logger.LogInformation("Message {message} sent successfully to {_queueUrl}.", message, _queueUrl);
    }

The next method is ReceiveMessageAsync. It fetches messages from the queue and converts them to a Dictionary keyed by the message ReceiptHandle, with the deserialized message body as the value, because the consumer of this service needs the ReceiptHandle to delete the message after processing.
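The code for ReceiveMessageAsync and DeleteMessageAsync is not included in this article. The sketch below shows one way they could look with the AWS SDK for .NET and Newtonsoft.Json. The class name SqsReceiveSketch, its constructor and the 10-second long-polling wait are assumptions made for illustration.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Newtonsoft.Json;

public class SqsReceiveSketch<T>
{
    private readonly IAmazonSQS _amazonSQSClient;
    private readonly string _queueUrl;

    public SqsReceiveSketch(IAmazonSQS amazonSQSClient, string queueUrl)
    {
        _amazonSQSClient = amazonSQSClient;
        _queueUrl = queueUrl;
    }

    // Returns receipt handle -> deserialized body for up to maxMessages messages.
    public async Task<Dictionary<string, T?>> ReceiveMessageAsync(int maxMessages = 1)
    {
        var response = await _amazonSQSClient.ReceiveMessageAsync(new ReceiveMessageRequest
        {
            QueueUrl = _queueUrl,
            MaxNumberOfMessages = maxMessages, // SQS accepts values from 1 to 10
            WaitTimeSeconds = 10               // long polling; the value is an assumption
        });

        var result = new Dictionary<string, T?>();
        // Depending on the SDK version, Messages can be null when nothing was received.
        foreach (var message in response.Messages ?? new List<Message>())
        {
            result[message.ReceiptHandle] = JsonConvert.DeserializeObject<T>(message.Body);
        }
        return result;
    }

    // The id is the receipt handle returned by ReceiveMessageAsync.
    public Task DeleteMessageAsync(string id) =>
        _amazonSQSClient.DeleteMessageAsync(_queueUrl, id);
}
```

A message that is received but not deleted becomes visible again after the queue's visibility timeout, which is why the worker deletes each message only after it has been processed.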

Worker Process

To implement the worker process, as mentioned earlier, I decided to use BackgroundService. Here is the complete code for the AuditLogWorker class.

    public class AuditLogWorker : BackgroundService
    {
        private readonly ILogger<AuditLogWorker> _logger;
        private readonly IMessageService<AuditLogModel> _messageClient;

        public AuditLogWorker(ILogger<AuditLogWorker> logger, IMessageService<AuditLogModel> messageClient)
        {
            _logger = logger;
            _messageClient = messageClient;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("AuditLogWorker running at: {Time}", DateTime.Now);
                var messages = await _messageClient.ReceiveMessageAsync();
                foreach (var message in messages)
                {
                    // You can write your custom logic here...
                    _logger.LogInformation("AuditLogWorker processed message {userID}, {Action}", message.Value?.UserId, message.Value?.Message);

                    // The dictionary key is the receipt handle needed to delete the message.
                    await _messageClient.DeleteMessageAsync(message.Key);
                }

                // Delay can be set according to your business requirement.
                await Task.Delay(5000, stoppingToken);
            }
        }
    }

Modify the Program.cs to add the necessary dependencies and configure the hosted service as below:

    builder.Services.AddSingleton<IMessageService<AuditLogModel>, SqsGenericService<AuditLogModel>>();
    builder.Services.AddHostedService<AuditLogWorker>();

Create a simple REST API method to accept an object and push that item to the queue and test it. The testing method will look like the below:

    [HttpPost]
    public async Task Post([FromBody] AuditLogModel model)
    {
        await _messageClient.SendMessage(model);
        _logger.LogInformation("Message pushed to the queue successfully.");
    }

That’s it folks, I hope everybody enjoyed the blog. The entire code can be downloaded from the GitHub repo

Happy coding!!!