Tuesday, May 19, 2009

MassTransit Consumers/Subscribers

[Intro]

A messaging system does not make a lot of sense if no one and nothing is listening to, consuming or subscribing to the messages being sent. If you are interested in the particular event that a message represents, you subscribe to that event.

Continuing with the idea of a new employee at a company, let's assume that head office has decided that all staff members must do a new online, intranet-based safety course, and that any new employees must do the course as part of their induction. We can create this online application and send out the notifications to all existing staff, but how do we ensure that all new staff do the course? Well, we know that HR publishes a New Employee Notification when an employee joins the company, so we decide to subscribe to that message. Our application can then notify the new employee and their supervisor that the course must be completed as part of the induction.

Ok, so how do we do this in MassTransit?

Well one option is to create a consumer, a service that subscribes to the message and acts on it when it happens.

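public class NewEmployeeService : Consumes<NewEmployeeNotificationMessage>.All
{
    private IServiceBus _serviceBus;
    private UnsubscribeAction _unsubscribeToken;

    // Called by the bus whenever a NewEmployeeNotificationMessage arrives.
    public void Consume(NewEmployeeNotificationMessage message)
    {
        //Notify user and supervisor of course requirement
    }

    // Disposes the bus that was handed to Start.
    public void Dispose()
    {
        _serviceBus.Dispose();
    }

    // Called when the host starts the service: keep the bus and subscribe to it.
    public void Start(IServiceBus bus)
    {
        _serviceBus = bus;
        _unsubscribeToken = _serviceBus.Subscribe(this);
    }

    // Called when the host stops the service: cancel the subscription.
    public void Stop()
    {
        _unsubscribeToken();
    }
}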


A couple of things to note here:



The NewEmployeeService implements the "Consumes<T>.All" interface. This means we are subscribing to any published message of type T, in this case NewEmployeeNotificationMessage. By doing so we must implement Consume(T message), which is the method that will be called when the message arrives.

Start and Stop are methods we have defined that get called when the host starts up and shuts down the hosting service (we will cover this in later posts).

More importantly, and something that may not be obvious, is the _unsubscribeToken. When subscribing to the bus, the Subscribe method returns an UnsubscribeAction delegate that can be called when the subscription is no longer required. Calling this delegate when the service stops is therefore a good idea :)
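To make the unsubscribe token idea concrete, here is a minimal sketch of the same pattern with no MassTransit involved. TinyBus and UnsubscribeToken are hypothetical names made up for this illustration; this is not how MassTransit is implemented, just the shape of "Subscribe hands back a delegate that undoes the subscription".

```csharp
using System;
using System.Collections.Generic;

// Hypothetical delegate type, standing in for the token a bus returns from Subscribe.
public delegate void UnsubscribeToken();

// Hypothetical in-memory bus for a single message type; not MassTransit's implementation.
public class TinyBus<T>
{
    private readonly List<Action<T>> _handlers = new List<Action<T>>();

    // Registers the handler and returns a delegate that removes it again.
    public UnsubscribeToken Subscribe(Action<T> handler)
    {
        _handlers.Add(handler);
        return () => _handlers.Remove(handler);
    }

    // Delivers the message to every handler that is still subscribed.
    public void Publish(T message)
    {
        // Iterate over a copy so a handler may unsubscribe during delivery.
        foreach (Action<T> handler in _handlers.ToArray())
        {
            handler(message);
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var bus = new TinyBus<string>();
        var received = new List<string>();

        UnsubscribeToken token = bus.Subscribe(received.Add);

        bus.Publish("first");   // delivered
        token();                // the equivalent of Stop() in the service above
        bus.Publish("second");  // not delivered, the handler has been removed

        Console.WriteLine(received.Count); // prints 1
    }
}
```

The point of returning the delegate is that the subscriber does not need to remember how it subscribed; it only has to hold on to the token and call it once when it shuts down.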



A service can subscribe to many messages by specifying and implementing more of the Consumes<T>.All interfaces. Because these are interfaces and not a base class, you are not restricted by single inheritance. So you may want to define the class as:



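public class NewEmployeeService :
    Consumes<NewEmployeeNotificationMessage>.All,
    Consumes<EmployeeChangedLocationNotificationMessage>.All
{
    // One Consume overload is required per message type subscribed to.
    public void Consume(NewEmployeeNotificationMessage message) { /* ... */ }
    public void Consume(EmployeeChangedLocationNotificationMessage message) { /* ... */ }
    //...etc
}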


It is also worth noting that a consumer can respond to the message it receives:



CurrentMessage.Respond(responseMessage);


This will send the response message back to the response address specified by the client; see the Starbucks example: CashierSaga.ProcessNewOrder(..) and OrderDrinkForm. NB: The OrderDrinkForm also implements the Consumes interface for the response message, otherwise it would not know what to do with the response when it arrives.
