Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Training v2 #609

Merged
merged 23 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
df53a00
training v2 init
semyon-dev Dec 16, 2024
3c446bd
feat(training): add worker pool for updating validating, training sta…
elmiringos Dec 19, 2024
99d7132
test(training): add tests for updating model statuses functionality
elmiringos Dec 19, 2024
f0834c1
feat(training): add new storage type - pending model storage for mode…
elmiringos Dec 24, 2024
2376dbc
test(training): update integration tests for ManageUpdateModelStatusW…
elmiringos Dec 24, 2024
c029a46
test(training): add key serialization test, update test provider serv…
elmiringos Dec 25, 2024
6635402
new methods for train v2
semyon-dev Dec 27, 2024
4a954a8
Merge remote-tracking branch 'elmir/trainingV2' into trainingV2
semyon-dev Dec 27, 2024
fed47af
fix pending storage
semyon-dev Dec 27, 2024
ce408c9
refactor trainv2
semyon-dev Dec 27, 2024
8c90f1f
Added public model storage for public models (#12)
elmiringos Jan 9, 2025
aa2db13
Updated tests and owner verification method (#13)
elmiringos Jan 17, 2025
216ed50
add pay interceptors for training & fix streams
semyon-dev Jan 24, 2025
cc79529
Merge remote-tracking branch 'semyon/trainingV2' into trainingV2
semyon-dev Jan 24, 2025
0f7595a
fixes for training, filters for get_all_models, update go.mod
semyon-dev Feb 10, 2025
ff6ba87
fix tests
semyon-dev Feb 10, 2025
f00a818
Merge branch 'dev' into trainingV2
semyon-dev Feb 10, 2025
7714305
Update build.yml
semyon-dev Feb 10, 2025
dafa256
auth for training & fix bugs
semyon-dev Feb 20, 2025
67921df
optimize eth clients & refactor
semyon-dev Feb 20, 2025
fe19dfb
fix tests
semyon-dev Feb 20, 2025
e203f3f
fix tests
semyon-dev Feb 20, 2025
4cf3ae9
fix get_all_models
semyon-dev Feb 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ jobs:
- name: download and install
uses: actions/setup-go@v5
with:
go-version: '1.23.4'
go-version: '1.23.6'

- name: install protoc (protobuf)
uses: arduino/setup-protoc@v3
with:
version: "27.2"
version: "29.3"
include-pre-releases: false

- name: chmod +x
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ jobs:
- name: download and install
uses: actions/setup-go@v5
with:
go-version: '1.23.4'
go-version: '1.23.6'

- name: install protoc (protobuf)
uses: arduino/setup-protoc@v3
with:
version: "27.2"
version: "29.3"
include-pre-releases: false

- name: chmod to allow run script
Expand Down
63 changes: 31 additions & 32 deletions authutils/auth_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/singnet/snet-daemon/v5/blockchain"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/singnet/snet-daemon/v5/blockchain"
"go.uber.org/zap"
)

Expand All @@ -25,6 +25,18 @@ const (
AllowedBlockChainDifference = 5
)

func VerifySigner(message []byte, signature []byte, signer common.Address) error {
derivedSigner, err := GetSignerAddressFromMessage(message, signature)
if err != nil {
zap.L().Error(err.Error())
return err
}
if err = VerifyAddress(*derivedSigner, signer); err != nil {
return err
}
return nil
}

func GetSignerAddressFromMessage(message, signature []byte) (signer *common.Address, err error) {
messageFieldLog := zap.String("message", blockchain.BytesToBase64(message))
signatureFieldLog := zap.String("signature", blockchain.BytesToBase64(signature))
Expand Down Expand Up @@ -53,33 +65,33 @@ func GetSignerAddressFromMessage(message, signature []byte) (signer *common.Addr
messageHashFieldLog)
return nil, errors.New("incorrect signature data")
}
publicKeyFieldLog := zap.Any("publicKey", publicKey)
//publicKeyFieldLog := zap.Any("publicKey", publicKey)

keyOwnerAddress := crypto.PubkeyToAddress(*publicKey)
keyOwnerAddressFieldLog := zap.Any("keyOwnerAddress", keyOwnerAddress)
zap.L().Debug("Message signature parsed",
messageFieldLog,
signatureFieldLog,
messageHashFieldLog,
publicKeyFieldLog,
//messageFieldLog,
//signatureFieldLog,
//messageHashFieldLog,
//publicKeyFieldLog,
keyOwnerAddressFieldLog)

return &keyOwnerAddress, nil
}

// VerifySigner Verify the signature done by given singer or not
// returns nil if signer indeed sign the message and signature proves it, if not throws an error
func VerifySigner(message []byte, signature []byte, signer common.Address) error {
signerFromMessage, err := GetSignerAddressFromMessage(message, signature)
if err != nil {
zap.L().Error("error from getSignerAddressFromMessage", zap.Error(err))
return err
}
if signerFromMessage.String() == signer.String() {
return nil
}
return fmt.Errorf("incorrect signer")
}
//func VerifySigner(message []byte, signature []byte, signer common.Address) error {
// signerFromMessage, err := GetSignerAddressFromMessage(message, signature)
// if err != nil {
// zap.L().Error("error from getSignerAddressFromMessage", zap.Error(err))
// return err
// }
// if signerFromMessage.String() == signer.String() {
// return nil
// }
// return fmt.Errorf("incorrect signer")
//}

// CompareWithLatestBlockNumber Check if the block number passed is not more +- 5 from the latest block number on chain
func CompareWithLatestBlockNumber(blockNumberPassed *big.Int) error {
Expand All @@ -94,22 +106,9 @@ func CompareWithLatestBlockNumber(blockNumberPassed *big.Int) error {
return nil
}

// CheckIfTokenHasExpired Check if the block number ( date on which the token was issued is not more than 1 month)
func CheckIfTokenHasExpired(expiredBlock *big.Int) error {
currentBlockNumber, err := CurrentBlock()
if err != nil {
return err
}

if expiredBlock.Cmp(currentBlockNumber) < 0 {
return fmt.Errorf("authentication failed as the Free Call Token passed has expired")
}
return nil
}

// CurrentBlock Get the current block number from on chain
func CurrentBlock() (*big.Int, error) {
if ethHttpClient, _, err := blockchain.CreateEthereumClients(); err != nil {
if ethHttpClient, err := blockchain.CreateHTTPEthereumClient(); err != nil {
return nil, err
} else {
defer ethHttpClient.RawClient.Close()
Expand Down Expand Up @@ -138,7 +137,7 @@ func GetSignature(message []byte, privateKey *ecdsa.PrivateKey) (signature []byt

signature, err := crypto.Sign(hash, privateKey)
if err != nil {
panic(fmt.Sprintf("Cannot sign test message: %v", err))
zap.L().Fatal(fmt.Sprintf("Cannot sign test message: %v", err))
}

return signature
Expand Down
19 changes: 2 additions & 17 deletions authutils/auth_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package authutils

import (
"github.com/ethereum/go-ethereum/common"
"math/big"
"testing"
"time"

"github.com/singnet/snet-daemon/v5/config"
"github.com/stretchr/testify/assert"
"math/big"
"testing"
)

func TestCompareWithLatestBlockNumber(t *testing.T) {
Expand All @@ -21,19 +19,6 @@ func TestCompareWithLatestBlockNumber(t *testing.T) {
currentBlockNum, _ = CurrentBlock()
err = CompareWithLatestBlockNumber(currentBlockNum.Add(currentBlockNum, big.NewInt(1)))
assert.Equal(t, nil, err)

}

func TestCheckAllowedBlockDifferenceForToken(t *testing.T) {
config.Vip().Set(config.BlockChainNetworkSelected, "sepolia")
config.Validate()
currentBlockNum, _ := CurrentBlock()
err := CheckIfTokenHasExpired(currentBlockNum.Sub(currentBlockNum, big.NewInt(20000)))
assert.Equal(t, err.Error(), "authentication failed as the Free Call Token passed has expired")
time.Sleep(250 * time.Millisecond) // because of HTTP 429 Too Many Requests
currentBlockNum, _ = CurrentBlock()
err = CheckIfTokenHasExpired(currentBlockNum.Add(currentBlockNum, big.NewInt(20)))
assert.Equal(t, nil, err)
}

func TestVerifyAddress(t *testing.T) {
Expand Down
40 changes: 27 additions & 13 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,14 @@ func NewProcessor(metadata *ServiceMetadata) (Processor, error) {
}

// Setup ethereum client

if ethHttpClients, ethWSClients, err := CreateEthereumClients(); err != nil {
if ethHttpClients, err := CreateEthereumClient(); err != nil {
return p, errors.Wrap(err, "error creating RPC client")
} else {
p.rawHttpClient = ethHttpClients.RawClient
p.ethHttpClient = ethHttpClients.EthClient
p.rawWSClient = ethWSClients.RawClient
p.ethWSClient = ethWSClients.EthClient
}

// TODO: if address is not in config, try to load it using network

//TODO: Read this from github

p.escrowContractAddress = metadata.GetMpeAddress()
Expand All @@ -92,6 +88,13 @@ func (processor *Processor) ReconnectToWsClient() error {

zap.L().Debug("Try to reconnect to websocket client")

return processor.ConnectToWsClient()
}

func (processor *Processor) ConnectToWsClient() error {

zap.L().Debug("Try to connect to websocket client")

newEthWSClients, err := CreateWSEthereumClient()
if err != nil {
return err
Expand Down Expand Up @@ -124,17 +127,24 @@ func (processor *Processor) GetEthWSClient() *ethclient.Client {
}

func (processor *Processor) CurrentBlock() (currentBlock *big.Int, err error) {
// We have to do a raw call because the standard method of ethClient.HeaderByNumber(ctx, nil) errors on
// unmarshaling the response currently. See https://github.com/ethereum/go-ethereum/issues/3230
var currentBlockHex string
if err = processor.rawHttpClient.CallContext(context.Background(), &currentBlockHex, "eth_blockNumber"); err != nil {
latestBlock, err := processor.ethHttpClient.BlockNumber(context.Background())
if err != nil {
zap.L().Error("error determining current block", zap.Error(err))
return nil, fmt.Errorf("error determining current block: %v", err)
}
return new(big.Int).SetUint64(latestBlock), nil
}

currentBlockBytes := common.FromHex(currentBlockHex)
currentBlock = new(big.Int).SetBytes(currentBlockBytes)
func (processor *Processor) CompareWithLatestBlockNumber(blockNumberPassed *big.Int, allowedBlockChainDifference uint64) (err error) {
latestBlockNumber, err := processor.CurrentBlock()
if err != nil {
return err
}

differenceInBlockNumber := blockNumberPassed.Sub(blockNumberPassed, latestBlockNumber)
if differenceInBlockNumber.Abs(differenceInBlockNumber).Uint64() > allowedBlockChainDifference {
return fmt.Errorf("authentication failed as the signature passed has expired")
}
return
}

Expand All @@ -145,6 +155,10 @@ func (processor *Processor) HasIdentity() bool {
func (processor *Processor) Close() {
processor.ethHttpClient.Close()
processor.rawHttpClient.Close()
processor.ethWSClient.Close()
processor.rawWSClient.Close()
if processor.ethWSClient != nil {
processor.ethWSClient.Close()
}
if processor.rawWSClient != nil {
processor.rawWSClient.Close()
}
}
64 changes: 64 additions & 0 deletions blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package blockchain

import (
"github.com/singnet/snet-daemon/v5/config"
"math/big"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

const metadataJson = "{\n \"version\": 1,\n \"display_name\": \"semyon_dev\",\n \"encoding\": \"proto\",\n \"service_type\": \"grpc\",\n \"service_api_source\": \"ipfs://QmV9bBsLAZfXGibdU3isPwDn8SxAPkqg6YzcKamSCxnBCR\",\n \"mpe_address\": \"0x7E0aF8988DF45B824b2E0e0A87c6196897744970\",\n \"groups\": [\n {\n \"group_name\": \"default_group\",\n \"endpoints\": [\n \"http://localhost:7000\"\n ],\n \"pricing\": [\n {\n \"price_model\": \"fixed_price\",\n \"price_in_cogs\": 1,\n \"default\": true\n }\n ],\n \"group_id\": \"FtNuizEOUsVCd5f2Fij9soehtRSb58LlTePgkVnsgVI=\",\n \"free_call_signer_address\": \"0x747155e03c892B8b311B7Cfbb920664E8c6792fA\",\n \"free_calls\": 25,\n \"daemon_addresses\": [\n \"0x747155e03c892B8b311B7Cfbb920664E8c6792fA\"\n ]\n },\n {\n \"group_name\": \"not_default\",\n \"endpoints\": [\n \"http://localhost:7000\"\n ],\n \"pricing\": [\n {\n \"price_model\": \"fixed_price\",\n \"price_in_cogs\": 1,\n \"default\": true\n }\n ],\n \"group_id\": \"udN0SLIvsDdvQQe3Ltv/NwqCh7sPKdz4scYmlI7AMdE=\",\n \"free_call_signer_address\": \"0x747155e03c892B8b311B7Cfbb920664E8c6792fA\",\n \"free_calls\": 35,\n \"daemon_addresses\": [\n \"0x747155e03c892B8b311B7Cfbb920664E8c6792fA\"\n ]\n }\n ],\n \"assets\": {},\n \"media\": [],\n \"tags\": [],\n \"service_description\": {\n \"description\": \"Test service with localhost endpoint!\",\n \"url\": \"\"\n }\n}"

// ProcessorTestSuite is a test suite for the Processor struct
type ProcessorTestSuite struct {
suite.Suite
processor *Processor
}

// SetupSuite initializes the Ethereum client before running the tests
func (suite *ProcessorTestSuite) SetupSuite() {
config.Vip().Set(config.BlockchainEnabledKey, true)
config.Vip().Set(config.BlockChainNetworkSelected, "sepolia")
config.Validate()
srv, err := InitServiceMetaDataFromJson([]byte(metadataJson))
if err != nil {
return
}
p, err := NewProcessor(srv)
assert.Nil(suite.T(), err)
suite.processor = &p
}

// ✅ Test: If the block number difference is within the allowed limit → no error
func (suite *ProcessorTestSuite) TestCompareWithLatestBlockNumber_WithinLimit() {
latestBlock, err := suite.processor.CurrentBlock()
suite.Require().NoError(err, "CurrentBlock() should not return an error")

// Simulate a block number within the allowed range (+2)
blockNumberPassed := new(big.Int).Add(latestBlock, big.NewInt(2))
err = suite.processor.CompareWithLatestBlockNumber(blockNumberPassed, 5)

// Expect no error
assert.NoError(suite.T(), err, "Expected no error when block difference is within the limit")
}

// ❌ Test: If the block number difference exceeds the allowed limit → return an error
func (suite *ProcessorTestSuite) TestCompareWithLatestBlockNumber_ExceedsLimit() {
latestBlock, err := suite.processor.CurrentBlock()
suite.Require().NoError(err, "CurrentBlock() should not return an error")

// Simulate a block number exceeding the allowed limit (+10)
blockNumberPassed := new(big.Int).Add(latestBlock, big.NewInt(10))
err = suite.processor.CompareWithLatestBlockNumber(blockNumberPassed, 5)

// Expect an error
assert.Error(suite.T(), err, "Expected an error when block difference exceeds the limit")
assert.Contains(suite.T(), err.Error(), "authentication failed", "Error message should indicate signature expiration")
}

// Run the test suite
func TestProcessorTestSuite(t *testing.T) {
suite.Run(t, new(ProcessorTestSuite))
}
12 changes: 3 additions & 9 deletions blockchain/ethereumClient.go → blockchain/ethereum_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,12 @@ func basicAuth(username, password string) string {
return base64.StdEncoding.EncodeToString([]byte(auth))
}

func CreateEthereumClients() (*EthereumClient, *EthereumClient, error) {
func CreateEthereumClient() (*EthereumClient, error) {
ethereumHttpClient, err := CreateHTTPEthereumClient()
if err != nil {
return nil, nil, err
return nil, err
}

ethereumWsClient, err := CreateWSEthereumClient()
if err != nil {
return nil, nil, err
}

return ethereumHttpClient, ethereumWsClient, nil
return ethereumHttpClient, nil
}

func CreateHTTPEthereumClient() (*EthereumClient, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ func GetOrganizationMetaDataFromIPFS(hash string) (*OrganizationMetaData, error)
}

func getMetaDataURI() []byte {
//Block chain call here to get the hash of the metadata for the given Organization
// Blockchain call here to get the hash of the metadata for the given Organization
reg := getRegistryCaller()
orgId := StringToBytes32(config.GetString(config.OrganizationId))

organizationRegistered, err := reg.GetOrganizationById(nil, orgId)
if err != nil || !organizationRegistered.Found {
zap.L().Panic("Error Retrieving contract details for the Given Organization", zap.String("OrganizationId", config.GetString(config.OrganizationId)), zap.Error(err))
zap.L().Panic("Error Retrieving contract details for the Given Organization, recheck blockchain provider endpoint", zap.String("OrganizationId", config.GetString(config.OrganizationId)), zap.Error(err))
}
return organizationRegistered.OrgMetadataURI[:]
}
Expand Down
File renamed without changes.
Loading
Loading