Masstransit latest version, even simpler than ever

In a recent post about Masstransit I’ve explained how to setup communication using a Subscription Service, and this configuration needs a dispatcher that can be build directly from MassTransit source code. If you do not want to have a central dispatcher, because you need to install a windows service (or run program in console) and a sql server used by the dispatcher you can also avoid this using Multicast.

The key is the call to sbc.UseMulticastSubscriptionClient() that basically permits you to completely avoid the complexity of having/maintaining a dispatcher with MSMQ. This works using PGM on top of MSMQ, so if you want to create a simple program that listen for a specific message on the bus you can use a minimal configuration, here is all the code you need

sbc.UseMsmq();
sbc.UseMulticastSubscriptionClient();
sbc.ReceiveFrom("msmq://localhost/test_basicCommunication");
sbc.Subscribe(subs =>
{
    subs.Handler<Message>(msg => Console.WriteLine(msg.MessageText));
});

If you only want to publish messages on the bus the code is simpler, because you can completely avoid the Subscribe part, suppose you want to create a simple program that sends message, here is the whole code you need.

Bus.Initialize(sbc =>
{
    sbc.UseMsmq();
    sbc.UseMulticastSubscriptionClient();
    sbc.ReceiveFrom("msmq://localhost/test_basicCommunication2");
});

........

Bus.Instance.Publish(new Message() { MessageText = message });

Now you can fire both the program and when you call Publish on instance of the bus you are actually dispatching the message to everyone that registered to that specific message. This is really freaking simple to do and really shows the powerful API of Masstransit library.

Alk.

Masstransit and dispatching of messages

One of the coolest aspect of Masstransit is simplicity of use (even if it still lacks a really comprehensive documentation), dispatching of message is one of this aspect. The key of Masstransit is that dispatching is done on type of the message instead that address, we can simply ignore everything in the middle, we have just components that declares to Masstransit that they are able to handle a certain type of messages, other components will simply publish messages in the bus and dispatching is done thanks to the CLR type of message sent.

As an example I’ve created a little supersimple demo with three console program (ProgramA B and C) each one is completely identical to the other, except that he registers for a different type of message (respectively, MessageA N or C) and each one listen on a different queue.

sbc.Subscribe(subs =>
{
    subs.Handler<MessageA>(msg => Console.WriteLine(msg.ToString()));
});

The above snippet is taken from ProgramA that register itself for Message of type MessageA. Each program has a main loop that accepts input string from the user, and each string should begin with the char a b or c to decide what type of message we want to send. You do not need anything else, everything is ready to run (remember that in MSMQ on windows you need to run the MassTransitRuntimeService that takes care of dispatching. If you fire all three programs everything works.

image

Figure 1: MassTransit in action

Each program can send message to each other but the key point is that you are not interested Where to send a message but What message to send. This technique is basically a publish/subscribe structure that you obtain with few lines of code. Just as an example you can now fire all three message console and send a couple of message to ProgramB from the other programs to verify that everything works.

Now takes the executable of ProgramB present in the bin\debug folder and move in another folder, then edit the Programb.exe.config file, and change the MSMQ address used by the program.

  <appSettings>
    <add key="localqueue" value="msmq://localhost/Programb2"/>
  </appSettings>

This operation is needed because you cannot start two programs that listen in the same MSMQ; if you do this, message will not be dispatched anymore because multiple listener on the same MSMQ is not permitted. In the above snippet I simply configure a copied version of ProgramB to use a Programb2 queue, then I start ProgramB from the other folder and when I send MessageB messages from other programs they will get dispatched to both instance of ProgramB, as visible in Figure2.

image

Figure 2: The first console is ProgramC that send two MessageB messages, the second one is sent when the other instance of ProgramB is run, and it gets dispatched to both the instances.

This structure is especially useful in CQRS and DDD architectures, where a BOUNDED CONTEXT usually publish some of its DOMAIN EVENTS to the outside world, but it is not interested in where to send the message or if there is some other BOUNDED CONTEXT interested in message, so he can simply call Bus.Publish(DomainMessage) and let MassTransit care to understand if there is a listener out of there to route the message to.

SourceCode: Download from SkyDrive

RelatedArticles:

Alk

Quick start on Mass Transit and MSMQ on windows

MassTransit is a bus implementation for .NET that promise frictionless configuration and simple usage. The main problem about MassTransit in my opinion is the lack of organic documentation, especially a quick start guide that shows you really the basic concepts behind MassTransit. If you jump into online documentation there is a nice page called “Show Me the code” that promise a quick start to jump into MassTransit concepts. The problem with that example is that it shows a single snippet of code that opens a bus based on Windows MSMQ and send a message to itself.

In my opinion this is a too simplistic “Hello MassTransit” example, because it does not show a typical sender/receiver configuration. If you try to write another simple console application to send message to the first one you build following the “Show me the code” instruction you will need this code.

Bus.Initialize(sbc =>
{
    sbc.UseMsmq();
    sbc.VerifyMsmqConfiguration();
    sbc.UseMulticastSubscriptionClient();
    sbc.ReceiveFrom("msmq://localhost/test_queue_client");
    sbc.ConfigureService<RoutingConfigurator>(
        BusServiceLayer.Session,
        rc => rc.Route<YourMessage>().To("msmq://localhost/simple_first_server")
        );
});
Bus.Instance.Probe();
Bus.Instance.WriteIntrospectionToConsole();
String read;
while (!String.IsNullOrEmpty(read = Console.ReadLine())) 
{
    Bus.Instance.Publish(new YourMessage { Text = read });
}

The code is really similar to the code of the other application, it just change the ReceiveFrom address and add the sbs.ConfigureService to rout the YourMessage to the queue used by the server. A things worth notice in this example: the ConfigureService method is obsolete and deprecated and the reason is that this is not the typical scenario MassTransit want to solve. The problem with this code is that is modeling a classic SOA model, where a client sends messages to a well known service and such a situation is so simple that you probably do not need a bus to handle it. The real feature that MassTransit offers you is the ability to just send and manage Messages from your applications, leaving all the gory details to the bus infrastructure.

To create a more interesting example add this line to the configuration of the first application, that one that subscribed to the YourMessage message.

sbc.UseSubscriptionService("msmq://localhost/mt_subscriptions");
sbc.UseSubscriptionService("msmq://localhost/mt_subscriptions");

This lines tells MassTransit to handle subscription of messages through the queue called mt_subscriptions. Now modify the other application console substituting the call to sbc.ConfigureService with the same above line. This will remove the need to specify the address of the receiver. Now the application can simply publish messages to the bus with the Bus.Instance.Publish call and let MassTransit take care of everything, especially routing messages to the application that subscribed to that specific message.

Now the main question is: what is the mechanism that route the message from the sender application to the listening application? The answer is: MassTransit and this is one of the cool reason to use a bus instead of directly using MSMQ or using WCF. When the first application subscribe to the YourMessage message, this subscription is sent to the subscription service in the queue mt_subscriptions and when the other application publish the message, the subscription service is used to understand where the message should be routed. The routing service will check the incoming message that is a message of type YourMessage, then verify if there is anyone that is registered to listen to that message, if a match is found it routes the message to the right application. The problem is that if you run the above two programs you will get a nasty exception.

Failed to create the bus service: SubscriptionRouterService

Timeout waiting for subscription service to respond

The reason is that you need to run a third program, called MassTransit.RuntimeServices.exe that actually manages all the routing stuff. If you got MassTransit reference through NuGet you surely miss the runtimeservice components. The simpliest thing is downloading MassTransit source from GitHub, follow the instruction in the readme to build with build.bat and finally you will end with all you need in the folder MassTransit\build_output\Services\RuntimeServices

The MassTransit.RuntimeServices.exe application needs a database to store data and since it uses nhibernate you can use almost any database you want, just edit the MassTransit.RuntimeService.exe.config file and configure nhibernate to access your preferred database. In my example I used SqlExpress, just create a database called MassTransitRuntimeServices and create the schema running the script file located in \src\MassTransit.RuntimeServices\SetupSQLServer.sql. Once the database and the configuration file MassTransit.RuntimeService.Exe.config are modified correctly, you can run the service that basically is a simple console application that do the routing. Remember that you need to run it as administrator because it need some administrative permission to setup stuff (like creating queue).

When the subscription service is running well you can run both your applications and everything should work.

You can found code here.

Gian Maria.