I have a saga set up in a fork/join configuration.
FileMetadataMsg
FileReadyMsg
SomeOtherMsg
SagaStart(correlationId)
FileSavedToMsg(correlationId, fileLoc)
FileMetadataMsg(correlationId, metadata)
FileReadyMsg(correlationId, fileLoc)
Consumer<FileSavedToMsg>
SomeOtherMsg(GotTheFileMsg.correlationId, data)
I am getting a FileSavedToMsg in the saga_skipped queue. I can only assume this is because FileSavedToMsg carries a correlationId, since the saga itself does not use FileSavedToMsg in its state machine and has no Event<FileSavedToMsg>.
If that is the reason, should I pass the correlationId in a field other than CorrelationId so the saga doesn't see it? I need it somewhere so I can tag SomeOtherMsg with it.
Here is how the saga endpoint is defined:
    return Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        cfg.ReceiveEndpoint(host, "study_saga", epCfg =>
        {
            epCfg.StateMachineSaga(machine, repository);
        });
    });
Here is how the worker endpoint is defined:
    return Bus.Factory.CreateUsingRabbitMq(x =>
    {
        var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        x.ReceiveEndpoint(host, "study_3d_volume_worker", c =>
        {
            c.PrefetchCount = 1;
            c.Instance(_studyCreatedMsgConsumer);
        });
    });
These are running on the same machine, but in separate Console/Topshelf applications.
If you are getting messages on a queue that are not consumed by any consumer on that receive endpoint, then either you previously consumed that message type and later removed it from the consumer (or saga, in your case), or the queue was once used for some other purpose that consumed that message type. In both cases the old binding is still in place on the broker.
Either way, go into the RabbitMQ management console and find the queue. Expand the Bindings chevron, click through to the exchange of the same name (that's a standard MassTransit convention), and then expand that exchange's bindings to see which message types (the exchanges named like .NET type names) are bound to it.
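If you would rather check this from code than click through the console, the same information is available from the management plugin's HTTP API. This is only a rough sketch: it assumes the management plugin is enabled on its default port 15672, the default virtual host "/", the guest/guest credentials from your question, and a modern .NET runtime (for System.Text.Json and for the %2F in the path being preserved). The class name is just a placeholder.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical helper, not part of MassTransit.
class ListSagaBindings
{
    static async Task Main()
    {
        // Assumption: management plugin listening on localhost:15672.
        using var http = new HttpClient { BaseAddress = new Uri("http://localhost:15672/") };

        // Assumption: same guest/guest credentials as the bus configuration.
        var credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes("guest:guest"));
        http.DefaultRequestHeaders.Authorization =
            new AuthenticationHeaderValue("Basic", credentials);

        // %2F is the URL-encoded default virtual host "/".
        // This lists every binding whose destination is the "study_saga" exchange.
        var json = await http.GetStringAsync(
            "api/exchanges/%2F/study_saga/bindings/destination");

        using var doc = JsonDocument.Parse(json);
        foreach (var binding in doc.RootElement.EnumerateArray())
        {
            // Each source is a message-type exchange bound to the endpoint.
            Console.WriteLine(binding.GetProperty("source").GetString());
        }
    }
}
```

Any source exchange it prints that does not correspond to an event on the saga is a stale binding.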
If you see one that is not consumed by the endpoint, that's the culprit. You can Unbind it using the UI, after which published messages of that type will no longer be sent to the queue.
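The unbind can also be done from code if you need it to be repeatable across environments. Here is a minimal sketch using the RabbitMQ.Client package directly, assuming version 6.x or earlier (where channels are exposed as IModel). The source exchange name "MyCompany.Messages:FileSavedToMsg" is a placeholder; copy the exact name you see in the management console, since it depends on the namespace of your message type.

```csharp
using RabbitMQ.Client;

// Hypothetical one-off maintenance program, not part of MassTransit.
class RemoveStaleBinding
{
    static void Main()
    {
        // Assumption: same broker and credentials as in the question.
        var factory = new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest"
        };

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Remove the exchange-to-exchange binding from the message-type
            // exchange to the endpoint exchange. The routing key must match
            // the one shown on the binding in the console; empty is assumed here.
            channel.ExchangeUnbind(
                destination: "study_saga",
                source: "MyCompany.Messages:FileSavedToMsg", // placeholder name
                routingKey: "",
                arguments: null);
        }
    }
}
```

Run it once while the saga service is stopped. As long as the saga does not declare an event for that message type, the binding should not come back.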