Skip to content

Commit

Permalink
Retry request on DeadLetterResponse (#1822)
Browse files Browse the repository at this point in the history
Instead of failing virtual actor request when DeadLetterResponse is received, retry the request.

Also make sure that DeadLetterResponse is not returned to client when request is called with object as type parameter.

Fixes #1799
  • Loading branch information
michalnarwojsz authored Oct 17, 2022
1 parent 1bf095b commit 6c27bc8
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 14 deletions.
28 changes: 14 additions & 14 deletions src/Proto.Cluster/DefaultClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,20 @@ public DefaultClusterContext(Cluster cluster)
if (task.IsCompleted)
{
var untypedResult = MessageEnvelope.UnwrapMessage(task.Result);

if (untypedResult is DeadLetterResponse)
{
if (!context.System.Shutdown.IsCancellationRequested && Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("TryRequestAsync failed, dead PID from {Source}", source);
}

RefreshFuture();
await RemoveFromSource(clusterIdentity, PidSource.Lookup, pid);

continue;
}

if (untypedResult is T t1)
{
return t1;
Expand All @@ -130,25 +143,12 @@ public DefaultClusterContext(Cluster cluster)
{
return TimeoutOrThrow();
}

if (typeof(T) == typeof(MessageEnvelope))
{
return (T)(object)MessageEnvelope.Wrap(task.Result);
}

if (untypedResult is DeadLetterResponse)
{
if (!context.System.Shutdown.IsCancellationRequested && Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("TryRequestAsync failed, dead PID from {Source}", source);
}

RefreshFuture();
await RemoveFromSource(clusterIdentity, PidSource.Lookup, pid);

break;
}

Logger.LogError("Unexpected message. Was type {Type} but expected {ExpectedType}",
untypedResult.GetType(), typeof(T));

Expand Down
56 changes: 56 additions & 0 deletions tests/Proto.Cluster.Tests/RetryOnDeadLetterTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using ClusterTest.Messages;
using FluentAssertions;
using Xunit;

namespace Proto.Cluster.Tests;

public class RetryOnDeadLetterTests
{
[Fact]
public async Task ShouldRetryRequestOnDeadLetterResponseRegardlessOfResponseType()
{
await using var fixture = new Fixture(1);
await fixture.InitializeAsync();

var member = fixture.Members.First();
var identity = CreateIdentity("dead-letter-test");

// make sure the actor is created and the PID is cached
await member.RequestAsync<Pong>(identity, EchoActor.Kind, new Ping(), CancellationTokens.FromSeconds(1));

// pretend we have an invalid PID in the cache
var otherMember = await fixture.SpawnNode();
if (member.PidCache.TryGet(ClusterIdentity.Create(identity, EchoActor.Kind), out var pid))
{
var newPid = PID.FromAddress(otherMember.System.Address, pid.Id);
if (!member.PidCache.TryUpdate(ClusterIdentity.Create(identity, EchoActor.Kind), newPid, pid))
{
Assert.Fail("Failed to replace actor's pid with a fake one in the pid cache");
}
}
else
{
Assert.Fail("Did not find expected actor identity in the pid cache");
}

// check if the correct response type is returned
var response = await member.RequestAsync<object>(identity, EchoActor.Kind, new Ping(), CancellationTokens.FromSeconds(1));
response.Should().BeOfType<Pong>();

}

private string CreateIdentity(string baseId) => $"{Guid.NewGuid().ToString("N").Substring(0, 6)}-{baseId}-";

private class Fixture : BaseInMemoryClusterFixture
{
public Fixture(int clusterSize)
: base(clusterSize, cc => cc
.WithActorRequestTimeout(TimeSpan.FromSeconds(1))
)
{
}
}
}

0 comments on commit 6c27bc8

Please sign in to comment.