Skip to content

Commit 53fd500

Browse files
author
jyellick
committed
Implement solo orderer
Change-Id: I1bf2c44e5dc77a13b4749ea82c1df7fed618fd2f Signed-off-by: jyellick <[email protected]>
1 parent 16ca7b0 commit 53fd500

File tree

11 files changed

+1159
-0
lines changed

11 files changed

+1159
-0
lines changed

orderer/atomicbroadcast/block.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package atomicbroadcast
18+
19+
import (
20+
"github.com/golang/protobuf/proto"
21+
"github.com/hyperledger/fabric/core/util"
22+
)
23+
24+
func (b *Block) Hash() []byte {
25+
data, err := proto.Marshal(b) // XXX this is wrong, protobuf is not the right mechanism to serialize for a hash
26+
if err != nil {
27+
panic("This should never fail and is generally irrecoverable")
28+
}
29+
30+
return util.ComputeCryptoHash(data)
31+
}

orderer/main.go

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"fmt"
21+
"net"
22+
"os"
23+
"time"
24+
25+
"github.com/hyperledger/fabric/orderer/solo"
26+
27+
"google.golang.org/grpc"
28+
)
29+
30+
func main() {
31+
32+
address := os.Getenv("ORDERER_LISTEN_ADDRESS")
33+
if address == "" {
34+
address = "127.0.0.1"
35+
}
36+
37+
port := os.Getenv("ORDERER_LISTEN_PORT")
38+
if port == "" {
39+
port = "5005"
40+
}
41+
42+
lis, err := net.Listen("tcp", address+":"+port)
43+
if err != nil {
44+
fmt.Println("Failed to listen:", err)
45+
return
46+
}
47+
48+
grpcServer := grpc.NewServer()
49+
50+
solo.New(100, 10, 10, 10*time.Second, grpcServer)
51+
grpcServer.Serve(lis)
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"fmt"
21+
"time"
22+
23+
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
24+
"golang.org/x/net/context"
25+
"google.golang.org/grpc"
26+
)
27+
28+
type broadcastClient struct {
29+
client ab.AtomicBroadcast_BroadcastClient
30+
}
31+
32+
// newBroadcastClient creates a simple instance of the broadcastClient interface
33+
func newBroadcastClient(client ab.AtomicBroadcast_BroadcastClient) *broadcastClient {
34+
return &broadcastClient{client: client}
35+
}
36+
37+
func (s *broadcastClient) broadcast(transaction []byte) error {
38+
return s.client.Send(&ab.BroadcastMessage{transaction})
39+
}
40+
41+
func main() {
42+
serverAddr := "127.0.0.1:5005"
43+
conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
44+
defer conn.Close()
45+
if err != nil {
46+
fmt.Println("Error connecting:", err)
47+
return
48+
}
49+
client, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.TODO())
50+
if err != nil {
51+
fmt.Println("Error connecting:", err)
52+
return
53+
}
54+
55+
s := newBroadcastClient(client)
56+
s.broadcast([]byte(fmt.Sprintf("Testing %v", time.Now())))
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"fmt"
21+
22+
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
23+
"golang.org/x/net/context"
24+
"google.golang.org/grpc"
25+
)
26+
27+
type deliverClient struct {
28+
client ab.AtomicBroadcast_DeliverClient
29+
windowSize uint64
30+
unAcknowledged uint64
31+
}
32+
33+
func newDeliverClient(client ab.AtomicBroadcast_DeliverClient, windowSize uint64) *deliverClient {
34+
return &deliverClient{client: client, windowSize: windowSize}
35+
}
36+
37+
func (r *deliverClient) seekOldest() error {
38+
return r.client.Send(&ab.DeliverUpdate{
39+
Type: &ab.DeliverUpdate_Seek{
40+
Seek: &ab.SeekInfo{
41+
Start: ab.SeekInfo_OLDEST,
42+
WindowSize: r.windowSize,
43+
},
44+
},
45+
})
46+
}
47+
48+
func (r *deliverClient) seekNewest() error {
49+
return r.client.Send(&ab.DeliverUpdate{
50+
Type: &ab.DeliverUpdate_Seek{
51+
Seek: &ab.SeekInfo{
52+
Start: ab.SeekInfo_NEWEST,
53+
WindowSize: r.windowSize,
54+
},
55+
},
56+
})
57+
}
58+
59+
func (r *deliverClient) seek(blockNumber uint64) error {
60+
return r.client.Send(&ab.DeliverUpdate{
61+
Type: &ab.DeliverUpdate_Seek{
62+
Seek: &ab.SeekInfo{
63+
Start: ab.SeekInfo_SPECIFIED,
64+
SpecifiedNumber: blockNumber,
65+
WindowSize: r.windowSize,
66+
},
67+
},
68+
})
69+
}
70+
71+
func (r *deliverClient) readUntilClose() {
72+
for {
73+
msg, err := r.client.Recv()
74+
if err != nil {
75+
return
76+
}
77+
78+
switch t := msg.Type.(type) {
79+
case *ab.DeliverResponse_Error:
80+
if t.Error == ab.Status_SUCCESS {
81+
fmt.Println("ERROR! Received success in error field")
82+
return
83+
}
84+
fmt.Println("Got error ", t)
85+
case *ab.DeliverResponse_Block:
86+
fmt.Println("Received block: ", t.Block)
87+
r.unAcknowledged++
88+
if r.unAcknowledged >= r.windowSize/2 {
89+
fmt.Println("Sending acknowledgement")
90+
err = r.client.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Acknowledgement{Acknowledgement: &ab.Acknowledgement{Number: t.Block.Number}}})
91+
if err != nil {
92+
return
93+
}
94+
r.unAcknowledged = 0
95+
}
96+
default:
97+
fmt.Println("Received unknock: ", t)
98+
return
99+
}
100+
}
101+
}
102+
103+
func main() {
104+
serverAddr := "127.0.0.1:5005"
105+
conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
106+
if err != nil {
107+
fmt.Println("Error connecting:", err)
108+
return
109+
}
110+
client, err := ab.NewAtomicBroadcastClient(conn).Deliver(context.TODO())
111+
if err != nil {
112+
fmt.Println("Error connecting:", err)
113+
return
114+
}
115+
116+
s := newDeliverClient(client, 10)
117+
s.seekOldest()
118+
s.readUntilClose()
119+
120+
}

orderer/solo/broadcast.go

+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
Copyright IBM Corp. 2016 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package solo
18+
19+
import (
20+
"time"
21+
22+
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
23+
)
24+
25+
type broadcastServer struct {
26+
queue chan *ab.BroadcastMessage
27+
batchSize int
28+
batchTimeout time.Duration
29+
rl *ramLedger
30+
exitChan chan struct{}
31+
}
32+
33+
func newBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rs *ramLedger) *broadcastServer {
34+
bs := newPlainBroadcastServer(queueSize, batchSize, batchTimeout, rs)
35+
bs.exitChan = make(chan struct{})
36+
go bs.main()
37+
return bs
38+
}
39+
40+
func newPlainBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl *ramLedger) *broadcastServer {
41+
bs := &broadcastServer{
42+
queue: make(chan *ab.BroadcastMessage, queueSize),
43+
batchSize: batchSize,
44+
batchTimeout: batchTimeout,
45+
rl: rl,
46+
}
47+
return bs
48+
}
49+
50+
func (bs *broadcastServer) halt() {
51+
close(bs.exitChan)
52+
}
53+
54+
func (bs *broadcastServer) main() {
55+
var curBatch []*ab.BroadcastMessage
56+
outer:
57+
for {
58+
timer := time.After(bs.batchTimeout)
59+
select {
60+
case msg := <-bs.queue:
61+
curBatch = append(curBatch, msg)
62+
if len(curBatch) < bs.batchSize {
63+
continue
64+
}
65+
logger.Debugf("Batch size met, creating block")
66+
case <-timer:
67+
if len(curBatch) == 0 {
68+
continue outer
69+
}
70+
logger.Debugf("Batch timer expired, creating block")
71+
case <-bs.exitChan:
72+
logger.Debugf("Exiting")
73+
return
74+
}
75+
76+
block := &ab.Block{
77+
Number: bs.rl.newest.block.Number + 1,
78+
PrevHash: bs.rl.newest.block.Hash(),
79+
Messages: curBatch,
80+
}
81+
curBatch = nil
82+
83+
bs.rl.appendBlock(block)
84+
}
85+
}
86+
87+
func (bs *broadcastServer) handleBroadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
88+
for {
89+
msg, err := srv.Recv()
90+
if err != nil {
91+
return err
92+
}
93+
94+
if msg.Data == nil {
95+
err = srv.Send(&ab.BroadcastResponse{ab.Status_BAD_REQUEST})
96+
if err != nil {
97+
return err
98+
}
99+
}
100+
101+
select {
102+
case bs.queue <- msg:
103+
err = srv.Send(&ab.BroadcastResponse{ab.Status_SUCCESS})
104+
default:
105+
err = srv.Send(&ab.BroadcastResponse{ab.Status_SERVICE_UNAVAILABLE})
106+
}
107+
108+
if err != nil {
109+
return err
110+
}
111+
}
112+
}

0 commit comments

Comments
 (0)