Skip to content

Commit

Permalink
Merge pull request #225 from anandrgitnirman/prodversion
Browse files Browse the repository at this point in the history
Issue #218 : GRPC Healthcheck support on Daemon
  • Loading branch information
vinthedark authored Mar 5, 2019
2 parents baab198 + 2fd9f04 commit 94d2bca
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 84 deletions.
30 changes: 10 additions & 20 deletions metrics/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ package metrics
import (
"bytes"
"context"
"encoding/json"
"errors"
pb "github.com/singnet/snet-daemon/metrics/services"
"github.com/singnet/snet-daemon/config"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"io/ioutil"
"net/http"
"time"
Expand All @@ -24,36 +24,26 @@ type Response struct {
}

// Calls a gRPC endpoint for heartbeat (gRPC Client)
func callgRPCServiceHeartbeat(grpcAddress string) ([]byte, error) {
func callgRPCServiceHeartbeat(serviceUrl string) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) {
// Set up a connection to the server.
conn, err := grpc.Dial(grpcAddress, grpc.WithInsecure())
conn, err := grpc.Dial(serviceUrl, grpc.WithInsecure())
if err != nil {
log.WithError(err).Warningf("unable to connect to grpc endpoint: %v", err)
return nil, err
return grpc_health_v1.HealthCheckResponse_NOT_SERVING, err
}
defer conn.Close()
// create the client instance
client := pb.NewHeartbeatClient(conn)
// connect to the server and call the required method
client := grpc_health_v1.NewHealthClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

//call the heartbeat rpc method
resp, err := client.Check(ctx, &pb.Empty{})
req := grpc_health_v1.HealthCheckRequest{Service:config.GetString(config.ServiceId)}
resp, err := client.Check(ctx,&req)
if err != nil {
log.WithError(err).Warningf("error in calling the heartbeat service : %v", err)
return nil, err
}
//convert enum to string, because json marshal doesnt do it
responseConv := &Response{ServiceID: resp.ServiceID, Status: resp.Status.String()}
jsonResp, err := json.Marshal(responseConv)
if err != nil {
log.Infof("response received : %v", responseConv)
log.WithError(err).Warningf("invalid service response : %v", err)
return nil, err
return grpc_health_v1.HealthCheckResponse_UNKNOWN, err
}
log.Infof("service heartbeat received : %s", string(jsonResp))
return jsonResp, nil
return resp.Status,nil
}

// calls the service heartbeat and relay the message to daemon (HTTP client for heartbeat)
Expand Down
38 changes: 20 additions & 18 deletions metrics/clients_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,37 @@ package metrics

import (
"context"
"encoding/json"
"github.com/singnet/snet-daemon/metrics/services"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
pb "google.golang.org/grpc/health/grpc_health_v1"
"net"
"testing"

pb "github.com/singnet/snet-daemon/metrics/services"
)

// server is used to implement api.HeartbeatServer
type server struct{}

func (s *server) Check(ctx context.Context, in *pb.Empty) (*pb.HeartbeatMsg, error) {
return &pb.HeartbeatMsg{ServiceID: "SAMPLE002", Status: pb.HeartbeatMsg_SERVING}, nil
func (s *server) Check(ctx context.Context, in *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) {
return &pb.HealthCheckResponse{Status: pb.HealthCheckResponse_SERVING}, nil
}

const (
testPort = ":33333"
)

type clientImplHeartBeat struct {

}

// Check implements `service Health`.
func (service *clientImplHeartBeat) Check( ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) {
return &pb.HealthCheckResponse{Status:pb.HealthCheckResponse_SERVING},nil
}

func (service *clientImplHeartBeat) Watch(*pb.HealthCheckRequest, pb.Health_WatchServer) (error) {
return nil
}

// mocks grpc service endpoint for unit tests
func StartMockGrpcService() {
ch := make(chan int)
Expand All @@ -37,7 +47,7 @@ func StartMockGrpcService() {
panic(err)
}
grpcServer := grpc.NewServer()
pb.RegisterHeartbeatServer(grpcServer, &server{})
pb.RegisterHealthServer(grpcServer, &clientImplHeartBeat{})
ch <- 0
grpcServer.Serve(lis)
}()
Expand All @@ -53,13 +63,7 @@ func Test_callgRPCServiceHeartbeat(t *testing.T) {
assert.False(t, err != nil)

assert.NotEqual(t, `{}`, string(heartbeat), "Service Heartbeat must not be empty.")
assert.Equal(t, `{"serviceID":"SAMPLE002","status":"SERVING"}`, string(heartbeat),
"Unexpected service heartbeat")

var sHeartbeat grpc_health_v1.HeartbeatMsg
err = json.Unmarshal(heartbeat, &sHeartbeat)
assert.True(t, err != nil)
assert.Equal(t, "SAMPLE002", sHeartbeat.ServiceID, "Unexpected service ID")
assert.Equal(t,heartbeat.String(),pb.HealthCheckResponse_SERVING.String())

serviceURL = "localhost:26000"
heartbeat, err = callgRPCServiceHeartbeat(serviceURL)
Expand All @@ -68,20 +72,18 @@ func Test_callgRPCServiceHeartbeat(t *testing.T) {

func Test_callHTTPServiceHeartbeat(t *testing.T) {
serviceURL := "http://demo3208027.mockable.io/heartbeat"

heartbeat, err := callHTTPServiceHeartbeat(serviceURL)
assert.False(t, err != nil)

assert.NotEqual(t, string(heartbeat), `{}`, "Service Heartbeat must not be empty.")
assert.Equal(t, string(heartbeat), `{"serviceID":"SERVICE001", "status":"SERVING"}`,
"Unexpected service heartbeat")

var sHeartbeat grpc_health_v1.HeartbeatMsg
/* var sHeartbeat pb.HeartbeatMsg
err = json.Unmarshal(heartbeat, &sHeartbeat)
assert.True(t, err != nil)
assert.Equal(t, sHeartbeat.ServiceID, "SERVICE001", "Unexpected service ID")
serviceURL = "http://demo3208027.mockable.io"
*/ serviceURL = "http://demo3208027.mockable.io"
heartbeat, err = callHTTPServiceHeartbeat(serviceURL)
assert.True(t, err != nil)
}
Expand Down
49 changes: 34 additions & 15 deletions metrics/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// All rights reserved.
// <<add licence terms for code reuse>>

//go:generate protoc -I services/ services/heartbeat.proto --go_out=plugins=grpc:services

// package for monitoring and reporting the daemon metrics
package metrics
Expand All @@ -13,8 +12,11 @@ import (
"fmt"
"github.com/singnet/snet-daemon/config"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc/health/grpc_health_v1"
"net/http"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -62,36 +64,34 @@ func SetNoHeartbeatURLState(state bool) {
func ValidateHeartbeatConfig() error {
//initialize the url state to false
SetNoHeartbeatURLState(false)

// check if the configured type is not supported
hbType := config.GetString(config.GetString(config.ServiceHeartbeatType))
if hbType != "grpc" && hbType != "http" && hbType != "none" && hbType != "" {
hbType := config.GetString(config.ServiceHeartbeatType)
if hbType != "grpc" && hbType != "http" && hbType != "https" && hbType != "none" && hbType != "" {
return fmt.Errorf("unrecognized heartbet service type : '%+v'", hbType)
}

// if the URLs are empty, or hbtype is None or empty consider it as not configured
if hbType == "" || hbType == "none" || config.GetString(config.HeartbeatServiceEndpoint) == "" {
SetNoHeartbeatURLState(true)
} else if !config.IsValidUrl(config.GetString(config.HeartbeatServiceEndpoint)) {
return errors.New("service endpoint must be a valid URL")
}

return nil
}

// prepares the heartbeat, which includes calling to underlying service DAemon is serving
func GetHeartbeat(serviceURL string, serviceType string, serviceID string) DaemonHeartbeat {
heartbeat := DaemonHeartbeat{GetDaemonID(), strconv.FormatInt(getEpochTime(), 10), Online.String(), "{}"}

func GetHeartbeat(serviceURL string, serviceType string, serviceID string) (heartbeat DaemonHeartbeat,err error) {
heartbeat = DaemonHeartbeat{GetDaemonID(), strconv.FormatInt(getEpochTime(), 10), Online.String(), "{}"}
var curResp = `{"serviceID":"` + serviceID + `","status":"NOT_SERVING"}`
if serviceType == "none" || serviceType == "" || isNoHeartbeatURL {
curResp = `{"serviceID":"` + serviceID + `","status":"SERVING"}`
} else {
var svcHeartbeat []byte
var err error
if serviceType == "grpc" {
svcHeartbeat, err = callgRPCServiceHeartbeat(serviceURL)
} else if serviceType == "http" {
var response grpc_health_v1.HealthCheckResponse_ServingStatus
response, err = callgRPCServiceHeartbeat(serviceURL)
//Standardize this as well on the response being sent
heartbeat.Status = response.String()
} else if serviceType == "http" || serviceType == "https" {
svcHeartbeat, err = callHTTPServiceHeartbeat(serviceURL)
}
if err != nil {
Expand All @@ -113,7 +113,7 @@ func GetHeartbeat(serviceURL string, serviceType string, serviceID string) Daemo
}
}
heartbeat.ServiceHeartbeat = curResp
return heartbeat
return heartbeat,err
}

// Heartbeat request handler function : upon request it will hit the service for status and
Expand All @@ -122,14 +122,33 @@ func HeartbeatHandler(rw http.ResponseWriter, r *http.Request) {
// read the heartbeat service type and corresponding URL
serviceType := config.GetString(config.ServiceHeartbeatType)
serviceURL := config.GetString(config.HeartbeatServiceEndpoint)
serviceID := config.ServiceId
heartbeat := GetHeartbeat(serviceURL, serviceType, serviceID)
serviceID := config.GetString(config.ServiceId)
heartbeat,_ := GetHeartbeat(serviceURL, serviceType, serviceID)
err := json.NewEncoder(rw).Encode(heartbeat)
if err != nil {
log.WithError(err).Infof("Failed to write heartbeat message.")
}
}

// Check implements `service Health`.
func (service *DaemonHeartbeat) Check( ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {

heartbeat,err := GetHeartbeat(config.GetString(config.HeartbeatServiceEndpoint), config.GetString(config.ServiceHeartbeatType),
config.GetString(config.ServiceId))

if strings.Compare(heartbeat.Status,Online.String()) == 0 {
return &grpc_health_v1.HealthCheckResponse{Status:grpc_health_v1.HealthCheckResponse_SERVING},nil
}

return &grpc_health_v1.HealthCheckResponse{Status:grpc_health_v1.HealthCheckResponse_SERVICE_UNKNOWN},errors.New("Service heartbeat unknown "+err.Error())
}

// Watch implements `service Watch todo for later`.
func (service *DaemonHeartbeat) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) (error) {
return nil
}


/*
service heartbeat/grpc heartbeat
{"serviceID":"sample1", "status":"SERVING"}
Expand Down
14 changes: 7 additions & 7 deletions metrics/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ package metrics

import (
"encoding/json"
"google.golang.org/grpc/health/grpc_health_v1"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

"github.com/singnet/snet-daemon/metrics/services"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -60,7 +60,7 @@ func Test_GetHeartbeat(t *testing.T) {
serviceType := "http"
serviveID := "SERVICE001"

dHeartbeat := GetHeartbeat(serviceURL, serviceType, serviveID)
dHeartbeat,_ := GetHeartbeat(serviceURL, serviceType, serviveID)
assert.NotNil(t, dHeartbeat, "heartbeat must not be nil")

assert.Equal(t, dHeartbeat.Status, Online.String(), "Invalid State")
Expand All @@ -73,14 +73,14 @@ func Test_GetHeartbeat(t *testing.T) {
assert.Equal(t, dHeartbeat.ServiceHeartbeat, `{"serviceID":"SERVICE001", "status":"SERVING"}`,
"Unexpected service heartbeat")

var sHeartbeat grpc_health_v1.HeartbeatMsg
var sHeartbeat DaemonHeartbeat
err := json.Unmarshal([]byte(dHeartbeat.ServiceHeartbeat), &sHeartbeat)
assert.True(t, err != nil)
assert.Equal(t, sHeartbeat.ServiceID, "SERVICE001", "Unexpected service ID")
assert.True(t, err == nil)
assert.Equal(t, sHeartbeat.Status, grpc_health_v1.HealthCheckResponse_SERVING.String())

// check with some timeout URL
serviceURL = "http://demo3208027.mockable.io"
dHeartbeat = GetHeartbeat(serviceURL, serviceType, serviveID)
dHeartbeat,_ = GetHeartbeat(serviceURL, serviceType, serviveID)
assert.NotNil(t, dHeartbeat, "heartbeat must not be nil")

assert.Equal(t, dHeartbeat.Status, Warning.String(), "Invalid State")
Expand Down Expand Up @@ -116,5 +116,5 @@ func TestSetNoHeartbeatURLState(t *testing.T) {
func TestValidateHeartbeatConfig(t *testing.T) {
err := ValidateHeartbeatConfig()
assert.Nil(t, err)
assert.Equal(t, true, isNoHeartbeatURL)
assert.Equal(t, false, isNoHeartbeatURL)
}
21 changes: 0 additions & 21 deletions metrics/services/heartbeat.proto

This file was deleted.

13 changes: 11 additions & 2 deletions snetd/cmd/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ type Components struct {
blockchain *blockchain.Processor
etcdClient *etcddb.EtcdClient
etcdServer *etcddb.EtcdServer
atomicStorage escrow.AtomicStorage
atomicStorage escrow.AtomicStorage
paymentChannelService escrow.PaymentChannelService
escrowPaymentHandler handler.PaymentHandler
grpcInterceptor grpc.StreamServerInterceptor
paymentChannelStateService *escrow.PaymentChannelStateService
etcdLockerStorage *escrow.PrefixedAtomicStorage
etcdLockerStorage *escrow.PrefixedAtomicStorage
providerControlService *escrow.ProviderControlService
daemonHeartbeat *metrics.DaemonHeartbeat
}

func InitComponents(cmd *cobra.Command) (components *Components) {
Expand Down Expand Up @@ -249,3 +250,11 @@ func (components *Components) ProviderControlService() (service *escrow.Provider
return components.providerControlService
}

func (components *Components) DaemonHeartBeat() (service *metrics.DaemonHeartbeat) {
if components.daemonHeartbeat != nil {
return components.daemonHeartbeat
}
metrics.SetDaemonGrpId(components.ServiceMetaData().GetDaemonGroupIDString())
components.daemonHeartbeat = &metrics.DaemonHeartbeat{DaemonID:metrics.GetDaemonID()}
return components.daemonHeartbeat
}
3 changes: 2 additions & 1 deletion snetd/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"fmt"
"github.com/singnet/snet-daemon/metrics"
"google.golang.org/grpc/health/grpc_health_v1"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -179,7 +180,7 @@ func (d daemon) start() {
)
escrow.RegisterPaymentChannelStateServiceServer(d.grpcServer, d.components.PaymentChannelStateService())
escrow.RegisterProviderControlServiceServer(d.grpcServer,d.components.ProviderControlService())

grpc_health_v1.RegisterHealthServer(d.grpcServer,d.components.DaemonHeartBeat())
mux := cmux.New(d.lis)
// Use "prefix" matching to support "application/grpc*" e.g. application/grpc+proto or +json
// Use SendSettings for compatibility with Java gRPC clients:
Expand Down

0 comments on commit 94d2bca

Please sign in to comment.