Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for ifs utils and add hot reloading for etcd client #589

Merged
merged 9 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ storage-*
*log
data.etcd/

# vscode
.vscode

6 changes: 3 additions & 3 deletions authutils/auth_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ func CheckIfTokenHasExpired(expiredBlock *big.Int) error {

// Get the current block number from on chain
func CurrentBlock() (*big.Int, error) {
if ethClient, err := blockchain.GetEthereumClient(); err != nil {
if ethHttpClient, _, err := blockchain.CreateEthereumClients(); err != nil {
return nil, err
} else {
defer ethClient.RawClient.Close()
defer ethHttpClient.RawClient.Close()
var currentBlockHex string
if err = ethClient.RawClient.CallContext(context.Background(), &currentBlockHex, "eth_blockNumber"); err != nil {
if err = ethHttpClient.RawClient.CallContext(context.Background(), &currentBlockHex, "eth_blockNumber"); err != nil {
zap.L().Error("error determining current block", zap.Error(err))
return nil, fmt.Errorf("error determining current block: %v", err)
}
Expand Down
49 changes: 40 additions & 9 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ type jobInfo struct {

type Processor struct {
enabled bool
ethClient *ethclient.Client
rawClient *rpc.Client
ethHttpClient *ethclient.Client
rawHttpClient *rpc.Client
ethWSClient *ethclient.Client
rawWSClient *rpc.Client
sigHasher func([]byte) []byte
privateKey *ecdsa.PrivateKey
address string
Expand All @@ -55,11 +57,13 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) {

// Setup ethereum client

if ethclients, err := GetEthereumClient(); err != nil {
if ethHttpClients, ethWSClients, err := CreateEthereumClients(); err != nil {
return p, errors.Wrap(err, "error creating RPC client")
} else {
p.rawClient = ethclients.RawClient
p.ethClient = ethclients.EthClient
p.rawHttpClient = ethHttpClients.RawClient
p.ethHttpClient = ethHttpClients.EthClient
p.rawWSClient = ethWSClients.RawClient
p.ethWSClient = ethWSClients.EthClient
}

// TODO: if address is not in config, try to load it using network
Expand All @@ -68,7 +72,7 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) {

p.escrowContractAddress = metadata.GetMpeAddress()

if mpe, err := NewMultiPartyEscrow(p.escrowContractAddress, p.ethClient); err != nil {
if mpe, err := NewMultiPartyEscrow(p.escrowContractAddress, p.ethHttpClient); err != nil {
return p, errors.Wrap(err, "error instantiating MultiPartyEscrow contract")
} else {
p.multiPartyEscrow = mpe
Expand All @@ -82,6 +86,23 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) {
return p, nil
}

func (processor *Processor) ReconnectToWsClient() error {
processor.ethWSClient.Close()
processor.rawHttpClient.Close()

zap.L().Debug("Try to reconnect to websocket client")

newEthWSClients, err := CreateWSEthereumClient()
if err != nil {
return err
}

processor.ethWSClient = newEthWSClients.EthClient
processor.rawWSClient = newEthWSClients.RawClient

return nil
}

func (processor *Processor) Enabled() (enabled bool) {
return processor.enabled
}
Expand All @@ -94,11 +115,19 @@ func (processor *Processor) MultiPartyEscrow() *MultiPartyEscrow {
return processor.multiPartyEscrow
}

func (processor *Processor) GetEthHttpClient() *ethclient.Client {
return processor.ethHttpClient
}

func (processor *Processor) GetEthWSClient() *ethclient.Client {
return processor.ethWSClient
}

func (processor *Processor) CurrentBlock() (currentBlock *big.Int, err error) {
// We have to do a raw call because the standard method of ethClient.HeaderByNumber(ctx, nil) errors on
// unmarshaling the response currently. See https://github.com/ethereum/go-ethereum/issues/3230
var currentBlockHex string
if err = processor.rawClient.CallContext(context.Background(), &currentBlockHex, "eth_blockNumber"); err != nil {
if err = processor.rawHttpClient.CallContext(context.Background(), &currentBlockHex, "eth_blockNumber"); err != nil {
zap.L().Error("error determining current block", zap.Error(err))
return nil, fmt.Errorf("error determining current block: %v", err)
}
Expand All @@ -114,6 +143,8 @@ func (processor *Processor) HasIdentity() bool {
}

func (processor *Processor) Close() {
processor.ethClient.Close()
processor.rawClient.Close()
processor.ethHttpClient.Close()
processor.rawHttpClient.Close()
processor.ethWSClient.Close()
processor.rawWSClient.Close()
}
50 changes: 40 additions & 10 deletions blockchain/ethereumClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package blockchain
import (
"context"
"encoding/base64"

"github.com/singnet/snet-daemon/config"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"github.com/singnet/snet-daemon/config"
)

type EthereumClient struct {
Expand All @@ -19,21 +21,49 @@ func basicAuth(username, password string) string {
return base64.StdEncoding.EncodeToString([]byte(auth))
}

func GetEthereumClient() (*EthereumClient, error) {
func CreateEthereumClients() (*EthereumClient, *EthereumClient, error) {
ethereumHttpClient, err := CreateHTTPEthereumClient()
if err != nil {
return nil, nil, err
}

ethereumWsClient, err := CreateWSEthereumClient()
if err != nil {
return nil, nil, err
}

return ethereumHttpClient, ethereumWsClient, nil
}

ethereumClient := new(EthereumClient)
if client, err := rpc.DialOptions(context.Background(),
config.GetBlockChainEndPoint(),
rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey)))); err != nil {
func CreateHTTPEthereumClient() (*EthereumClient, error) {
ethereumHttpClient := new(EthereumClient)
httpClient, err := rpc.DialOptions(
context.Background(),
config.GetBlockChainHTTPEndPoint(),
rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey))))
if err != nil {
return nil, errors.Wrap(err, "error creating RPC client")
} else {
ethereumClient.RawClient = client
ethereumClient.EthClient = ethclient.NewClient(client)
}

return ethereumClient, nil
ethereumHttpClient.RawClient = httpClient
ethereumHttpClient.EthClient = ethclient.NewClient(httpClient)
return ethereumHttpClient, nil
}

func CreateWSEthereumClient() (*EthereumClient, error) {
ethereumWsClient := new(EthereumClient)
wsClient, err := rpc.DialOptions(
context.Background(),
config.GetBlockChainWSEndPoint(),
rpc.WithHeader("Authorization", "Basic "+basicAuth("", config.GetString(config.BlockchainProviderApiKey))))
if err != nil {
return nil, errors.Wrap(err, "error creating RPC WebSocket client")
}
ethereumWsClient.RawClient = wsClient
ethereumWsClient.EthClient = ethclient.NewClient(wsClient)
return ethereumWsClient, nil
}

func (ethereumClient *EthereumClient) Close() {
if ethereumClient != nil {
ethereumClient.EthClient.Close()
Expand Down
8 changes: 4 additions & 4 deletions blockchain/orginzationMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func InitOrganizationMetaDataFromJson(jsonData string) (metaData *OrganizationMe
return nil, err
}
if err = checkMandatoryFields(metaData); err != nil {
zap.L().Error("Error in check mdandatory fields", zap.Error(err))
zap.L().Error("Error in check mandatory fields", zap.Error(err))
return nil, err
}

Expand All @@ -125,10 +125,10 @@ func checkMandatoryFields(metaData *OrganizationMetaData) (err error) {
if metaData.daemonGroup.PaymentDetails.PaymentChannelStorageClient.Endpoints == nil {
err = fmt.Errorf("Mandatory field : ETCD Client Endpoints are mising for the Group %v ", metaData.daemonGroup.GroupName)
}
if &metaData.recipientPaymentAddress == nil {
if metaData.recipientPaymentAddress == (common.Address{}) {
err = fmt.Errorf("Mandatory field : Recepient Address is missing for the Group %v ", metaData.daemonGroup.GroupName)
}
return
return err
}

func setDerivedAttributes(metaData *OrganizationMetaData) (err error) {
Expand Down Expand Up @@ -187,7 +187,7 @@ func getMetaDataURI() []byte {

organizationRegistered, err := reg.GetOrganizationById(nil, orgId)
if err != nil || !organizationRegistered.Found {
zap.L().Panic("Error Retrieving contract details for the Given Organization", zap.String("OrganizationId", config.GetString(config.OrganizationId)))
zap.L().Panic("Error Retrieving contract details for the Given Organization", zap.String("OrganizationId", config.GetString(config.OrganizationId)), zap.Error(err))
}
return organizationRegistered.OrgMetadataURI[:]
}
Expand Down
25 changes: 22 additions & 3 deletions blockchain/serviceMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/bufbuild/protocompile"
pproto "github.com/emicklei/proto"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/pkg/errors"
"github.com/singnet/snet-daemon/config"
"github.com/singnet/snet-daemon/ipfsutils"
Expand Down Expand Up @@ -277,13 +278,31 @@ func ReadServiceMetaDataFromLocalFile(filename string) (*ServiceMetadata, error)
}

func getRegistryCaller() (reg *RegistryCaller) {
ethClient, err := GetEthereumClient()
ethHttpClient, err := CreateHTTPEthereumClient()
if err != nil {
zap.L().Panic("Unable to get Blockchain client ", zap.Error(err))
}
defer ethClient.Close()
defer ethHttpClient.Close()
registryContractAddress := getRegistryAddressKey()
reg, err = NewRegistryCaller(registryContractAddress, ethClient.EthClient)
reg, err = NewRegistryCaller(registryContractAddress, ethHttpClient.EthClient)
if err != nil {
zap.L().Panic("Error instantiating Registry contract for the given Contract Address", zap.Error(err), zap.Any("registryContractAddress", registryContractAddress))
}
return reg
}

func GetRegistryCaller(ethHttpClient *ethclient.Client) *RegistryCaller {
registryContractAddress := getRegistryAddressKey()
reg, err := NewRegistryCaller(registryContractAddress, ethHttpClient)
if err != nil {
zap.L().Panic("Error instantiating Registry contract for the given Contract Address", zap.Error(err), zap.Any("registryContractAddress", registryContractAddress))
}
return reg
}

func GetRegistryFilterer(ethWsClient *ethclient.Client) *RegistryFilterer {
registryContractAddress := getRegistryAddressKey()
reg, err := NewRegistryFilterer(registryContractAddress, ethWsClient)
if err != nil {
zap.L().Panic("Error instantiating Registry contract for the given Contract Address", zap.Error(err), zap.Any("registryContractAddress", registryContractAddress))
}
Expand Down
18 changes: 18 additions & 0 deletions blockchain/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,21 @@ func ToChecksumAddress(hexAddress string) string {
mixedAddress := common.NewMixedcaseAddress(address)
return mixedAddress.Address().String()
}

/*
MakeTopicFilterer is used to generate a filter for querying Ethereum logs or contract events.
Ethereum topics (such as for events) are 32-byte fixed-size values (common for hashing
in Ethereum logs). This function takes a string parameter, converts it into a 32-byte array,
and returns it in a slice. This allows developers to create filters when looking for
specific events or log entries based on the topic.
*/
func MakeTopicFilterer(param string) [][32]byte {
// Create a 32-byte array
var param32Byte [32]byte

// Convert the string to a byte slice and copy up to 32 bytes
copy(param32Byte[:], []byte(param)[:min(len(param), 32)])

// Return the filter with a single element (the 32-byte array)
return [][32]byte{param32Byte}
}
39 changes: 24 additions & 15 deletions config/blockchain_network_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ import (
)

type NetworkSelected struct {
NetworkName string
EthereumJSONRPCEndpoint string
NetworkId string
RegistryAddressKey string
NetworkName string
EthereumJSONRPCHTTPEndpoint string
EthereumJSONRPCWSEndpoint string
NetworkId string
RegistryAddressKey string
}

const (
BlockChainNetworkFileName = "resources/blockchain_network_config.json"
EthereumJsonRpcEndpointKey = "ethereum_json_rpc_endpoint"
NetworkId = "network_id"
RegistryAddressKey = "registry_address_key"
BlockChainNetworkFileName = "resources/blockchain_network_config.json"
EthereumJsonRpcHTTPEndpointKey = "ethereum_json_rpc_http_endpoint"
EthereumJsonRpcWSEndpointKey = "ethereum_json_rpc_ws_endpoint"
NetworkId = "network_id"
RegistryAddressKey = "registry_address_key"
)

var networkSelected = &NetworkSelected{}
Expand All @@ -38,7 +40,8 @@ func determineNetworkSelected(data []byte) (err error) {
//Ethereum End point and Network ID mapped to
networkSelected.NetworkName = networkName
networkSelected.RegistryAddressKey = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[RegistryAddressKey], RegistryAddressKey)
networkSelected.EthereumJSONRPCEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcEndpointKey], EthereumJsonRpcEndpointKey)
networkSelected.EthereumJSONRPCHTTPEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcHTTPEndpointKey], EthereumJsonRpcHTTPEndpointKey)
networkSelected.EthereumJSONRPCWSEndpoint = getDetailsFromJsonOrConfig(dynamicBinding[networkName].(map[string]any)[EthereumJsonRpcWSEndpointKey], EthereumJsonRpcWSEndpointKey)
networkSelected.NetworkId = fmt.Sprintf("%v", dynamicBinding[networkName].(map[string]any)[NetworkId])

return err
Expand All @@ -61,9 +64,13 @@ func GetNetworkId() string {
return networkSelected.NetworkId
}

// Get the block chain end point associated with the Network selected
func GetBlockChainEndPoint() string {
return networkSelected.EthereumJSONRPCEndpoint
// Get the blockchain endpoint associated with the Network selected
func GetBlockChainHTTPEndPoint() string {
return networkSelected.EthereumJSONRPCHTTPEndpoint
}

func GetBlockChainWSEndPoint() string {
return networkSelected.EthereumJSONRPCWSEndpoint
}

// Get the Registry address of the contract
Expand Down Expand Up @@ -110,9 +117,11 @@ func deriveDatafromJSON(data []byte) (err error) {
networkSelected.RegistryAddressKey = fmt.Sprintf("%v", m[GetNetworkId()].(map[string]any)["address"])

zap.L().Info("Derive data from JSON", zap.String("Network", GetString(BlockChainNetworkSelected)),
zap.String("NetwrokId", GetNetworkId()),
zap.String("RegistryAddress", GetRegistryAddress()),
zap.String("Blockchain endpoint", GetBlockChainEndPoint()))
zap.String("Netwrok id", GetNetworkId()),
zap.String("Registry address", GetRegistryAddress()),
zap.String("Blockchain http endpoint", GetBlockChainHTTPEndPoint()),
zap.String("Blockchain ws endpoint", GetBlockChainWSEndPoint()),
)
return nil
}

Expand Down
Loading