Skip to content

Commit 22273c3

Browse files
committed
Add SBFT test facilities
Change-Id: I89e203db13cb0d77f3264377604e492d8e75a107 Signed-off-by: Gabor Hosszu <[email protected]>
1 parent f2a4bcb commit 22273c3

File tree

5 files changed

+322
-8
lines changed

5 files changed

+322
-8
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
Copyright Digital Asset Holdings, LLC 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"fmt"
21+
cb "github.com/hyperledger/fabric/protos/common"
22+
ab "github.com/hyperledger/fabric/protos/orderer"
23+
"github.com/op/go-logging"
24+
"github.com/golang/protobuf/proto"
25+
"golang.org/x/net/context"
26+
"google.golang.org/grpc"
27+
"time"
28+
)
29+
30+
var logger = logging.MustGetLogger("sbft_test")
31+
32+
var UPDATE byte = 0
33+
var SEND byte = 1
34+
35+
var NEEDED_UPDATES = 2
36+
var NEEDED_SENT = 1
37+
38+
func main() {
39+
logger.Info("Creating an Atomic Broadcast GRPC connection.")
40+
timeout := 4 * time.Second
41+
clientconn, err := grpc.Dial(":7101", grpc.WithBlock(), grpc.WithTimeout(timeout), grpc.WithInsecure())
42+
if err != nil {
43+
logger.Errorf("Failed to connect to GRPC: %s", err)
44+
return
45+
}
46+
client := ab.NewAtomicBroadcastClient(clientconn)
47+
48+
resultch := make(chan byte)
49+
errorch := make(chan error)
50+
51+
logger.Info("Starting a goroutine waiting for ledger updates.")
52+
go updateReceiver(resultch, errorch, client)
53+
54+
logger.Info("Starting a single broadcast sender goroutine.")
55+
go broadcastSender(resultch, errorch, client)
56+
57+
checkResults(resultch, errorch)
58+
}
59+
60+
func checkResults(resultch chan byte, errorch chan error) {
61+
l := len(errorch)
62+
for i := 0; i < l; i++ {
63+
errres := <-errorch
64+
logger.Error(errres)
65+
}
66+
67+
updates := 0
68+
sentBroadcast := 0
69+
for i := 0; i < 3; i++ {
70+
select {
71+
case result := <-resultch:
72+
switch result {
73+
case UPDATE:
74+
updates++
75+
case SEND:
76+
sentBroadcast++
77+
}
78+
case <-time.After(30 * time.Second):
79+
continue
80+
}
81+
}
82+
if updates != NEEDED_UPDATES {
83+
logger.Errorf("We did not get all the ledger updates.")
84+
} else if sentBroadcast != NEEDED_SENT {
85+
logger.Errorf("We were unable to send all the broadcasts.")
86+
} else {
87+
logger.Info("Successfully sent and received everything.")
88+
}
89+
}
90+
91+
func updateReceiver(resultch chan byte, errorch chan error, client ab.AtomicBroadcastClient) {
92+
logger.Info("{Update Receiver} Creating a ledger update delivery stream.")
93+
dstream, err := client.Deliver(context.Background())
94+
if err != nil {
95+
errorch <- fmt.Errorf("Failed to get Deliver stream: %s", err)
96+
return
97+
}
98+
dstream.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{Start: ab.SeekInfo_NEWEST, WindowSize: 10}}})
99+
logger.Info("{Update Receiver} Listening to ledger updates.")
100+
for i := 0; i < 2; i++ {
101+
m, inerr := dstream.Recv()
102+
if inerr != nil {
103+
errorch <- fmt.Errorf("Failed to receive consensus: %s", inerr)
104+
return
105+
}
106+
b := m.Type.(*ab.DeliverResponse_Block)
107+
logger.Info("{Update Receiver} Received a ledger update.")
108+
for i, tx := range b.Block.Data.Data {
109+
pl := &cb.Payload{}
110+
e := &cb.Envelope{}
111+
merr1 := proto.Unmarshal(tx, e)
112+
merr2 := proto.Unmarshal(e.Payload, pl)
113+
if merr1 == nil && merr2 == nil {
114+
logger.Infof("{Update Receiver} %d - %v", i+1, pl.Data)
115+
}
116+
}
117+
resultch <- UPDATE
118+
}
119+
logger.Info("{Update Receiver} Exiting...")
120+
}
121+
122+
func broadcastSender(resultch chan byte, errorch chan error, client ab.AtomicBroadcastClient) {
123+
logger.Info("{Broadcast Sender} Waiting before sending.")
124+
<-time.After(5 * time.Second)
125+
bstream, err := client.Broadcast(context.Background())
126+
if err != nil {
127+
errorch <- fmt.Errorf("Failed to get broadcast stream: %s", err)
128+
return
129+
}
130+
bs := []byte{0, 1, 2, 3}
131+
pl := &cb.Payload{Data: bs}
132+
mpl, err := proto.Marshal(pl)
133+
if err != nil {
134+
panic("Failed to marshal payload.")
135+
}
136+
bstream.Send(&cb.Envelope{Payload: mpl})
137+
logger.Infof("{Broadcast Sender} Broadcast sent: %v", bs)
138+
logger.Info("{Broadcast Sender} Exiting...")
139+
resultch <- SEND
140+
}

orderer/sbft/backend/backendab.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package backend
1818

1919
import (
2020
"github.com/golang/protobuf/proto"
21-
ab "github.com/hyperledger/fabric/protos/orderer"
2221
"github.com/hyperledger/fabric/orderer/solo"
22+
ab "github.com/hyperledger/fabric/protos/orderer"
2323
)
2424

2525
type BackendAB struct {
@@ -51,7 +51,7 @@ func (b *BackendAB) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
5151
}
5252
req, err := proto.Marshal(envelope)
5353
if err != nil {
54-
return err
54+
panic(err)
5555
}
5656
b.backend.enqueueRequest(req)
5757
err = srv.Send(&ab.BroadcastResponse{ab.Status_SUCCESS})

orderer/sbft/local-deploy.sh

+3-3
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ cat > config.json <<EOF
4949
"consensus": {
5050
"n" : $count,
5151
"f" : $fail,
52-
"batch_size_bytes" : 3,
53-
"batch_duration_nsec" : 0,
54-
"request_timeout_nsec" : 2
52+
"batch_size_bytes" : 1000,
53+
"batch_duration_nsec" : 1000000000,
54+
"request_timeout_nsec" : 1000000000
5555
},
5656
"peers": [${peerconf}]
5757
}

orderer/sbft/sbft_test.go

+174
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
Copyright Digital Asset Holdings, LLC 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"fmt"
21+
"io/ioutil"
22+
"os"
23+
"testing"
24+
"time"
25+
26+
"github.com/golang/protobuf/proto"
27+
cb "github.com/hyperledger/fabric/protos/common"
28+
ab "github.com/hyperledger/fabric/protos/orderer"
29+
"github.com/op/go-logging"
30+
"golang.org/x/net/context"
31+
"google.golang.org/grpc"
32+
)
33+
34+
var logger = logging.MustGetLogger("sbft_test")
35+
36+
var UPDATE byte = 0
37+
var SEND byte = 1
38+
39+
var NEEDED_UPDATES = 2
40+
var NEEDED_SENT = 1
41+
42+
func TestSbftPeer(t *testing.T) {
43+
tempDir, err := ioutil.TempDir("", "sbft_test")
44+
if err != nil {
45+
panic("Failed to create a temporary directory")
46+
}
47+
// We only need the path as the directory will be created
48+
// by the peer - TODO: modify sbft to tolerate an existing
49+
// directory
50+
os.RemoveAll(tempDir)
51+
defer func() {
52+
os.RemoveAll(tempDir)
53+
}()
54+
c := flags{init: "testdata/config.json",
55+
listenAddr: ":6101",
56+
grpcAddr: ":7101",
57+
certFile: "testdata/cert1.pem",
58+
keyFile: "testdata/key.pem",
59+
dataDir: tempDir}
60+
61+
logger.Info("Initialization of instance.")
62+
err = initInstance(c)
63+
if err != nil {
64+
t.Errorf("Initialization failed: %s", err)
65+
return
66+
}
67+
logging.SetLevel(logging.DEBUG, "")
68+
69+
logger.Info("Starting an instance in the background.")
70+
go serve(c)
71+
<-time.After(5 * time.Second)
72+
73+
logger.Info("Creating an Atomic Broadcast GRPC connection.")
74+
timeout := 4 * time.Second
75+
clientconn, err := grpc.Dial(":7101", grpc.WithBlock(), grpc.WithTimeout(timeout), grpc.WithInsecure())
76+
if err != nil {
77+
t.Errorf("Failed to connect to GRPC: %s", err)
78+
return
79+
}
80+
client := ab.NewAtomicBroadcastClient(clientconn)
81+
82+
resultch := make(chan byte)
83+
errorch := make(chan error)
84+
85+
logger.Info("Starting a goroutine waiting for ledger updates.")
86+
go updateReceiver(t, resultch, errorch, client)
87+
88+
logger.Info("Starting a single broadcast sender goroutine.")
89+
go broadcastSender(t, resultch, errorch, client)
90+
91+
checkResults(t, resultch, errorch)
92+
}
93+
94+
func checkResults(t *testing.T, resultch chan byte, errorch chan error) {
95+
l := len(errorch)
96+
for i := 0; i < l; i++ {
97+
errres := <-errorch
98+
t.Error(errres)
99+
}
100+
101+
updates := 0
102+
sentBroadcast := 0
103+
for i := 0; i < 3; i++ {
104+
select {
105+
case result := <-resultch:
106+
switch result {
107+
case UPDATE:
108+
updates++
109+
case SEND:
110+
sentBroadcast++
111+
}
112+
case <-time.After(30 * time.Second):
113+
continue
114+
}
115+
}
116+
if updates != NEEDED_UPDATES {
117+
t.Errorf("We did not get all the ledger updates.")
118+
} else if sentBroadcast != NEEDED_SENT {
119+
t.Errorf("We were unable to send all the broadcasts.")
120+
} else {
121+
logger.Info("Successfully sent and received everything.")
122+
}
123+
}
124+
125+
func updateReceiver(t *testing.T, resultch chan byte, errorch chan error, client ab.AtomicBroadcastClient) {
126+
logger.Info("{Update Receiver} Creating a ledger update delivery stream.")
127+
dstream, err := client.Deliver(context.Background())
128+
if err != nil {
129+
errorch <- fmt.Errorf("Failed to get Deliver stream: %s", err)
130+
return
131+
}
132+
dstream.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{Start: ab.SeekInfo_NEWEST, WindowSize: 10}}})
133+
logger.Info("{Update Receiver} Listening to ledger updates.")
134+
for i := 0; i < 2; i++ {
135+
m, inerr := dstream.Recv()
136+
if inerr != nil {
137+
errorch <- fmt.Errorf("Failed to receive consensus: %s", inerr)
138+
return
139+
}
140+
b := m.Type.(*ab.DeliverResponse_Block)
141+
logger.Info("{Update Receiver} Received a ledger update.")
142+
for i, tx := range b.Block.Data.Data {
143+
pl := &cb.Payload{}
144+
e := &cb.Envelope{}
145+
merr1 := proto.Unmarshal(tx, e)
146+
merr2 := proto.Unmarshal(e.Payload, pl)
147+
if merr1 == nil && merr2 == nil {
148+
logger.Infof("{Update Receiver} %d - %v", i+1, pl.Data)
149+
}
150+
}
151+
resultch <- UPDATE
152+
}
153+
logger.Info("{Update Receiver} Exiting...")
154+
}
155+
156+
func broadcastSender(t *testing.T, resultch chan byte, errorch chan error, client ab.AtomicBroadcastClient) {
157+
logger.Info("{Broadcast Sender} Waiting before sending.")
158+
<-time.After(5 * time.Second)
159+
bstream, err := client.Broadcast(context.Background())
160+
if err != nil {
161+
errorch <- fmt.Errorf("Failed to get broadcast stream: %s", err)
162+
return
163+
}
164+
bs := []byte{0, 1, 2, 3}
165+
pl := &cb.Payload{Data: bs}
166+
mpl, err := proto.Marshal(pl)
167+
if err != nil {
168+
panic("Failed to marshal payload.")
169+
}
170+
bstream.Send(&cb.Envelope{Payload: mpl})
171+
logger.Infof("{Broadcast Sender} Broadcast sent: %v", bs)
172+
logger.Info("{Broadcast Sender} Exiting...")
173+
resultch <- SEND
174+
}

orderer/sbft/testdata/config.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
"consensus": {
33
"n" : 1,
44
"f" : 0,
5-
"batch_size_bytes" : 3,
6-
"batch_duration_nsec" : 0,
7-
"request_timeout_nsec" : 60
5+
"batch_size_bytes" : 1000,
6+
"batch_duration_nsec" : 1000000000,
7+
"request_timeout_nsec" : 1000000000
88
},
99
"peers": [
1010
{

0 commit comments

Comments
 (0)