Search before reporting
Read release policy
User environment
- Broker version: 4.0.6
- Client library type: Java (Kafka Source Connector / pulsar-io-kafka)
- Deployed on Kubernetes via Function Mesh
Issue Description
When the Kafka Source connector's consumer thread encounters a fatal exception (e.g. due to a transient Pulsar authentication outage), the error notification is silently lost and the connector pod remains alive but permanently idle.
What happens:
- The consumer thread in KafkaAbstractSource.start() catches an exception at KafkaAbstractSource.java L192-L195:
```java
} catch (Exception e) {
    LOG.error("Error while processing records", e);
    notifyError(e);
    break; // consumer thread exits permanently
}
```
- notifyError(e) (in AbstractPushSource) enqueues an ErrorNotifierRecord onto a bounded LinkedBlockingQueue(1000) via consume() -> queue.put().
- The intent is that the instance thread dequeues it via readNext(), which throws the exception, causing the function framework to restart the instance.
The bug: If the instance thread is blocked in sendOutputMessage() (e.g. when Pulsar client cannot contact an external authz service because of a Cloudflare outage 🤦), it never returns to call readNext(). The ErrorNotifierRecord sits in the queue (or queue.put() blocks if the queue is full), and the error is never propagated. The consumer thread is dead, the instance thread is stuck, but the pod passes liveness/readiness checks - it's alive but doing no work.
What was expected: The connector should crash (or the instance should restart) so that Kubernetes can restart the pod and recovery can happen automatically once the transient outage resolves.
Why this is a bug: The notifyError() mechanism added in PR #20791 / PIP-281 relies on the instance thread being in a state where it will call readNext(). When the instance thread is blocked on I/O (sending to Pulsar), this assumption is violated and the error signal is silently dropped. This is a liveness bug - the connector appears healthy but is permanently stalled.
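The lost-signal scenario can be reproduced in isolation with a minimal sketch (illustrative class and names only, not Pulsar's actual code): a bounded queue stands in for AbstractPushSource's error queue, and an instance thread that never returns from a simulated sendOutputMessage() means the enqueued error is never dequeued.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the liveness bug: the error record is enqueued
// but nothing ever reads it, so the process stays "alive but idle".
public class LostErrorSignalDemo {
    public static void main(String[] args) throws Exception {
        LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
        CountDownLatch instanceBlocked = new CountDownLatch(1);

        // Instance thread: permanently blocked in a simulated sendOutputMessage(),
        // so it never gets back to polling the queue (readNext()).
        Thread instanceThread = new Thread(() -> {
            instanceBlocked.countDown();
            try {
                new CountDownLatch(1).await(); // simulates a send stuck on I/O
            } catch (InterruptedException ignored) { }
        });
        instanceThread.setDaemon(true);
        instanceThread.start();
        instanceBlocked.await();

        // Consumer thread's notifyError(): enqueue the fatal error and give up.
        queue.put(new RuntimeException("fatal consumer error"));

        // Nobody will ever dequeue it; the signal is silently dropped.
        TimeUnit.MILLISECONDS.sleep(200);
        System.out.println("queued errors never read: " + queue.size());
    }
}
```

Running this prints a queue size of 1: the error sits in the queue indefinitely while the process keeps running.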
Error messages
No error messages beyond the initial LOG.error("Error while processing records", e) on the consumer thread. The instance thread produces no output because it is blocked.
Reproducing the issue
- Deploy a Kafka Source connector (e.g. KafkaBytesSource) on Kubernetes via Function Mesh
- Ensure the connector is actively consuming from Kafka and producing to Pulsar
- Cause a transient Pulsar authentication failure (e.g. make the SSO/OAuth token endpoint return 500s)
- The consumer thread will eventually hit an exception when CompletableFuture.allOf(futures).get() times out (because messages can't be sent to Pulsar), call notifyError(), and break
- The instance thread is blocked in sendOutputMessage() trying to write a previously-consumed record to the unauthenticated Pulsar client
- Observe that the pod remains running, passes health checks, but consumes no further messages from Kafka
- Even after the auth outage recovers, consumption does not resume - the consumer thread is dead
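The timeout that kills the consumer thread can be modeled in isolation: a send future that never completes makes allOf(...).get(timeout) throw, which is the exception the consumer thread catches before calling notifyError() and breaking (a minimal sketch, not the connector's actual code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StuckSendDemo {
    public static void main(String[] args) throws Exception {
        // A send future that never completes, e.g. because the Pulsar client
        // cannot authenticate and the broker never acknowledges the message.
        CompletableFuture<Void> stuckSend = new CompletableFuture<>();
        try {
            CompletableFuture.allOf(stuckSend).get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // This is the fatal-exception path that triggers notifyError() + break.
            System.out.println("send timed out, consumer thread would exit");
        }
    }
}
```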
Additional information
Related issues and PRs:
- [fix][io] Close the kafka source connector if there is uncaught exception (merged, 3.1.0)
- [fix][io] Close the kafka source connector got stuck (merged, 3.1.0)
- [fix][io] Not restart instance when kafka source poll exception (merged, 3.1.0) - this PR introduced the notifyError() + break pattern
- the PR that added notifyError() to PushSource/AbstractPushSource (PR #20791 / PIP-281, mentioned above)
- [fix][io] Kafka Source connector maybe stuck (merged, 3.3.0) - added timeout on futures, but doesn't address this scenario

The fixes in 3.1.0–3.3.0 addressed the case where the consumer thread itself gets stuck, but did not address the case where the consumer thread correctly signals an error via notifyError() but the instance thread is blocked and never reads it.
Possible fix directions:
- Have notifyError() call System.exit(1) to force a pod restart (this is what we've done as a downstream workaround)
- Have KafkaAbstractSource catch the exception and call close() on a separate thread with a timeout, then System.exit(1) if close doesn't complete
- Have notifyError() throw a RuntimeException instead of (or in addition to) enqueuing - however, since notifyError() runs on the consumer thread, an uncaught exception would only kill that thread (which is already dying via break); it would not affect the stuck instance thread unless combined with a Thread.UncaughtExceptionHandler that calls System.exit(1)
- Add a watchdog/heartbeat mechanism to detect that the consumer thread has died
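The watchdog direction could look roughly like the following sketch (hypothetical class and method names, not a proposed patch): the consumer thread reports a heartbeat on every successful poll iteration, and a daemon scheduler terminates the process if the heartbeat goes stale, letting Kubernetes restart the pod.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical watchdog: kills the process when the consumer thread stops
// beating, regardless of what the instance thread is blocked on.
public class ConsumerWatchdog {
    private final AtomicLong lastHeartbeat = new AtomicLong(System.nanoTime());
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "consumer-watchdog");
                t.setDaemon(true);
                return t;
            });

    // Called by the consumer thread on every successful poll iteration.
    public void beat() {
        lastHeartbeat.set(System.nanoTime());
    }

    // Checks the heartbeat once per second and halts the JVM if it is stale,
    // so Kubernetes restarts the pod and recovery happens automatically.
    public void start(long timeoutSeconds) {
        scheduler.scheduleAtFixedRate(() -> {
            long silentNanos = System.nanoTime() - lastHeartbeat.get();
            if (silentNanos > TimeUnit.SECONDS.toNanos(timeoutSeconds)) {
                System.err.println("consumer thread silent for " + timeoutSeconds + "s, exiting");
                Runtime.getRuntime().halt(1); // halt, not exit: shutdown hooks might also block
            }
        }, 1, 1, TimeUnit.SECONDS);
    }
}
```

Using halt(1) rather than System.exit(1) sidesteps shutdown hooks, which could themselves block on the same stuck Pulsar client.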
Are you willing to submit a PR?