Skip to content

Commit

Permalink
inprocess: Support tracing message sizes (#11406)
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaspeaks authored Oct 11, 2024
1 parent a01a9e2 commit ca43d78
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 103 deletions.
9 changes: 8 additions & 1 deletion census/src/test/java/io/grpc/census/CensusModulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.KnownLength;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
Expand Down Expand Up @@ -99,6 +100,7 @@
import io.opencensus.trace.Tracer;
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.propagation.SpanContextParseException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -136,7 +138,7 @@ public class CensusModulesTest {
ClientStreamTracer.StreamInfo.newBuilder()
.setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build();

private static class StringInputStream extends InputStream {
private static class StringInputStream extends InputStream implements KnownLength {
final String string;

StringInputStream(String string) {
Expand All @@ -149,6 +151,11 @@ public int read() {
// passed to the InProcess server and consumed by MARSHALLER.parse().
throw new UnsupportedOperationException("Should not be called");
}

@Override
public int available() throws IOException {
return string == null ? 0 : string.length();
}
}

private static final MethodDescriptor.Marshaller<String> MARSHALLER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,6 @@ protected abstract InternalServer newServer(
*/
protected abstract String testAuthority(InternalServer server);

/**
* Returns true (which is default) if the transport reports message sizes to StreamTracers.
*/
protected boolean sizesReported() {
return true;
}

protected final Attributes eagAttrs() {
return EAG_ATTRS;
}
Expand All @@ -163,9 +156,9 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
* tests in an indeterminate state.
*/
protected InternalServer server;
private ServerTransport serverTransport;
private ManagedClientTransport client;
private MethodDescriptor<String, String> methodDescriptor =
protected ServerTransport serverTransport;
protected ManagedClientTransport client;
protected MethodDescriptor<String, String> methodDescriptor =
MethodDescriptor.<String, String>newBuilder()
.setType(MethodDescriptor.MethodType.UNKNOWN)
.setFullMethodName("service/method")
Expand All @@ -182,22 +175,22 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
"tracer-key", Metadata.ASCII_STRING_MARSHALLER);
private final String tracerKeyValue = "tracer-key-value";

private ManagedClientTransport.Listener mockClientTransportListener
protected ManagedClientTransport.Listener mockClientTransportListener
= mock(ManagedClientTransport.Listener.class);
private MockServerListener serverListener = new MockServerListener();
protected MockServerListener serverListener = new MockServerListener();
private ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
private final TestClientStreamTracer clientStreamTracer1 = new TestHeaderClientStreamTracer();
protected final TestClientStreamTracer clientStreamTracer1 = new TestHeaderClientStreamTracer();
private final TestClientStreamTracer clientStreamTracer2 = new TestHeaderClientStreamTracer();
private final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
protected final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
clientStreamTracer1, clientStreamTracer2
};
private final ClientStreamTracer[] noopTracers = new ClientStreamTracer[] {
new ClientStreamTracer() {}
};

private final TestServerStreamTracer serverStreamTracer1 = new TestServerStreamTracer();
protected final TestServerStreamTracer serverStreamTracer1 = new TestServerStreamTracer();
private final TestServerStreamTracer serverStreamTracer2 = new TestServerStreamTracer();
private final ServerStreamTracer.Factory serverStreamTracerFactory = mock(
protected final ServerStreamTracer.Factory serverStreamTracerFactory = mock(
ServerStreamTracer.Factory.class,
delegatesTo(new ServerStreamTracer.Factory() {
final ArrayDeque<TestServerStreamTracer> tracers =
Expand Down Expand Up @@ -857,26 +850,16 @@ public void basicStream() throws Exception {
message.close();
assertThat(clientStreamTracer1.nextOutboundEvent())
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
if (sizesReported()) {
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(clientStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
}
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
assertNull("no additional message expected", serverStreamListener.messageQueue.poll());

clientStream.halfClose();
assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));

if (sizesReported()) {
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(serverStreamTracer1.getInboundWireSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
}
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.nextInboundEvent())
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");

Expand Down Expand Up @@ -907,25 +890,15 @@ public void basicStream() throws Exception {
assertNotNull("message expected", message);
assertThat(serverStreamTracer1.nextOutboundEvent())
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
if (sizesReported()) {
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
}
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertTrue(clientStreamTracer1.getInboundHeaders());
assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message));
assertThat(clientStreamTracer1.nextInboundEvent())
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
if (sizesReported()) {
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
}
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);

message.close();
assertNull("no additional message expected", clientStreamListener.messageQueue.poll());
Expand Down Expand Up @@ -1285,17 +1258,10 @@ public void onReady() {
serverStream.close(Status.OK, new Metadata());
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertTrue(clientStreamTracer1.getInboundHeaders());
if (sizesReported()) {
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
}
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertNull(clientStreamTracer1.getInboundTrailers());
assertSame(status, clientStreamTracer1.getStatus());
// There is a race between client cancelling and server closing. The final status seen by the
Expand Down Expand Up @@ -2184,7 +2150,7 @@ private static void runIfNotNull(Runnable runnable) {
}
}

private static void startTransport(
protected static void startTransport(
ManagedClientTransport clientTransport,
ManagedClientTransport.Listener listener) {
runIfNotNull(clientTransport.start(listener));
Expand All @@ -2202,7 +2168,7 @@ public void streamCreated(Attributes transportAttrs, Metadata metadata) {
}
}

private static class MockServerListener implements ServerListener {
public static class MockServerListener implements ServerListener {
public final BlockingQueue<MockServerTransportListener> listeners
= new LinkedBlockingQueue<>();
private final SettableFuture<?> shutdown = SettableFuture.create();
Expand Down Expand Up @@ -2233,7 +2199,7 @@ public MockServerTransportListener takeListenerOrFail(long timeout, TimeUnit uni
}
}

private static class MockServerTransportListener implements ServerTransportListener {
public static class MockServerTransportListener implements ServerTransportListener {
public final ServerTransport transport;
public final BlockingQueue<StreamCreation> streams = new LinkedBlockingQueue<>();
private final SettableFuture<?> terminated = SettableFuture.create();
Expand Down Expand Up @@ -2281,8 +2247,8 @@ public StreamCreation takeStreamOrFail(long timeout, TimeUnit unit)
}
}

private static class ServerStreamListenerBase implements ServerStreamListener {
private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
public static class ServerStreamListenerBase implements ServerStreamListener {
public final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
// Would have used Void instead of Object, but null elements are not allowed
private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<>();
private final CountDownLatch halfClosedLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -2341,8 +2307,8 @@ public void closed(Status status) {
}
}

private static class ClientStreamListenerBase implements ClientStreamListener {
private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
public static class ClientStreamListenerBase implements ClientStreamListener {
public final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
// Would have used Void instead of Object, but null elements are not allowed
private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<>();
private final SettableFuture<Metadata> headers = SettableFuture.create();
Expand Down Expand Up @@ -2399,7 +2365,7 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
}
}

private static class StreamCreation {
public static class StreamCreation {
public final ServerStream stream;
public final String method;
public final Metadata headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public static InProcessChannelBuilder forAddress(String name, int port) {
private ScheduledExecutorService scheduledExecutorService;
private int maxInboundMetadataSize = Integer.MAX_VALUE;
private boolean transportIncludeStatusCause = false;
private long assumedMessageSize = -1;

private InProcessChannelBuilder(@Nullable SocketAddress directAddress, @Nullable String target) {

Expand All @@ -117,10 +118,6 @@ public ClientTransportFactory buildClientTransportFactory() {
managedChannelImplBuilder.setStatsRecordStartedRpcs(false);
managedChannelImplBuilder.setStatsRecordFinishedRpcs(false);
managedChannelImplBuilder.setStatsRecordRetryMetrics(false);

// By default, In-process transport should not be retriable as that leaks memory. Since
// there is no wire, bytes aren't calculated so buffer limit isn't respected
managedChannelImplBuilder.disableRetry();
}

@Internal
Expand Down Expand Up @@ -225,9 +222,24 @@ public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) {
return this;
}

/**
* Assumes RPC messages are the specified size. This avoids serializing
* messages for metrics and retry memory tracking. This can dramatically
* improve performance when accurate message sizes are not needed and if
* nothing else needs the serialized message.
* @param assumedMessageSize length of InProcess transport's messageSize.
* @return this
* @throws IllegalArgumentException if assumedMessageSize is negative.
*/
public InProcessChannelBuilder assumedMessageSize(long assumedMessageSize) {
checkArgument(assumedMessageSize >= 0, "assumedMessageSize must be >= 0");
this.assumedMessageSize = assumedMessageSize;
return this;
}

ClientTransportFactory buildTransportFactory() {
return new InProcessClientTransportFactory(
scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause);
return new InProcessClientTransportFactory(scheduledExecutorService,
maxInboundMetadataSize, transportIncludeStatusCause, assumedMessageSize);
}

void setStatsEnabled(boolean value) {
Expand All @@ -243,15 +255,17 @@ static final class InProcessClientTransportFactory implements ClientTransportFac
private final int maxInboundMetadataSize;
private boolean closed;
private final boolean includeCauseWithStatus;
private long assumedMessageSize;

private InProcessClientTransportFactory(
@Nullable ScheduledExecutorService scheduledExecutorService,
int maxInboundMetadataSize, boolean includeCauseWithStatus) {
int maxInboundMetadataSize, boolean includeCauseWithStatus, long assumedMessageSize) {
useSharedTimer = scheduledExecutorService == null;
timerService = useSharedTimer
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
this.maxInboundMetadataSize = maxInboundMetadataSize;
this.includeCauseWithStatus = includeCauseWithStatus;
this.assumedMessageSize = assumedMessageSize;
}

@Override
Expand All @@ -263,7 +277,7 @@ public ConnectionClientTransport newClientTransport(
// TODO(carl-mastrangelo): Pass channelLogger in.
return new InProcessTransport(
addr, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
options.getEagAttributes(), includeCauseWithStatus);
options.getEagAttributes(), includeCauseWithStatus, assumedMessageSize);
}

@Override
Expand Down
Loading

0 comments on commit ca43d78

Please sign in to comment.