I'm receiving events from an EventHub using EventProcessorHost and an IEventProcessor class (call it: MyEventProcessor). I scale this out to two servers by running my EPH on both servers, and having them connect to the Hub using the same ConsumerGroup, but unique hostName's (using the machine name).
The problem is: at random hours of the day/night, the app logs this:
Exception information:
Exception type: ReceiverDisconnectedException
Exception message: New receiver with higher epoch of '186' is created hence current receiver with epoch '186' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used.
at Microsoft.ServiceBus.Common.ExceptionDispatcher.Throw(Exception exception)
at Microsoft.ServiceBus.Common.Parallel.TaskHelpers.EndAsyncResult(IAsyncResult asyncResult)
at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
This Exception occurs at the same time as a LeaseLostException, thrown from MyEventProcessor's CloseAsync method when it tries to checkpoint. (Presumably Close is being called because of the ReceiverDisconnectedException?)
I think this is occurring due to Event Hubs' automatic lease management when scaling out to multiple machines. But I'm wondering whether I need to do something different to make it work more cleanly and avoid these exceptions, e.g. something with epochs?
TLDR: This behavior is absolutely normal.
Why can't Lease Management be smooth & exception-free: to give the developer more control over the situation.
The really long story, all the way from the basics
EventProcessorHost (hereafter EPH; it is very similar to what the __consumer_offsets topic does for Kafka consumers: partition ownership and checkpoint store) is written by the Microsoft Azure EventHubs team themselves, to translate all of the EventHubs partition receiver Gu into a simple onReceive(Events) callback.
EPH is used to address 2 general, major, well-known problems while reading out of high-throughput partitioned streams like EventHubs:
1. Fault-tolerant receive pipeline. A simpler version of the problem: if the host running a PartitionReceiver dies and comes back, it needs to resume processing from where it left off. To remember the last successfully processed EventData, EPH uses the blob supplied to the EPH constructor to store the checkpoints, whenever the user invokes context.CheckpointAsync(). Eventually, when the host process dies (for example, it abruptly reboots or hits a hardware fault and never comes back), any EPH instance can pick up this task and resume from that checkpoint.
2. Balance/distribute partitions across EPH instances. Let's say there are 10 partitions and 2 EPH instances processing events from these 10 partitions: we need a way to divide partitions across the instances (the PartitionManager component of the EPH library does this). We use the Azure Storage blob lease management feature to implement this. As of version 2.2.10, to simplify the problem, EPH assumes that all partitions are loaded equally.
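The two ideas above can be illustrated with a small toy model. This is only a sketch in Python, not the EPH API: InMemoryCheckpointStore and fair_share are hypothetical names, and a plain dictionary stands in for the Storage blobs.

```python
import math


class InMemoryCheckpointStore:
    """Hypothetical stand-in for the checkpoint blobs: partition id -> last processed offset."""

    def __init__(self):
        self._offsets = {}

    def checkpoint(self, partition_id, offset):
        # Remember the last successfully processed position for this partition.
        self._offsets[partition_id] = offset

    def resume_from(self, partition_id):
        # None means "no checkpoint yet" for this partition.
        return self._offsets.get(partition_id)


def fair_share(partition_count, host_count):
    """Upper bound on partitions per host, assuming all partitions are equally loaded."""
    return math.ceil(partition_count / host_count)


store = InMemoryCheckpointStore()
store.checkpoint("3", offset=4200)  # one host processed partition 3 up to offset 4200, then died
resume = store.resume_from("3")     # any other host can resume from the shared store
share = fair_share(10, 2)           # 10 partitions split across 2 hosts
```

Because the store is shared rather than local to a host, the host that takes over a partition does not need to know anything about the host that died.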
With this, let's try to see what's going on:
So, to start with, in the above example of 10 event hub partitions and 2 EPH instances processing events out of them:
1. EPH instance EPH1 started first, alone, and as part of start-up it created receivers to all 10 partitions and is processing events. During start-up, EPH1 announces that it owns all 10 partitions by acquiring leases on 10 storage blobs representing these 10 event hub partitions (with a standard nomenclature, which EPH internally creates in the Storage account from the StorageConnectionString passed to the ctor). Leases are acquired for a set time, after which the EPH instance will lose ownership of the partition. EPH1 periodically announces that it still owns those partitions by renewing the leases on the blobs. The frequency of renewal, along with other useful tuning, can be configured using PartitionManagerOptions.
2. EPH2 starts up, and you supplied the same AzureStorageAccount as EPH1 to the ctor of EPH2 as well. Right now, it has 0 partitions to process. So, to achieve a balance of partitions across EPH instances, it will go ahead and download the list of all lease blobs, which holds the mapping of owner to partitionId. From this, it will steal leases for its fair share of partitions, which is 5 in our example, and will announce that information on each lease blob. As part of this, EPH2 reads the latest checkpoint written for each partition whose lease it wants to steal, and goes ahead and creates the corresponding PartitionReceivers with the same epoch as the one in the checkpoint. EPH1 will lose ownership of these 5 partitions and will run into different errors based on the exact state it is in:
a. If EPH1 is in the middle of a PartitionReceiver.Receive() call while EPH2 is creating the PartitionReceiver on the same partition, EPH1 will experience a ReceiverDisconnectedException. This will eventually invoke IEventProcessor.Close(CloseReason=LeaseLost). Note that the probability of hitting this specific exception is higher if the messages being received are larger or the PrefetchCount is smaller, as in both cases the receiver would be performing more aggressive I/O.
b. If EPH1 is checkpointing or renewing the lease when EPH2 steals it, the EventProcessorOptions.ExceptionReceived event handler is signaled with a LeaseLostException (with a 409 conflict error on the lease blob), which also eventually invokes IEventProcessor.Close(LeaseLost).
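The hand-over described above can be replayed with a toy model. This is a simplified sketch in Python, not the actual EPH or service implementation: LeaseBlob, Partition, LeaseLost and ReceiverDisconnected are hypothetical stand-ins, and the epoch rule is assumed to be "a new receiver with the same or a higher epoch replaces the current one".

```python
class LeaseLost(Exception):
    """Toy counterpart of LeaseLostException (the 409 conflict on the lease blob)."""


class ReceiverDisconnected(Exception):
    """Toy counterpart of ReceiverDisconnectedException."""


class LeaseBlob:
    """Hypothetical lease blob: records which host currently owns the partition."""

    def __init__(self, partition_id):
        self.partition_id = partition_id
        self.owner = None
        self.checkpoint = None

    def acquire_or_steal(self, host):
        self.owner = host

    def write_checkpoint(self, host, offset):
        # Only the current owner may write; anyone else gets the conflict.
        if self.owner != host:
            raise LeaseLost(f"{host} no longer owns partition {self.partition_id}")
        self.checkpoint = offset


class Partition:
    """Service side of the toy model: keeps only one epoch receiver at a time."""

    def __init__(self):
        self.active = None  # (host, epoch) of the current receiver

    def create_receiver(self, host, epoch):
        # Assumed rule: a lower epoch is rejected, same or higher replaces the current receiver.
        if self.active is not None and epoch < self.active[1]:
            raise ReceiverDisconnected("a receiver with a higher epoch already exists")
        self.active = (host, epoch)

    def receive(self, host):
        if self.active is None or self.active[0] != host:
            raise ReceiverDisconnected(f"{host} was replaced by a newer receiver")
        return f"events for {host}"


lease, partition = LeaseBlob("0"), Partition()
lease.acquire_or_steal("EPH1")
partition.create_receiver("EPH1", 186)

# EPH2 starts, steals the lease and creates its own receiver with the same epoch.
lease.acquire_or_steal("EPH2")
partition.create_receiver("EPH2", 186)

outcomes = []
try:
    partition.receive("EPH1")            # case a: EPH1 was receiving
except ReceiverDisconnected:
    outcomes.append("ReceiverDisconnected")
try:
    lease.write_checkpoint("EPH1", 100)  # case b: EPH1 was checkpointing
except LeaseLost:
    outcomes.append("LeaseLost")
```

In this model both failures on EPH1 are simply the visible side of EPH2 taking over, which matches the pairing of exceptions reported in the question.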
Why can't Lease Management be smooth & exception-free:
To keep the consumer simple and error-free, lease-management-related exceptions could have been swallowed by EPH and not surfaced to user code at all. However, we realized that throwing LeaseLostException could empower customers to find interesting bugs in the IEventProcessor.ProcessEvents() callback, for which the symptom would be frequent partition moves:
1. EPH1 fails to renew leases and comes back up! Imagine if the network of this machine stays flaky for a day: EPH instances are going to play ping-pong with partitions! This machine will continuously try to steal the lease from the other machine, which is legitimate from EPH's point of view but is a total disaster for the user of EPH, as it completely interferes with the processing pipeline. EPH would see exactly a ReceiverDisconnectedException when the network comes back up on this flaky machine. We think the best, and in fact the only, way is to enable the developer to smell this!
2. ProcessEvents logic that throws unhandled exceptions which are fatal and bring down the whole process, e.g. a poison event. This partition is going to move around a lot.
3. Something (like an automated clean-up script) mistakenly touching the Storage account that EPH is also using, etc.
4. An outage in the Azure data center where a specific EventHub partition is located, say a network incident. Partitions are going to move around across EPH instances.
Basically, in the majority of situations it would be tricky for us to tell the difference between these situations and a legitimate lease loss due to balancing, so we want to delegate control of these situations to the developer.
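One way a developer might act on this signal is to count lease losses per partition over a time window and raise an alert only when they become frequent. The following is a minimal sketch in Python under that assumption; LeaseLossMonitor, its parameters and its thresholds are hypothetical and not part of EPH.

```python
from collections import defaultdict, deque


class LeaseLossMonitor:
    """Hypothetical helper: flags partitions that lose their lease too often."""

    def __init__(self, window_seconds=3600.0, max_losses=3):
        self.window_seconds = window_seconds
        self.max_losses = max_losses
        self._losses = defaultdict(deque)  # partition id -> timestamps of recent losses

    def record_loss(self, partition_id, now):
        """Record a lease loss at time `now` (seconds); return True if the partition looks unstable."""
        losses = self._losses[partition_id]
        losses.append(now)
        # Drop losses that fell out of the sliding window.
        while losses and now - losses[0] > self.window_seconds:
            losses.popleft()
        return len(losses) > self.max_losses


monitor = LeaseLossMonitor(window_seconds=600, max_losses=2)
# Three losses within ten minutes trip the alarm; a single loss much later does not.
flags = [monitor.record_loss("4", t) for t in (0, 100, 200, 5000)]
```

An occasional loss, as when a second host starts and takes its fair share, stays below the threshold, while the ping-pong pattern described above would trip it repeatedly.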