
Kentico & RabbitMQ Integration: Part 3 – External Workers

This is the third article in our RabbitMQ integration series, where we show you how to build robust and scalable integrations using Kentico, RabbitMQ and message-driven development.

In the first article, Outbound Integration, I talked about how to configure Kentico to send messages to RabbitMQ for processing by one or more external systems.

In the second article, Inbound Integration, I described how to configure Kentico to receive inbound messages from those external systems.

In this article I will detail how to create a separate application to act as a “worker”, and how to run multiple instances of this worker simultaneously. This will allow you to rapidly process even high-volume message queues.

Setup

Assumptions

For the purposes of this article:

1. You are importing products from an external system

2. The messages in your queue all follow the same format. This is good practice when setting up your queues.

3. There are three types of message you might receive: full product, price update, stock update.

Message formats

All three types of message will follow the same format:

	{  
	    "messageType": "",  
	    "messageSent": "2018-09-04T09:28:00Z",  
	    "data": {  

	    }  
	}  


The messageType property may contain one of the following values:

  • Product
  • Price
  • StockLevel

The data property will contain an object appropriate to the message type. Examples of each are shown below:

Product

	{  
	    "messageType": "Product",  
	    "messageSent": "2018-09-04T09:28:00Z",  
	    "data": {  
	        "productCode": "TEST001",  
	        "productName": "Test product",  
	        "productPriceExVat": 124.99,  
	        "productSummary": "A short description of the product goes here",  
	        "productDescription": "A longer description can go here. This field could include HTML code.",  
	        "productMainImageUrl": "http://www.somedomain.com/path/to/image.jpg",  
	        "productAdditionalImages": [ /* An array of image URLs as strings */ ],  
	        "productStockAvailable": 123,  
	        "productAllowedForSale": true  
	    }  
	}

Price

	{  
	    "messageType": "Price",  
	    "messageSent": "2018-09-04T09:28:00Z",  
	    "data": {  
	        "productCode": "TEST001",  
	        "productPriceExVat": 124.99,  
	        "productAllowedForSale": true  
	    }  
	}  

StockLevel

	{  
	    "messageType": "StockLevel",  
	    "messageSent": "2018-09-04T09:28:00Z",  
	    "data": {  
	        "productCode": "TEST001",  
	        "productStockAvailable": 123,  
	        "productAllowedForSale": true  
	    }  
	}  

Creating the Application

I will be building the application as a .NET Framework 4.7 console application. The project type is largely unimportant, though, and you may wish to use a different one depending on your infrastructure.

Create your solution in Visual Studio and install the packages below.

  • RabbitMQ.Client
  • Kentico.Libraries
  • Newtonsoft.Json

Once installed, you will need to add your Kentico connection string to the App.config with the name “CMSConnectionString” (a console application uses App.config rather than web.config).
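A minimal App.config for the worker might look like the following. The RabbitMQ appSettings keys match those used in the Setup method later in this article; the connection string and RabbitMQ values shown are placeholders you should replace with your own:

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <connectionStrings>
    <!-- The name must be "CMSConnectionString" for Kentico.Libraries to find it -->
    <add name="CMSConnectionString"
         connectionString="Data Source=YOUR_SQL_SERVER;Initial Catalog=YourKenticoDb;Integrated Security=True"
         providerName="System.Data.SqlClient" />
  </connectionStrings>
  <appSettings>
    <!-- Read by the Setup method via ConfigurationManager.AppSettings -->
    <add key="RmqUserName" value="guest" />
    <add key="RmqPassword" value="guest" />
    <add key="RmqVirtualHost" value="/" />
    <add key="RmqHostName" value="localhost" />
  </appSettings>
</configuration>
```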

Now that the skeleton application is set up I will begin the development by adding the interfaces and classes that I’ll need.

Interfaces

Message model

I described this briefly in part 1 of this series:

	public interface IRabbitMqMessageModel<T>  
	    where T : class  
	{  
	    string MessageType { get; set; }  
	    DateTime MessageSent { get; set; }  
	    T Data { get; set; }  
	}  

You’ll notice that this mirrors the JSON message model above. When we create concrete classes from this interface, the generic type parameter “T” tells each class what its Data property will contain.

Message processor

I will have a separate class to process each type of message. This interface will ensure that these classes will all have the same public signature.

	public interface IMessageProcessor<T, T2>  
	    where T : class, IRabbitMqMessageModel<T2>  
	    where T2 : class  
	{  
	    void Execute(T data);  
	}  

In this case the type “T” will be the concrete message model type for this processor, and “T2” will be its data model.

Data model classes

These classes will form the basis of all data moving through this worker and will closely mirror the “data” property from the JSON objects detailed above.

Product data model

	public class ProductDataModel  
	{  
	    public string ProductCode { get; set; }  
	    public string ProductName { get; set; }  
	    public decimal ProductPriceExVat { get; set; }  
	    public string ProductSummary { get; set; }  
	    public string ProductDescription { get; set; }  
	    public string ProductMainImageUrl { get; set; }  
	    public IEnumerable<string> ProductAdditionalImages { get; set; }  
	    public int ProductStockAvailable { get; set; }  
	    public bool ProductAllowedForSale { get; set; }  
	}  

Price data model

	public class PriceDataModel  
	{  
	    public string ProductCode { get; set; }  
	    public decimal ProductPriceExVat { get; set; }  
	    public bool ProductAllowedForSale { get; set; }  
	}  

Stock level data model

	public class StockLevelDataModel  
	{  
	    public string ProductCode { get; set; }  
	    public int ProductStockAvailable { get; set; }  
	    public bool ProductAllowedForSale { get; set; }  
	}  

Message model classes

These message model classes form the final piece of the puzzle that will allow me to use Newtonsoft.Json to directly de-serialise the message from the queue into a strongly typed object model.

This is an easy way to get the data out of the messages quickly and efficiently.

Product message model

	public class ProductMessageModel : IRabbitMqMessageModel<ProductDataModel>  
	{  
	    public string MessageType { get; set; }  
	    public DateTime MessageSent { get; set; }  
	    public ProductDataModel Data { get; set; }  
	}  

Price message model

	public class PriceMessageModel : IRabbitMqMessageModel<PriceDataModel>  
	{  
	    public string MessageType { get; set; }  
	    public DateTime MessageSent { get; set; }  
	    public PriceDataModel Data { get; set; }  
	}  

Stock level message model

	public class StockLevelMessageModel : IRabbitMqMessageModel<StockLevelDataModel>  
	{  
	    public string MessageType { get; set; }  
	    public DateTime MessageSent { get; set; }  
	    public StockLevelDataModel Data { get; set; }  
	}  
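With these concrete classes in place, a single Newtonsoft.Json call turns a queue message into a strongly typed object. A quick sketch using the PriceMessageModel defined above (the JSON literal here is just an example message):

```csharp
using System;
using Newtonsoft.Json;

class DeserialisationExample
{
    static void Main()
    {
        var json = @"{
            ""messageType"": ""Price"",
            ""messageSent"": ""2018-09-04T09:28:00Z"",
            ""data"": {
                ""productCode"": ""TEST001"",
                ""productPriceExVat"": 124.99,
                ""productAllowedForSale"": true
            }
        }";

        // One call produces the strongly typed model - no manual parsing needed.
        var model = JsonConvert.DeserializeObject<PriceMessageModel>(json);

        Console.WriteLine(model.MessageType);       // Price
        Console.WriteLine(model.Data.ProductCode);  // TEST001
    }
}
```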

The message processor classes

At this point you could have a single class that deals with importing any or all of the three message types using a switch statement on the MessageType property, but that would break the first principle of SOLID development: each class must have a single responsibility.

To that end I would create a separate class for each of the three message types. This pattern also makes the system easily extensible – you could add new message types later, along with a class to process each one.

These classes will implement the IMessageProcessor<> interface and the Execute method will process the single product in the message, inserting or updating as necessary within Kentico.

The ProductMessageProcessor class will look like this at its most basic:

	public class ProductMessageProcessor : IMessageProcessor<ProductMessageModel, ProductDataModel>  
	{  
	    public void Execute(ProductMessageModel data)  
	    {  
	              
	    }  
	}  

Within the Execute method you should perform any logic needed to insert or update the product in Kentico – see Kentico’s documentation for details on how to do this.
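As an illustration only, a sketch of such an upsert might look like the following. This assumes Kentico’s E-commerce API (SKUInfo / SKUInfoProvider); the exact members vary by Kentico version, so check the documentation for yours, and note that a real implementation would also need to handle images, site assignment and validation:

```csharp
using System.Linq;
using CMS.Ecommerce;

public class ProductMessageProcessor : IMessageProcessor<ProductMessageModel, ProductDataModel>
{
    public void Execute(ProductMessageModel data)
    {
        var product = data.Data;

        // Look up an existing SKU by its product code, if there is one.
        var sku = SKUInfoProvider.GetSKUs()
            .WhereEquals("SKUNumber", product.ProductCode)
            .TopN(1)
            .FirstOrDefault();

        if (sku == null)
        {
            // No match - create a new SKU for this product code.
            sku = new SKUInfo { SKUNumber = product.ProductCode };
        }

        // Map the message fields onto the SKU.
        sku.SKUName = product.ProductName;
        sku.SKUPrice = product.ProductPriceExVat;
        sku.SKUDescription = product.ProductDescription;
        sku.SKUAvailableItems = product.ProductStockAvailable;
        sku.SKUEnabled = product.ProductAllowedForSale;

        // Insert or update the SKU in Kentico.
        SKUInfoProvider.SetSKUInfo(sku);
    }
}
```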

The other message processors will look very similar in construction thanks to the common IMessageProcessor interface. The main difference will be in the DataModel that is used in the generic definition. 

Once you have the message processors done, it’s time to bring it all together…

The main application thread

In the Main method of Program.cs in my console application I would set it up as follows:

	private static readonly ManualResetEvent QuitEvent = new ManualResetEvent(false);  
	static void Main(string[] args)  
	{  
	    Console.CancelKeyPress += (sender, eventArgs) =>  
	    {  
	        QuitEvent.Set();  
	        eventArgs.Cancel = true;  
	    };  
	  
	    Setup();  
	  
	    DoWork();  
	  
	    QuitEvent.WaitOne();  
	  
	    GracefulClosedown(); // An optional method to handle cleanup on shutdown
	} 

The QuitEvent keeps the application running indefinitely, letting us use the “subscription” method of receiving messages from the queue without resorting to an infinite loop.

In the Setup method I configure the RabbitMQ connection…

	private static IConnection _rabbitMqConnection;  
	private static IModel _rabbitMqChannel;  
	  
	private static void Setup()  
	{  
	    var factory = new ConnectionFactory  
	    {  
	        UserName = ConfigurationManager.AppSettings["RmqUserName"],  
	        Password = ConfigurationManager.AppSettings["RmqPassword"],  
	        VirtualHost = ConfigurationManager.AppSettings["RmqVirtualHost"],  
	        HostName = ConfigurationManager.AppSettings["RmqHostName"],  
	        RequestedHeartbeat = 150,  
	        Ssl = new SslOption  
	        {  
	            ServerName = ConfigurationManager.AppSettings["RmqHostName"],  
	            Enabled = false,  
	            // SslPolicyErrors values are flags, so they must be combined with bitwise OR  
	            AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateNotAvailable | SslPolicyErrors.RemoteCertificateChainErrors  
	        }  
	    };  
	  
	    IConnection service = null;  
	    try  
	    {  
	        service = factory.CreateConnection();  
	    }  
	    catch (BrokerUnreachableException ex)  
	    {  
	        // You could add logging here.  
	        QuitEvent.Set(); // allow the app to close down gracefully  
	    }  
	  
	    if (service == null)  
	    {  
	        // You could log this too  
	        QuitEvent.Set();  
	        return;  
	    }  
	  
	    _rabbitMqConnection = service;  
	    _rabbitMqChannel = service.CreateModel();  
	    _rabbitMqChannel.ExchangeDeclare("ProductsExchange", ExchangeType.Direct);  
	    _rabbitMqChannel.QueueDeclare("ProductsQueue", false, false, false, null);  
	    _rabbitMqChannel.QueueBind("ProductsQueue", "ProductsExchange", "ProductsQueue");  
	}  

Then, in the DoWork method I register the handler method for consuming the messages by subscription…

	private static void HandleMessage(object sender, BasicDeliverEventArgs e)  
	{  
	    var messageBody = Encoding.UTF8.GetString(e.Body);  
	  
	    // Newtonsoft.Json cannot deserialise directly into an interface, so we  
	    // parse the envelope first (using Newtonsoft.Json.Linq) to read the  
	    // message type, then deserialise into the matching concrete model.  
	    var messageType = JObject.Parse(messageBody).Value<string>("messageType") ?? string.Empty;  
	  
	    // Through the use of reflection this switch could be replaced  
	    // with dynamic selection of the correct message processor,  
	    // removing the need for manual code updates here when new ones are added.  
	    try  
	    {  
	        switch (messageType.ToLowerInvariant())  
	        {  
	            case "product":  
	                new ProductMessageProcessor().Execute(JsonConvert.DeserializeObject<ProductMessageModel>(messageBody));  
	                break;  
	  
	            case "price":  
	                new PriceMessageProcessor().Execute(JsonConvert.DeserializeObject<PriceMessageModel>(messageBody));  
	                break;  
	  
	            case "stocklevel":  
	                new StockLevelMessageProcessor().Execute(JsonConvert.DeserializeObject<StockLevelMessageModel>(messageBody));  
	                break;  
	        }  
	    }  
	    catch (Exception ex)  
	    {  
	        // If something went wrong, log it, NACK the message and return -  
	        // otherwise we would fall through and ACK a failed message.  
	        _rabbitMqChannel.BasicNack(e.DeliveryTag, false, true);  
	        return;  
	    }  
	  
	    // If we get here then we successfully processed the message.  
	    _rabbitMqChannel.BasicAck(e.DeliveryTag, false);  
	}  
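The DoWork method itself is not shown in full above. A minimal sketch using RabbitMQ.Client’s EventingBasicConsumer might look like this (the prefetch count of 1 is my assumption; it gives fair dispatch when several worker instances share the queue):

```csharp
private static void DoWork()
{
    // Only pre-fetch one unacknowledged message at a time so that
    // multiple worker instances share the queue load evenly.
    _rabbitMqChannel.BasicQos(0, 1, false);

    // Register HandleMessage as the subscription callback.
    var consumer = new EventingBasicConsumer(_rabbitMqChannel);
    consumer.Received += HandleMessage;

    // autoAck is false because HandleMessage ACKs/NACKs explicitly.
    _rabbitMqChannel.BasicConsume("ProductsQueue", false, consumer);
}
```

Because the consumer is event-driven, DoWork returns immediately after registering it; the QuitEvent in Main is what keeps the process alive while messages arrive.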

In closing

I’ve used RabbitMQ in several integration projects to date and have found it to provide an amazing level of scalability and reliability, even when dealing with huge amounts of data.

I hope that this series has been as useful for you to read as it was enjoyable for me to write, and that it has inspired you to try out RabbitMQ as an integration tool with Kentico and other projects.

 
Please let us know your thoughts by reaching out on LinkedIn, Facebook or Twitter, and stay tuned for more Kentico articles.

At Distinction, we are Kentico specialists and certified gold partners. If you are interested in working with us, get in touch today to see how we can help your business.

Author: Lee Conlin
