Mismatch between message offsets and offsets stored in Kafka by Queue Processor
Hi,
We recently faced an issue where all queue processor items were going to the broken queue.
A sample error printed in the logs was:
" [QueueProcessor:pyFTSIncrementalIndexer] [MessageID : c22c4719-3fbe-4e67-9d88-53a6d6243eb5] [Message Offset : 2365253] [Current offset in memory: 2369067] [Dataflow offset in memory: 2363495] [Partition Id : 11] - message skipped"
This impacted Elasticsearch indexing for new cases; other queue processors showed similar behavior.
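The log line above carries three offsets: the offset of the incoming message, the current offset the processor holds in memory, and the Data Flow checkpoint offset. A minimal sketch of reading those fields and checking the skip condition (this hypothesizes that a message whose offset is below the current in-memory offset is treated as already consumed; the real skip logic is internal to Pega):

```python
import re

# Sample log line from the incident described in this post.
log_line = (
    "[QueueProcessor:pyFTSIncrementalIndexer] "
    "[MessageID : c22c4719-3fbe-4e67-9d88-53a6d6243eb5] "
    "[Message Offset : 2365253] [Current offset in memory: 2369067] "
    "[Dataflow offset in memory: 2363495] [Partition Id : 11] - message skipped"
)

def parse_offsets(line: str) -> dict:
    """Extract the three offsets and the partition id from the log line."""
    fields = {
        "message": r"Message Offset : (\d+)",
        "current": r"Current offset in memory: (\d+)",
        "dataflow": r"Dataflow offset in memory: (\d+)",
        "partition": r"Partition Id : (\d+)",
    }
    return {name: int(re.search(pat, line).group(1))
            for name, pat in fields.items()}

offsets = parse_offsets(log_line)

# Hypothesized condition: the message offset sits below the current
# in-memory offset, so the processor considers it already handled
# and skips it instead of processing it.
skipped = offsets["message"] < offsets["current"]
```

Running this against the sample line yields `skipped == True`, matching the "message skipped" suffix in the log.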
There are a few scenarios that can cause the offsets to go out of sync:
- A Stream node went down
- Cluster communication or heartbeat issues between any Data Flow (DF) / background processing (BP) nodes
- Manual deletion of the entire Kafka folder before a restart
- A direct operation on the database, e.g. a restore
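One of the causes above, deleting the Kafka data folder, can be illustrated with a toy model (this is not Pega's actual implementation): the broker restarts its log at offset 0, while the processor's saved checkpoint still points far ahead, so every new message looks "already consumed".

```python
class TinyPartitionLog:
    """A minimal append-only counter standing in for one Kafka partition."""
    def __init__(self):
        self.next_offset = 0

    def append(self, n: int = 1) -> None:
        self.next_offset += n

    def wipe(self) -> None:
        # Simulates deleting the Kafka folder: the log restarts at offset 0.
        self.next_offset = 0

log = TinyPartitionLog()
log.append(2_369_067)          # broker has served messages up to this offset
checkpoint = log.next_offset   # processor persists its position

log.wipe()                     # Kafka folder deleted, broker restarted
log.append(100)                # new messages now start again from offset 0

# Every new message offset (all below 100) is far below the stale
# checkpoint, so a consumer that trusts the checkpoint skips everything.
stale_gap = checkpoint - log.next_offset
```

The same shape of desynchronization, a checkpoint that no longer matches the broker's log, arises from the other causes too, which is why the fix below resets the stored partition state.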
To resolve this issue, the following steps were performed in sequence:
1. Stop the queue processor
2. In Dev Studio, open the instances of the class Data-QueueProcessor-Run-Partition
3. Filter by the queue processor name
4. Delete the instance for each partition of the selected queue processor
5. Requeue or remove all broken items as needed before restarting the queue processor
6. Start the queue processor
PS: If the above steps take too long to finish, the platform may auto-start the queue processor before you are done.
We can also stop the affected queue processor by running the activity "pxStopById" in the class Data-Decision-DDF-RunOptions, passing the queue processor run ID (which is the name of the queue processor rule) as a parameter.
To restart it, run the activity "pxStartById" in the same class, again passing the queue processor run ID as the parameter.