Are Erlang/OTP messages reliable? Can messages be duplicated?

joshng picture joshng · Jul 3, 2010 · Viewed 7.2k times · Source

Long version:

I'm new to erlang, and considering using it for a scalable architecture. I've found many proponents of the platform touting its reliability and fault tolerance.

However, I'm struggling to understand exactly how fault-tolerance is achieved in this system where messages are queued in transient memory. I understand that a supervisor hierarchy can be arranged to respawn deceased processes, but I've been unable to find much discussion of the implications of respawning on works-in-progress. What happens to in-flight messages and the artifacts of partially-completed work that were lost on a dying node?

Will all producers automatically retransmit messages that are not ack'd when consumer processes die? If not, how can this be considered fault-tolerant? And if so, what prevents a message that was processed -- but not quite acknowledged -- from being retransmitted, and hence reprocessed inappropriately?

(I recognize that these concerns are not unique to erlang; similar concerns will arise in any distributed processing system. But erlang enthusiasts seem to claim that the platform makes this all "easy"..?)

Assuming messages are retransmitted, I can easily envision a scenario where the downstream effects of a complex messaging chain could become very muddled after a fault. Without some sort of heavy distributed transaction system, I don't understand how consistency and correctness can be maintained without addressing duplication in every process. Must my application code always enforce constraints to prevent transactions from being executed more than once?

Short version:

Are distributed erlang processes subject to duplicated messages? If so, is duplicate-protection (ie, idempotency) an application responsibility, or does erlang/OTP somehow help us with this?

Answer

I GIVE TERRIBLE ADVICE picture I GIVE TERRIBLE ADVICE · Jul 5, 2010

I'll separate this into points I hope will make sense. I might be re-hashing a bit of what I have written in The Hitchhiker's Guide to Concurrency. You might want to read that one to get details on the rationale behind the way message passing is done in Erlang.


1. Message transmission

Message passing in Erlang is done through asynchronous messages sent into mailboxes (a kind of queue for storing data). There is absolutely no assumption as to whether a message was received or not, or even that it was sent to a valid process. This is because it is plausible to assume [at a language level] that someone might want to treat a message in maybe only 4 days and won't even acknowledge its existence until it has reached a certain state.

A random example of this could be to imagine a long-running process that crunches data for 4 hours. Should it really acknowledge it received a message if it's unable to treat it? Maybe it should, maybe not. It really depends on your application. As such, no assumption is made. You can have half your messages asynchronous and only one that isn't.

Erlang expects you to send an acknowledgement message (and wait on it with a timeout) if you ever need it. The rules having to do with timing out and the format of the reply are left to the programmer to specify -- Erlang can't assume you want the acknowledgement on message reception, when a task is completed, whether it matches or not (the message could match in 4 hours when a new version of the code is hot-loaded), etc.

To make it short, whether a message isn't read, fails to be received or is interrupted by someone pulling the plug while it is in transit doesn't matter if you don't want it to. If you want it to matter, you need to design a logic across processes.

The burden of implementing a high-level message protocol between Erlang processes is given to the programmer.


2. Message protocols

As you said, these messages are stored in transient memory: if a process dies, all the messages it hadn't read yet are lost. If you want more, there are various strategies. A few of them are:

  • Read the message as fast as possible and write it to disk if needed, send an acknowledgement back and process it later. Compare this to queue software such as RabbitMQ and ActiveMQ with persistent queues.
  • Use process groups to duplicate messages across a group of processes on multiple nodes. At this point you might enter transactional semantics. This one is used for the mnesia database for the transaction commits;
  • Don't assume anything has worked until you receive either an acknowledgement that everything went fine or a failure message
  • A combination of process groups and failure messages. If a first process fails to handle a task (because the node goes down), a notification is automatically sent by the VM to a fail-over process which handles it instead. This method is sometimes used with full applications to handle hardware failures.

Depending on the task at hand, you might use one or many of these. They're all possible to implement in Erlang and in many cases modules are already written to do the heavy lifting for you.

So this might answer your question. Because you implement the protocols yourself, it's your choice whether messages get sent more than once or not.


3. What is fault-tolerance

Picking one of the above strategies does depend on what fault-tolerance means to you. In some cases, people mean it to say "no data is ever lost, no task ever fails." Other people use fault-tolerance to say "the user never sees a crash." In the case of Erlang systems, the usual meaning is about keeping the system running: it's alright to maybe have a single user dropping a phone call rather than having everyone dropping it.

Here the idea is then to let stuff that fails fail, but keep the rest running. To achieve this, there are a few things the VM gives you:

  • You can know when a process dies and why it did
  • You can force processes that depend on each other to die together if one of them goes wrong
  • You can run a logger that automatically logs every uncaught exception for you, and even define your own
  • Nodes can be monitored so you know when they went down (or got disconnected)
  • You can restart failed processes (or groups of failed processes)
  • Have whole applications restarting on different nodes if one fails
  • And a lot more more stuff with the OTP framework

With these tools and a few of the standard library's modules handling different scenarios for you, you can implement pretty much what you want on top of Erlang's asynchronous semantics, although it usually pays to be able to use Erlang's definition of fault tolerance.


4. A few notes

My personal opinion here is that it's pretty hard to have more assumptions than what exists in Erlang unless you want pure transactional semantics. One problem you'll always have trouble with is with nodes going down. You can never know if they went down because the server actually crashed or because the network failed.

In the case of a server crash, simply re-doing the tasks is easy enough. However with a net split, you have to make sure some vital operations are not done twice, but not lost either.

It usually boils down to the CAP theorem which basically gives you 3 options, of which you have to pick two:

  1. Consistency
  2. Partition tolerance
  3. Availability

Depending on where you position yourself, different approaches will be needed. The CAP theorem is usually used to describe databases, but I believe similar questions are to be asked whenever you need some level of fault tolerance when processing data.