Skip to content

Commit

Permalink
Merge pull request #594 from pls-github-dont-suspend-me/dev
Browse files Browse the repository at this point in the history
Add integration test for grpcToGRPC
  • Loading branch information
semyon-dev authored Sep 25, 2024
2 parents 9ad2d82 + bb57c66 commit cb36641
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 18 deletions.
3 changes: 2 additions & 1 deletion etcddb/etcddb_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func NewEtcdClientFromVip(vip *viper.Viper, metaData *blockchain.OrganizationMet
var etcdv3 *clientv3.Client

if utils.CheckIfHttps(metaData.GetPaymentStorageEndPoints()) {
tlsConfig, err := getTlsConfig()
var tlsConfig *tls.Config
tlsConfig, err = getTlsConfig()
if err != nil {
return nil, err
}
Expand Down
17 changes: 17 additions & 0 deletions handler/intergrationtests/grpc_stream_test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
syntax = "proto3";

package handler;

option go_package = "../intergrationtests";

service ExampleStreamingService {
rpc Stream(stream InStream) returns (stream OutStream) {}
}

message InStream {
string message = 1;
}

message OutStream {
string message = 1;
}
236 changes: 236 additions & 0 deletions handler/intergrationtests/grpc_to_grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
//go:generate protoc grpc_stream_test.proto --go-grpc_out=. --go_out=.

package intergrationtests

import (
"context"
"io"
"net"
"testing"
"time"

"github.com/singnet/snet-daemon/blockchain"
"github.com/singnet/snet-daemon/config"
"github.com/singnet/snet-daemon/handler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/spf13/viper"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type integrationTestEnvironment struct {
serverA *grpc.Server
listenerA *net.Listener
serverB *grpc.Server
listenerB *net.Listener
}

func setupTestConfig() {
testConfigJson := `
{
"blockchain_enabled": true,
"blockchain_network_selected": "sepolia",
"daemon_end_point": "127.0.0.1:8080",
"daemon_group_name":"default_group",
"payment_channel_storage_type": "etcd",
"ipfs_end_point": "http://ipfs.singularitynet.io:80",
"ipfs_timeout" : 30,
"passthrough_enabled": true,
"passthrough_endpoint":"http://127.0.0.1:5002",
"service_id": "ExampleServiceId",
"organization_id": "ExampleOrganizationId",
"metering_enabled": false,
"ssl_cert": "",
"ssl_key": "",
"max_message_size_in_mb" : 4,
"daemon_type": "grpc",
"enable_dynamic_pricing":false,
"allowed_user_flag" :false,
"auto_ssl_domain": "",
"auto_ssl_cache_dir": ".certs",
"private_key": "",
"log": {
"level": "info",
"timezone": "UTC",
"formatter": {
"type": "text",
"timestamp_format": "2006-01-02T15:04:05.999Z07:00"
},
"output": {
"type": ["file", "stdout"],
"file_pattern": "./snet-daemon.%Y%m%d.log",
"current_link": "./snet-daemon.log",
"max_size_in_mb": 10,
"max_age_in_days": 7,
"rotation_count": 0
},
"hooks": []
},
"payment_channel_storage_client": {
"connection_timeout": "0s",
"request_timeout": "0s",
"hot_reload": true
},
"payment_channel_storage_server": {
"id": "storage-1",
"scheme": "http",
"host" : "127.0.0.1",
"client_port": 2379,
"peer_port": 2380,
"token": "unique-token",
"cluster": "storage-1=http://127.0.0.1:2380",
"startup_timeout": "1m",
"data_dir": "storage-data-dir-1.etcd",
"log_level": "info",
"log_outputs": ["./etcd-server.log"],
"enabled": false
},
"alerts_email": "",
"service_heartbeat_type": "http",
"token_expiry_in_minutes": 1440,
"model_training_enabled": false
}`

var testConfig = viper.New()
err := config.ReadConfigFromJsonString(testConfig, testConfigJson)
if err != nil {
zap.L().Fatal("Error in reading config")
}

config.SetVip(testConfig)
}

func startServerA(port string, h grpc.StreamHandler) (*grpc.Server, *net.Listener) {
lis, err := net.Listen("tcp", port)
if err != nil {
zap.L().Fatal("Failed to listen", zap.Error(err))
}

grpcServer := grpc.NewServer()
RegisterExampleStreamingServiceServer(grpcServer, &ServiceA{h: h})

go func() {
if err := grpcServer.Serve(lis); err != nil {
zap.L().Fatal("Failed to serve", zap.Error(err))
}
}()
return grpcServer, &lis
}

func startServerB(port string) (*grpc.Server, *net.Listener) {
lis, err := net.Listen("tcp", port)
if err != nil {
zap.L().Fatal("Failed to listen", zap.Error(err))
}

grpcServer := grpc.NewServer()
RegisterExampleStreamingServiceServer(grpcServer, &ServiceB{})

go func() {
if err := grpcServer.Serve(lis); err != nil {
zap.L().Fatal("Failed to serve", zap.Error(err))
}
}()
return grpcServer, &lis
}

func setupEnvironment() *integrationTestEnvironment {
setupTestConfig()
serviceMetadata := &blockchain.ServiceMetadata{
ServiceType: "grpc",
Encoding: "proto",
}
grpcToGrpc := handler.NewGrpcHandler(serviceMetadata)
grpcServerA, listenerA := startServerA(":5001", grpcToGrpc)
grpcServerB, listenerB := startServerB(":5002")

testEnv := &integrationTestEnvironment{
serverA: grpcServerA,
listenerA: listenerA,
serverB: grpcServerB,
listenerB: listenerB,
}

return testEnv
}

func teardownEnvironment(env *integrationTestEnvironment) {
env.serverA.Stop()
env.serverB.Stop()
(*env.listenerA).Close()
(*env.listenerB).Close()
}

type ServiceA struct {
UnimplementedExampleStreamingServiceServer
h grpc.StreamHandler
}

type ServiceB struct {
UnimplementedExampleStreamingServiceServer
}

func (s *ServiceA) Stream(stream ExampleStreamingService_StreamServer) error {
// Forward the stream to grpcToGrpc handler
err := s.h(nil, stream)
if err != nil {
return err
}
return nil
}

func (s *ServiceB) Stream(stream ExampleStreamingService_StreamServer) error {
for {
// Receive the input from the proxied stream
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}

// Send back the response to the stream
err = stream.Send(&OutStream{
Message: "Response from Server B: " + req.Message,
})
if err != nil {
return err
}
}
}

func TestGrpcToGrpc_StreamingIntegration(t *testing.T) {
// Setup test environment with real servers
testEnv := setupEnvironment()
defer teardownEnvironment(testEnv)

// Create a gRPC client to interact with Server A
connA, err := grpc.NewClient((*testEnv.listenerA).Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer connA.Close()

clientA := NewExampleStreamingServiceClient(connA)

// Simulate a bidirectional streaming RPC call from clientA to Server A
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

stream, err := clientA.Stream(ctx)
require.NoError(t, err)

// Send a message from the client A to Server A
err = stream.Send(&InStream{Message: "Hello from client A"})
require.NoError(t, err)

// Receive the response from Server B (proxied by grpcToGrpc)
resp, err := stream.Recv()
require.NoError(t, err)
assert.Equal(t, "Response from Server B: Hello from client A", resp.Message)

// Close the stream
stream.CloseSend()
}
21 changes: 12 additions & 9 deletions metrics/clients_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ package metrics

import (
"context"
"net"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
pb "google.golang.org/grpc/health/grpc_health_v1"
"net"
"testing"
)

// server is used to implement api.HeartbeatServer
Expand Down Expand Up @@ -101,14 +102,16 @@ func (suite *ClientTestSuite) Test_callHTTPServiceHeartbeat() {
assert.True(suite.T(), err != nil)
}

func (suite *ClientTestSuite) Test_callRegisterService() {
// TODO: refactor register service

daemonID := GetDaemonID()
// func (suite *ClientTestSuite) Test_callRegisterService() {

result := callRegisterService(daemonID, suite.serviceURL+"/register")
assert.Equal(suite.T(), true, result)
// daemonID := GetDaemonID()

result = callRegisterService(daemonID, suite.serviceURL+"/registererror")
assert.Equal(suite.T(), false, result)
// result := callRegisterService(daemonID, suite.serviceURL+"/register")
// assert.Equal(suite.T(), true, result)

}
// result = callRegisterService(daemonID, suite.serviceURL+"/registererror")
// assert.Equal(suite.T(), false, result)

// }
19 changes: 11 additions & 8 deletions metrics/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
package metrics

import (
"testing"

"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"testing"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -40,15 +41,17 @@ func TestGetDaemonID(t *testing.T) {
assert.NotEqual(t, "48d343313a1e06093c81830103b45496cc7c277f321049e9ee632fd03207d042", daemonID)
}

func (suite *RegisterTestSuite) TestRegisterDaemon() {
// TODO: refactor register service

result := RegisterDaemon(suite.serviceURL + "/register")
assert.Equal(suite.T(), true, result)
// func (suite *RegisterTestSuite) TestRegisterDaemon() {

wrongserviceURL := "https://localhost:9999/registererror"
result = RegisterDaemon(wrongserviceURL)
assert.Equal(suite.T(), false, result)
}
// result := RegisterDaemon(suite.serviceURL + "/register")
// assert.Equal(suite.T(), true, result)

// wrongserviceURL := "https://localhost:9999/registererror"
// result = RegisterDaemon(wrongserviceURL)
// assert.Equal(suite.T(), false, result)
// }

func (suite *RegisterTestSuite) TestSetDaemonGrpId() {
grpid := "group01"
Expand Down

0 comments on commit cb36641

Please sign in to comment.