NATS Logo by Example

Confirmed Message Ack in JetStream

A confirmed message ack means that the client waits for an ack from the server to ensure that the ack was received and processed. The functionality can be found in various clients under the following:

NameClients
ack ackJavascript
double ackRust, C# .NET V2
ack syncGo, Python, Java, C

CLI Go Python JavaScript Rust C# .NET V2 Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run jetstream/ack-ack/dotnet2
View the source code or learn how to run this example yourself

Code

Install NuGet packages NATS.Net and Microsoft.Extensions.Logging.Console.

using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

NATS_URL environment variable can be used to pass the locations of the NATS servers.

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";

Connect to NATS server. Since connection is disposable at the end of our scope we should flush our buffers and close connection cleanly.

var opts = new NatsOpts
{
    Url = url,
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);

Get a JetStream Context

var js = new NatsJSContext(nats);

Stream Setup

var stream = "verifyAckStream";
var subject = "verifyAckSubject";
var consumerName1 = "consumer1";
var consumerName2 = "consumer2";

Remove the stream first so we have a clean starting point.

try
{
    await js.DeleteStreamAsync(stream);
}
catch (NatsJSApiException e) when (e is { Error.Code: 404 })
{
}

Create the stream

var streamConfig = new StreamConfig(stream, [subject])
{
    Storage = StreamConfigStorage.Memory,
};
await js.CreateStreamAsync(streamConfig);

Publish a couple messages, so we can look at the state

await js.PublishAsync(subject, "A");
await js.PublishAsync(subject, "B");

Consume a message with 2 different consumers The first consumer will Ack without confirmation The second consumer will AckSync which confirms that ack was handled.

Consumer 1, regular ack

Console.WriteLine("Consumer 1");
var consumer1 = await js.CreateOrUpdateConsumerAsync(stream, new ConsumerConfig(consumerName1));
Console.WriteLine("  Start");
Console.WriteLine($"    pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($"    messages with ack pending: {consumer1.Info.NumAckPending}");


var next = await consumer1.NextAsync<string>();

refresh the consumer to update it’s state

await consumer1.RefreshAsync();
Console.WriteLine("  After received but before ack");
Console.WriteLine($"    pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($"    messages with ack pending: {consumer1.Info.NumAckPending}");


if (next is { } msg1)
{
    await msg1.AckAsync();
}


await consumer1.RefreshAsync();
Console.WriteLine("  After ack");
Console.WriteLine($"    pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($"    messages with ack pending: {consumer1.Info.NumAckPending}");

Consumer 2 Double Ack

var consumer2 = await js.CreateOrUpdateConsumerAsync(stream, new ConsumerConfig(consumerName2));
Console.WriteLine("Consumer 2");
Console.WriteLine("  Start");
Console.WriteLine($"    pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($"    messages with ack pending: {consumer1.Info.NumAckPending}");


next = await consumer2.NextAsync<string>();


await consumer2.RefreshAsync();
Console.WriteLine("  After received but before ack");
Console.WriteLine($"    pending messages: {consumer2.Info.NumPending}");
Console.WriteLine($"    messages with ack pending: {consumer2.Info.NumAckPending}");


if (next is { } msg2)
{
    await msg2.AckAsync(new AckOpts { DoubleAck = true });
}


await consumer2.RefreshAsync();
Console.WriteLine("  After ack");
Console.WriteLine($"    pending messages: {consumer2.Info.NumPending}");
Console.WriteLine($"    messages with ack pending: {consumer2.Info.NumAckPending}");

Output

info: NATS.Client.Core.NatsConnection[1001]
      Try to connect NATS nats://nats:4222
info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005]
      Received server info: ServerInfo { Id = NA4I74EZCLUSA4FD7XDAPHXW4P2CLLUHRM4LO5WXG4LN3QE46LK2EYWE, Name = NA4I74EZCLUSA4FD7XDAPHXW4P2CLLUHRM4LO5WXG4LN3QE46LK2EYWE, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 172.18.0.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
info: NATS.Client.Core.NatsConnection[1001]
      Connect succeed NATS-by-Example, NATS nats://nats:4222
info: NATS-by-Example[0]
      Consumer 1, Start # pending messages: 2, messages with ack pending: 0
info: NATS-by-Example[0]
      Consumer 1, After received but before ack # pending messages: 1, messages with ack pending: 1
info: NATS-by-Example[0]
      Consumer 1, After ack # pending messages: 1, messages with ack pending: 0
info: NATS-by-Example[0]
      Consumer 2, Start # pending messages: 2, messages with ack pending: 0
info: NATS-by-Example[0]
      Consumer 2, After received but before ack # pending messages: 1, messages with ack pending: 1
info: NATS-by-Example[0]
      Consumer 2, After ack # pending messages: 1, messages with ack pending: 0
info: NATS-by-Example[0]
      Bye!
info: NATS.Client.Core.NatsConnection[1001]
      Disposing connection NATS-by-Example

Recording

Note, playback is half speed to make it a bit easier to follow.