Skip to content

Commit ebb3cb9

Browse files
committed
Enable block event generation
This changeset enables block event generation support for 1.0 Change-Id: Id50c5e8fdd63c1e4d693a90b5438d9c8fa26de3c Signed-off-by: Patrick Mullaney <[email protected]>
1 parent 85456c0 commit ebb3cb9

File tree

3 files changed

+101
-13
lines changed

3 files changed

+101
-13
lines changed

core/committer/noopssinglechain/client.go

+4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/hyperledger/fabric/core/committer"
2525
"github.com/hyperledger/fabric/core/ledger/kvledger"
2626
"github.com/hyperledger/fabric/core/util"
27+
"github.com/hyperledger/fabric/events/producer"
2728
"github.com/hyperledger/fabric/protos/common"
2829
"github.com/hyperledger/fabric/protos/orderer"
2930
putils "github.com/hyperledger/fabric/protos/utils"
@@ -327,6 +328,9 @@ func (d *DeliverService) readUntilClose() {
327328
// Gossip messages with other nodes
328329
logger.Debugf("Gossiping block [%d], peers number [%d]", seqNum, numberOfPeers)
329330
d.gossip.Gossip(gossipMsg)
331+
if err = producer.SendProducerBlockEvent(block); err != nil {
332+
logger.Errorf("Error sending block event %s", err)
333+
}
330334

331335
d.unAcknowledged++
332336
if d.unAcknowledged >= d.windowSize/2 {

events/events_test.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,11 @@ func (a *Adapter) Disconnected(err error) {
8181
}
8282
}
8383

84-
func createTestBlock() *ehpb.Event {
84+
func createTestBlock() *common.Block {
8585
block := common.NewBlock(1, []byte{})
8686
block.Data.Data = [][]byte{[]byte("tx1"), []byte("tx2")}
8787
block.Header.DataHash = block.Data.Hash()
88-
emsg := producer.CreateBlockEvent(block)
89-
return emsg
88+
return block
9089
}
9190

9291
func createTestChaincodeEvent(tid string, typ string) *ehpb.Event {
@@ -122,13 +121,13 @@ func TestReceiveAnyMessage(t *testing.T) {
122121
var err error
123122

124123
adapter.count = 1
125-
emsg := createTestBlock()
126-
if err = producer.Send(emsg); err != nil {
124+
block := createTestBlock()
125+
if err = producer.SendProducerBlockEvent(block); err != nil {
127126
t.Fail()
128127
t.Logf("Error sending message %s", err)
129128
}
130129

131-
emsg = createTestChaincodeEvent("0xffffffff", "event2")
130+
emsg := createTestChaincodeEvent("0xffffffff", "event2")
132131
if err = producer.Send(emsg); err != nil {
133132
t.Fail()
134133
t.Logf("Error sending message %s", err)

events/producer/eventhelper.go

+92-7
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,106 @@ limitations under the License.
1717
package producer
1818

1919
import (
20+
"fmt"
21+
22+
"github.com/golang/protobuf/proto"
2023
"github.com/hyperledger/fabric/protos/common"
21-
ehpb "github.com/hyperledger/fabric/protos/peer"
24+
pb "github.com/hyperledger/fabric/protos/peer"
25+
"github.com/hyperledger/fabric/protos/utils"
26+
"github.com/op/go-logging"
2227
)
2328

29+
var logger *logging.Logger // package-level logger
30+
31+
func init() {
32+
logger = logging.MustGetLogger("eventhub_producer")
33+
}
34+
35+
// SendProducerBlockEvent sends block event to clients
36+
func SendProducerBlockEvent(block *common.Block) error {
37+
bevent := &common.Block{}
38+
bevent.Header = block.Header
39+
bevent.Metadata = block.Metadata
40+
bevent.Data = &common.BlockData{}
41+
for _, d := range block.Data.Data {
42+
if d != nil {
43+
if env, err := utils.GetEnvelopeFromBlock(d); err != nil {
44+
logger.Errorf("Error getting tx from block(%s)\n", err)
45+
} else if env != nil {
46+
// get the payload from the envelope
47+
payload, err := utils.GetPayload(env)
48+
if err != nil {
49+
return fmt.Errorf("Could not extract payload from envelope, err %s", err)
50+
}
51+
52+
if common.HeaderType(payload.Header.ChainHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION {
53+
tx, err := utils.GetTransaction(payload.Data)
54+
if err != nil {
55+
logger.Errorf("Error unmarshalling transaction payload for block event: %s", err)
56+
continue
57+
}
58+
chaincodeActionPayload := &pb.ChaincodeActionPayload{}
59+
err = proto.Unmarshal(tx.Actions[0].Payload, chaincodeActionPayload)
60+
if err != nil {
61+
logger.Errorf("Error unmarshalling transaction action payload for block event: %s", err)
62+
continue
63+
}
64+
65+
propRespPayload := &pb.ProposalResponsePayload{}
66+
err = proto.Unmarshal(chaincodeActionPayload.Action.ProposalResponsePayload, propRespPayload)
67+
if err != nil {
68+
logger.Errorf("Error unmarshalling proposal response payload for block event: %s", err)
69+
continue
70+
}
71+
//ENDORSER_ACTION, ProposalResponsePayload.Extension field contains ChaincodeAction
72+
caPayload := &pb.ChaincodeAction{}
73+
err = proto.Unmarshal(propRespPayload.Extension, caPayload)
74+
if err != nil {
75+
logger.Errorf("Error unmarshalling chaincode action for block event: %s", err)
76+
continue
77+
}
78+
// Drop read write set from transaction before sending block event
79+
caPayload.Results = nil
80+
propRespPayload.Extension, err = proto.Marshal(caPayload)
81+
if err != nil {
82+
logger.Errorf("Error marshalling tx proposal extension payload for block event: %s", err)
83+
continue
84+
}
85+
// Marshal Transaction again and append to block to be sent
86+
chaincodeActionPayload.Action.ProposalResponsePayload, err = proto.Marshal(propRespPayload)
87+
if err != nil {
88+
logger.Errorf("Error marshalling tx proposal payload for block event: %s", err)
89+
continue
90+
}
91+
tx.Actions[0].Payload, err = proto.Marshal(chaincodeActionPayload)
92+
if err != nil {
93+
logger.Errorf("Error marshalling tx action payload for block event: %s", err)
94+
continue
95+
}
96+
if t, err := proto.Marshal(tx); err == nil {
97+
bevent.Data.Data = append(bevent.Data.Data, t)
98+
logger.Infof("calling sendProducerBlockEvent\n")
99+
} else {
100+
logger.Infof("Cannot marshal transaction %s\n", err)
101+
}
102+
}
103+
}
104+
}
105+
}
106+
return Send(CreateBlockEvent(bevent))
107+
}
108+
24109
//CreateBlockEvent creates a Event from a Block
25-
func CreateBlockEvent(te *common.Block) *ehpb.Event {
26-
return &ehpb.Event{Event: &ehpb.Event_Block{Block: te}}
110+
func CreateBlockEvent(te *common.Block) *pb.Event {
111+
return &pb.Event{Event: &pb.Event_Block{Block: te}}
27112
}
28113

29114
//CreateChaincodeEvent creates a Event from a ChaincodeEvent
30-
func CreateChaincodeEvent(te *ehpb.ChaincodeEvent) *ehpb.Event {
31-
return &ehpb.Event{Event: &ehpb.Event_ChaincodeEvent{ChaincodeEvent: te}}
115+
func CreateChaincodeEvent(te *pb.ChaincodeEvent) *pb.Event {
116+
return &pb.Event{Event: &pb.Event_ChaincodeEvent{ChaincodeEvent: te}}
32117
}
33118

34119
//CreateRejectionEvent creates an Event from TxResults
35-
func CreateRejectionEvent(tx *ehpb.Transaction, errorMsg string) *ehpb.Event {
36-
return &ehpb.Event{Event: &ehpb.Event_Rejection{Rejection: &ehpb.Rejection{Tx: tx, ErrorMsg: errorMsg}}}
120+
func CreateRejectionEvent(tx *pb.Transaction, errorMsg string) *pb.Event {
121+
return &pb.Event{Event: &pb.Event_Rejection{Rejection: &pb.Rejection{Tx: tx, ErrorMsg: errorMsg}}}
37122
}

0 commit comments

Comments
 (0)