Skip to content

Commit 2bd0e6d

Browse files
committed
[FAB-4305] Implement gRPC keepalive support
There are a couple of defects related to long running gRPC connections being broken by intermediaries due to inactivity. The latest version of the gRPC libraries added support for keepliaves to deal with this situation. This change adds keepalive support for gRPC server and client connections. The defaults are currently being used and no user configuration has been exposed. The defaults are sane (5min inactivity and 20sec ping response) for typical environments. Will submit a separate change for gRPC configuration in general, although we can decide whether or not to take that on at this point. Change-Id: Idf63e37eb219e7961a2217193d6c00a69d259998 Signed-off-by: Gari Singh <[email protected]>
1 parent 3b40efa commit 2bd0e6d

File tree

6 files changed

+150
-0
lines changed

6 files changed

+150
-0
lines changed

core/comm/config.go

+66
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ SPDX-License-Identifier: Apache-2.0
77
package comm
88

99
import (
10+
"time"
11+
1012
"github.com/spf13/viper"
13+
"google.golang.org/grpc"
14+
"google.golang.org/grpc/keepalive"
1115
)
1216

1317
var (
@@ -18,8 +22,34 @@ var (
1822
// Max send and receive bytes for grpc clients and servers
1923
maxRecvMsgSize = 100 * 1024 * 1024
2024
maxSendMsgSize = 100 * 1024 * 1024
25+
// Default keepalive options
26+
keepaliveOptions = KeepaliveOptions{
27+
ClientKeepaliveTime: 300, // 5 min
28+
ClientKeepaliveTimeout: 20, // 20 sec - gRPC default
29+
ServerKeepaliveTime: 7200, // 2 hours - gRPC default
30+
ServerKeepaliveTimeout: 20, // 20 sec - gRPC default
31+
}
2132
)
2233

34+
// KeepAliveOptions is used to set the gRPC keepalive settings for both
35+
// clients and servers
36+
type KeepaliveOptions struct {
37+
// ClientKeepaliveTime is the duration in seconds after which if the client
38+
// does not see any activity from the server it pings the server to see
39+
// if it is alive
40+
ClientKeepaliveTime int
41+
// ClientKeepaliveTimeout is the duration the client waits for a response
42+
// from the server after sending a ping before closing the connection
43+
ClientKeepaliveTimeout int
44+
// ServerKeepaliveTime is the duration in seconds after which if the server
45+
// does not see any activity from the client it pings the client to see
46+
// if it is alive
47+
ServerKeepaliveTime int
48+
// ServerKeepaliveTimeout is the duration the server waits for a response
49+
// from the client after sending a ping before closing the connection
50+
ServerKeepaliveTimeout int
51+
}
52+
2353
// cacheConfiguration caches common package scoped variables
2454
func cacheConfiguration() {
2555
if !configurationCached {
@@ -59,3 +89,39 @@ func MaxSendMsgSize() int {
5989
func SetMaxSendMsgSize(size int) {
6090
maxSendMsgSize = size
6191
}
92+
93+
// SetKeepaliveOptions sets the gRPC keepalive options for both clients and
94+
// servers
95+
func SetKeepaliveOptions(ka KeepaliveOptions) {
96+
keepaliveOptions = ka
97+
}
98+
99+
// ServerKeepaliveOptions returns the gRPC keepalive options for servers
100+
func ServerKeepaliveOptions() []grpc.ServerOption {
101+
var serverOpts []grpc.ServerOption
102+
kap := keepalive.ServerParameters{
103+
Time: time.Duration(keepaliveOptions.ServerKeepaliveTime) * time.Second,
104+
Timeout: time.Duration(keepaliveOptions.ServerKeepaliveTimeout) * time.Second,
105+
}
106+
serverOpts = append(serverOpts, grpc.KeepaliveParams(kap))
107+
kep := keepalive.EnforcementPolicy{
108+
// needs to match clientKeepalive
109+
MinTime: time.Duration(keepaliveOptions.ClientKeepaliveTime) * time.Second,
110+
// allow keepalive w/o rpc
111+
PermitWithoutStream: true,
112+
}
113+
serverOpts = append(serverOpts, grpc.KeepaliveEnforcementPolicy(kep))
114+
return serverOpts
115+
}
116+
117+
// ClientKeepaliveOptions returns the gRPC keepalive options for clients
118+
func ClientKeepaliveOptions() []grpc.DialOption {
119+
var dialOpts []grpc.DialOption
120+
kap := keepalive.ClientParameters{
121+
Time: time.Duration(keepaliveOptions.ClientKeepaliveTime) * time.Second,
122+
Timeout: time.Duration(keepaliveOptions.ClientKeepaliveTimeout) * time.Second,
123+
PermitWithoutStream: true,
124+
}
125+
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kap))
126+
return dialOpts
127+
}

core/comm/config_test.go

+16
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,22 @@ func TestConfig(t *testing.T) {
2727
assert.EqualValues(t, size, MaxRecvMsgSize())
2828
assert.EqualValues(t, size, MaxSendMsgSize())
2929

30+
// set keepalive options
31+
timeout := 1000
32+
ka := KeepaliveOptions{
33+
ClientKeepaliveTime: timeout,
34+
ClientKeepaliveTimeout: timeout + 1,
35+
ServerKeepaliveTime: timeout + 2,
36+
ServerKeepaliveTimeout: timeout + 3,
37+
}
38+
SetKeepaliveOptions(ka)
39+
assert.EqualValues(t, timeout, keepaliveOptions.ClientKeepaliveTime)
40+
assert.EqualValues(t, timeout+1, keepaliveOptions.ClientKeepaliveTimeout)
41+
assert.EqualValues(t, timeout+2, keepaliveOptions.ServerKeepaliveTime)
42+
assert.EqualValues(t, timeout+3, keepaliveOptions.ServerKeepaliveTimeout)
43+
assert.EqualValues(t, 2, len(ServerKeepaliveOptions()))
44+
assert.Equal(t, 1, len(ClientKeepaliveOptions()))
45+
3046
// reset cache
3147
configurationCached = false
3248
viper.Set("peer.tls.enabled", true)

core/comm/server.go

+3
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,9 @@ func NewGRPCServerFromListener(listener net.Listener, secureConfig SecureServerC
168168
// set max send and recv msg sizes
169169
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(MaxSendMsgSize()))
170170
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(MaxRecvMsgSize()))
171+
// set the keepalive options
172+
serverOpts = append(serverOpts, ServerKeepaliveOptions()...)
173+
171174
grpcServer.server = grpc.NewServer(serverOpts...)
172175

173176
return grpcServer, nil

core/comm/server_test.go

+60
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"crypto/x509"
2222
"errors"
2323
"fmt"
24+
"io"
2425
"io/ioutil"
2526
"log"
2627
"net"
@@ -34,6 +35,7 @@ import (
3435
"golang.org/x/net/context"
3536
"google.golang.org/grpc"
3637
"google.golang.org/grpc/credentials"
38+
"google.golang.org/grpc/transport"
3739

3840
"github.com/hyperledger/fabric/core/comm"
3941
testpb "github.com/hyperledger/fabric/core/comm/testdata/grpc"
@@ -1357,3 +1359,61 @@ func TestSetClientRootCAs(t *testing.T) {
13571359
}
13581360

13591361
}
1362+
1363+
func TestKeepaliveNoClientResponse(t *testing.T) {
1364+
t.Parallel()
1365+
// set up GRPCServer instance
1366+
kap := comm.KeepaliveOptions{
1367+
ServerKeepaliveTime: 2,
1368+
ServerKeepaliveTimeout: 1,
1369+
}
1370+
comm.SetKeepaliveOptions(kap)
1371+
testAddress := "localhost:9400"
1372+
srv, err := comm.NewGRPCServer(testAddress, comm.SecureServerConfig{})
1373+
assert.NoError(t, err, "Unexpected error starting GRPCServer")
1374+
go srv.Start()
1375+
defer srv.Stop()
1376+
1377+
// test connection close if client does not response to ping
1378+
// net client will not response to keepalive
1379+
client, err := net.Dial("tcp", testAddress)
1380+
assert.NoError(t, err, "Unexpected error dialing GRPCServer")
1381+
defer client.Close()
1382+
// sleep past keepalive timeout
1383+
time.Sleep(4 * time.Second)
1384+
data := make([]byte, 24)
1385+
for {
1386+
_, err = client.Read(data)
1387+
if err == nil {
1388+
continue
1389+
}
1390+
assert.EqualError(t, err, io.EOF.Error(), "Expected io.EOF")
1391+
break
1392+
}
1393+
}
1394+
1395+
func TestKeepaliveClientResponse(t *testing.T) {
1396+
t.Parallel()
1397+
// set up GRPCServer instance
1398+
kap := comm.KeepaliveOptions{
1399+
ServerKeepaliveTime: 2,
1400+
ServerKeepaliveTimeout: 1,
1401+
}
1402+
comm.SetKeepaliveOptions(kap)
1403+
testAddress := "localhost:9401"
1404+
srv, err := comm.NewGRPCServer(testAddress, comm.SecureServerConfig{})
1405+
assert.NoError(t, err, "Unexpected error starting GRPCServer")
1406+
go srv.Start()
1407+
defer srv.Stop()
1408+
1409+
// test that connection does not close with response to ping
1410+
clientTransport, err := transport.NewClientTransport(context.Background(),
1411+
transport.TargetInfo{Addr: testAddress}, transport.ConnectOptions{})
1412+
assert.NoError(t, err, "Unexpected error creating client transport")
1413+
defer clientTransport.Close()
1414+
// sleep past keepalive timeout
1415+
time.Sleep(4 * time.Second)
1416+
// try to create a stream
1417+
_, err = clientTransport.NewStream(context.Background(), &transport.CallHdr{})
1418+
assert.NoError(t, err, "Unexpected error creating stream")
1419+
}

core/deliverservice/deliveryclient.go

+2
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ func DefaultConnectionFactory(endpoint string) (*grpc.ClientConn, error) {
194194
// set max send/recv msg sizes
195195
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()),
196196
grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize())))
197+
// set the keepalive options
198+
dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...)
197199

198200
if comm.TLSEnabled() {
199201
dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetDeliverServiceCredentials()))

peer/node/start.go

+3
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,9 @@ func serve(args []string) error {
157157
// set max send/recv msg sizes
158158
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()),
159159
grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize())))
160+
// set the keepalive options
161+
dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...)
162+
160163
if comm.TLSEnabled() {
161164
dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetPeerCredentials()))
162165
} else {

0 commit comments

Comments
 (0)