Answer a question

I use spring-kafka's KafkaTemplate to produce messages to Kafka.

I have a simple REST API that can be used to create messages. In my code I use the KafkaTemplate like this to produce a message to Kafka:

    kafkaTemplate.send("topic", "key", "data");

It's important to me that I only return success to the client once the message has actually been sent.

But now I have realised that the method is at least partially asynchronous. Its signature is:

    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {

So it returns a future, and that future may eventually invoke the failure callback when it completes.

I looked under the hood at the Java code of spring-kafka (version 2.2.6), and it seems that some errors are thrown directly while others only become available when the future is resolved. There is also a comment in the code about this:

            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly

My main question is: do I have to resolve the future (which carries the API exceptions) in a blocking way to find out whether something went wrong during the send, so that I can be 100% sure that my message was sent correctly?

Or is the send itself already guaranteed once the send method returns without throwing an exception, with the errors inside the future only concerning problems receiving some metadata back (or something similar)?

Answers

If you don't want to block on get() to detect a failure, the other option is to add a callback to the future and receive the result asynchronously (it is a ListenableFuture).

public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback {

}

@FunctionalInterface
public interface SuccessCallback<T> {

    /**
     * Called when the {@link ListenableFuture} completes with success.
     * <p>Note that Exceptions raised by this method are ignored.
     * @param result the result
     */
    void onSuccess(@Nullable T result);

}

@FunctionalInterface
public interface FailureCallback {

    /**
     * Called when the {@link ListenableFuture} completes with failure.
     * <p>Note that Exceptions raised by this method are ignored.
     * @param ex the failure
     */
    void onFailure(Throwable ex);

}
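As a rough sketch of the callback style, the following uses java.util.concurrent.CompletableFuture as a stand-in for the future returned by send, so that it runs without any Kafka dependency. With the ListenableFuture from the question, the equivalent registration would be addCallback(successCallback, failureCallback); as far as I know, spring-kafka 3.x returns a CompletableFuture directly. The class SendCallbackSketch and the method addCallbacks are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class SendCallbackSketch {

    /**
     * Registers a success handler and a failure handler without blocking the caller.
     * sendFuture is an assumption: it stands in for the value returned by
     * kafkaTemplate.send(...).
     */
    public static <T> void addCallbacks(CompletableFuture<T> sendFuture,
                                        Consumer<T> onSuccess,
                                        Consumer<Throwable> onFailure) {
        sendFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                onFailure.accept(ex);     // the send failed asynchronously
            } else {
                onSuccess.accept(result); // the send was acknowledged
            }
        });
    }

    public static void main(String[] args) {
        // Simulated successful send.
        CompletableFuture<String> ok = new CompletableFuture<>();
        addCallbacks(ok,
                result -> System.out.println("sent: " + result),
                ex -> System.out.println("failed: " + ex.getMessage()));
        ok.complete("simulated result");

        // Simulated asynchronous failure.
        CompletableFuture<String> bad = new CompletableFuture<>();
        addCallbacks(bad,
                result -> System.out.println("sent: " + result),
                ex -> System.out.println("failed: " + ex.getMessage()));
        bad.completeExceptionally(new IllegalStateException("simulated broker error"));
    }
}
```

Note that with this style the calling method has already returned by the time a failure arrives, so a REST handler that must report success only after the send has completed still needs either a blocking wait or an asynchronous response.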

There are no guarantees of success if you just send (and pray).
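If blocking is acceptable, one possible shape for the wait is sketched below. It is written against the plain java.util.concurrent.Future interface, which ListenableFuture extends, so the future returned by send could be passed in as it is. The class BlockingSendSketch, the method confirmSent and the timeout values are hypothetical; a real handler would probably also log or propagate the cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingSendSketch {

    /**
     * Blocks until the send future completes or the timeout elapses.
     * Returns true only if the future completed normally.
     */
    public static boolean confirmSent(Future<?> sendFuture, long timeoutMillis) {
        try {
            sendFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            // The send failed asynchronously; e.getCause() holds the reason.
            return false;
        } catch (TimeoutException e) {
            // No result within the timeout; the outcome is unknown, so report failure.
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat the send as unconfirmed.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Simulated futures standing in for kafkaTemplate.send(...).
        Future<String> acknowledged = CompletableFuture.completedFuture("simulated result");
        CompletableFuture<String> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new IllegalStateException("simulated broker error"));

        System.out.println(confirmSent(acknowledged, 1000));
        System.out.println(confirmSent(rejected, 1000));
    }
}
```

Using the timed get rather than the untimed one keeps a request thread from waiting indefinitely when no result ever arrives.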

EDIT

Client-side exceptions (e.g. serialization) are thrown on the calling thread, but server-side failures complete the future asynchronously.
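To make the two failure paths concrete, here is a minimal sketch that separates an exception thrown by the send call itself from a failure delivered through the future. The send call is passed in as a Supplier so the example runs without Kafka; TwoFailurePathsSketch, Outcome and sendAndClassify are hypothetical names, and the simulated exceptions are not the ones Kafka actually throws.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class TwoFailurePathsSketch {

    public enum Outcome { SENT, THROWN_ON_CALLING_THREAD, FAILED_IN_FUTURE }

    /**
     * Invokes the send call and classifies how it ended.
     * sendCall is an assumption: it stands in for
     * () -> kafkaTemplate.send("topic", "key", "data").
     */
    public static Outcome sendAndClassify(Supplier<Future<?>> sendCall) {
        Future<?> future;
        try {
            future = sendCall.get();
        } catch (RuntimeException e) {
            // Path 1: thrown directly, before any future was returned.
            return Outcome.THROWN_ON_CALLING_THREAD;
        }
        try {
            future.get(); // a real handler would likely use the timed variant
            return Outcome.SENT;
        } catch (ExecutionException e) {
            // Path 2: reported through the future after the call returned.
            return Outcome.FAILED_IN_FUTURE;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.FAILED_IN_FUTURE;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new IllegalStateException("simulated broker error"));

        System.out.println(sendAndClassify(() -> CompletableFuture.completedFuture("ok")));
        System.out.println(sendAndClassify(() -> {
            throw new IllegalArgumentException("simulated serialization problem");
        }));
        System.out.println(sendAndClassify(() -> rejected));
    }
}
```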

There are a number of server-side errors that might result in an async failure; see the Errors class...

/**
 * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
 * are thus part of the protocol. The names can be changed but the error code cannot.
 *
 * Note that client library will convert an unknown error code to the non-retriable UnknownServerException if the client library
 * version is old and does not recognize the newly-added error code. Therefore when a new server-side error is added,
 * we may need extra logic to convert the new error code to another existing error code before sending the response back to
 * the client if the request version suggests that the client may not recognize the new error code.
 *
 * Do not add exceptions that occur only on the client or only on the server here.
 */
public enum Errors {
    UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request.",
            UnknownServerException::new),
    NONE(0, null, message -> null),
    OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.",
            OffsetOutOfRangeException::new),
    CORRUPT_MESSAGE(2, "This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.",
            CorruptRecordException::new),
    UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition.",
            UnknownTopicOrPartitionException::new),
    INVALID_FETCH_SIZE(4, "The requested fetch size is invalid.",
            InvalidFetchSizeException::new),
    LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we are in the middle of a leadership election.",
            LeaderNotAvailableException::new),
...