SYMPTOM:
Indexing tasks from Kafka or Kinesis are finishing without ingesting any data. In the task logs you will find messages like the following:
2018-07-02T07:46:02,783 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[KinesisIndexTask{id=index_kinesis_xxxx, type=index_kinesis, dataSource=xxxxxx}]
io.druid.java.util.common.ISE: Starting sequenceNumber [49585811274695675412070234775862022455403125352932835346] is no longer available for partition [shardId-000000000001] (earliest: [49585811274695675412070251479546550741235996701713498130]) and resetOffsetAutomatically is not enabled
Or
2018-09-03T12:55:14,522 WARN [task-runner-0-priority-0] io.druid.indexing.kafka.KafkaIndexTask - OffsetOutOfRangeException with message [Offsets out of range with no configured reset policy for partitions: {Druid-XXXX-X=7263946670}]
And at the end, we can see:
2018-09-03T12:55:15,207 INFO [publish-0] io.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Nothing to publish, skipping publish step.
......
2018-09-03T12:55:15,232 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_Test-K_40675c261af88df_pcigmfli] status changed to [SUCCESS].
......
2018-09-03T12:55:15,233 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: { "id" : "index_kafka_Test-K_40675c261af88df_pcigmfli", "status" : "SUCCESS", "duration" : 3601087, "errorMsg" : null }
ROOT CAUSE:
The ingestion task is trying to read messages that are no longer available in Kafka, so it waits patiently until the `taskDuration` is over or the supervisor is reset. The most common reason is that ingestion could not keep up, and Kafka deleted the messages under its retention policy before they were read. This can happen because ingestion is too slow and needs to be scaled out or tuned, or because ingestion was turned off long enough for Kafka to delete the messages it was pointed at.
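To confirm that retention has already removed the offsets a task is asking for, you can compare the offset from the error message against the earliest offset the topic still holds. Below is a minimal sketch using the kafka-python client; the broker address, topic, partition, and offset are placeholders to replace with the values from your cluster and task log.

# Compare the offset a task asked for with the earliest offset Kafka still retains.
# Broker address, topic, partition, and requested_offset are placeholders.
from kafka import KafkaConsumer, TopicPartition

requested_offset = 7263946670            # offset from the task's error message
tp = TopicPartition("your-topic", 0)     # topic/partition from the error message

consumer = KafkaConsumer(bootstrap_servers="kafka-broker:9092")
earliest = consumer.beginning_offsets([tp])[tp]
latest = consumer.end_offsets([tp])[tp]
print("earliest available:", earliest, "latest available:", latest)

if requested_offset < earliest:
    print("Offset already deleted by retention; the supervisor needs a reset.")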
WORKAROUND:
A Druid admin should reset the supervisor when this happens, which skips ingestion forward to the latest available Kafka messages. The reset can be done with the "reset supervisor" command in the UI, or with the reset API described at http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html.
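As a minimal sketch, the reset API is an HTTP POST to the Overlord's supervisor endpoint; the Overlord host/port and supervisor ID below are placeholders, and a secured cluster will also need credentials.

# Reset a supervisor through the Overlord's supervisor API.
# The Overlord URL and supervisor ID are placeholders for your environment.
import urllib.request

overlord = "http://overlord-host:8090"
supervisor_id = "your-supervisor-id"

req = urllib.request.Request(
    url=f"{overlord}/druid/indexer/v1/supervisor/{supervisor_id}/reset",
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(resp.status, resp.read().decode())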
When reset, the supervisor will kill all currently ongoing ingestion tasks and launch new tasks from the reset point. Druid will still make sure that whatever it reads from Kafka/Kinesis is loaded exactly once.
The reason it does not reset automatically by default is that there are scenarios where messages are only temporarily unavailable (for example, a transient issue on the Kafka side that can be fixed quickly). On the other hand, an admin can enable the `resetOffsetAutomatically` config if the reset needs to happen automatically.
Another option is `useEarliestOffset` for Kafka ingestion, or `useEarliestSequenceNumber` for Kinesis ingestion; the two properties serve the same purpose in their respective ingestion methods. If a property is configured for the wrong ingestion method, it is simply ignored.
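As a sketch of where these properties live in a supervisor spec, `resetOffsetAutomatically` goes under `tuningConfig` and `useEarliestOffset` (or `useEarliestSequenceNumber` for Kinesis) goes under `ioConfig`. The fragment below is illustrative only; the topic name is a placeholder and all other required fields (dataSchema, consumerProperties, and so on) are omitted.

# Trimmed Kafka supervisor spec fragment showing where the properties go.
# Topic name is a placeholder; all other required fields are omitted.
import json

spec_fragment = {
    "type": "kafka",
    "ioConfig": {
        "topic": "your-topic",
        "useEarliestOffset": True,         # for Kinesis, use "useEarliestSequenceNumber"
    },
    "tuningConfig": {
        "type": "kafka",
        "resetOffsetAutomatically": True,  # reset automatically on out-of-range offsets
    },
}
print(json.dumps(spec_fragment, indent=2))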
In production, we recommend monitoring Kafka ingestion lag in order to prevent ingestion from falling behind like this. There are two main ways to do it. One is manual, through Clarity's Ingestion tab and the "Kafka Lag" metric (see attached screenshot). The other is via API, using the supervisor status API and looking at the "aggregateLag" value. In both cases, lag is measured in number of messages. So if the normal expectation is to read 100 messages per second, a lag value of 100,000 means ingestion is roughly 1,000 seconds (about 17 minutes) behind.
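A minimal polling sketch against the supervisor status API is shown below; the Overlord URL, supervisor ID, and alert threshold are placeholders, and the exact location of "aggregateLag" in the response can vary by Druid version.

# Poll a supervisor's status and report its aggregate lag (in messages).
# Overlord URL, supervisor ID, and the alert threshold are placeholders.
import json
import urllib.request

overlord = "http://overlord-host:8090"
supervisor_id = "your-supervisor-id"
lag_threshold = 100000   # alert once we fall this many messages behind

url = f"{overlord}/druid/indexer/v1/supervisor/{supervisor_id}/status"
with urllib.request.urlopen(url) as resp:
    status = json.load(resp)

# "aggregateLag" typically appears in the status payload; adjust the path
# if your Druid version reports it elsewhere.
lag = status.get("payload", {}).get("aggregateLag")
print("aggregateLag:", lag)
if isinstance(lag, (int, float)) and lag > lag_threshold:
    print("Kafka ingestion is falling behind; consider scaling out or tuning.")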
Finally: the fact that this condition, when it persists for an entire task duration, is reported as SUCCESS instead of FAILED is confusing, and we will be adjusting that. In general, our preference is that users only see SUCCESS when the connection to Kafka is healthy.