Skip to content

Commit 606cc4f

Browse files
committed
block-listener: parse chaincode events from block event
Client and their associated support libraries should now parse block events for chaincode events associated with individual transactions. Change-Id: I9b12deef293cfe50e7e634301c990d4c95ec7cf2 Signed-off-by: Patrick Mullaney <[email protected]>
1 parent ed01846 commit 606cc4f

File tree

3 files changed

+75
-23
lines changed

3 files changed

+75
-23
lines changed

examples/events/block-listener/block-listener.go

+52-23
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,19 @@ import (
2222
"os"
2323

2424
"github.com/hyperledger/fabric/events/consumer"
25+
"github.com/hyperledger/fabric/protos/common"
2526
pb "github.com/hyperledger/fabric/protos/peer"
27+
"github.com/hyperledger/fabric/protos/utils"
2628
)
2729

2830
type adapter struct {
2931
notfy chan *pb.Event_Block
3032
rejected chan *pb.Event_Rejection
31-
cEvent chan *pb.Event_ChaincodeEvent
3233
listenToRejections bool
33-
chaincodeID string
3434
}
3535

3636
//GetInterestedEvents implements consumer.EventAdapter interface for registering interested events
3737
func (a *adapter) GetInterestedEvents() ([]*pb.Interest, error) {
38-
if a.chaincodeID != "" {
39-
return []*pb.Interest{
40-
{EventType: pb.EventType_BLOCK},
41-
{EventType: pb.EventType_REJECTION},
42-
{EventType: pb.EventType_CHAINCODE,
43-
RegInfo: &pb.Interest_ChaincodeRegInfo{
44-
ChaincodeRegInfo: &pb.ChaincodeReg{
45-
ChaincodeID: a.chaincodeID,
46-
EventName: ""}}}}, nil
47-
}
4838
return []*pb.Interest{{EventType: pb.EventType_BLOCK}, {EventType: pb.EventType_REJECTION}}, nil
4939
}
5040

@@ -60,10 +50,6 @@ func (a *adapter) Recv(msg *pb.Event) (bool, error) {
6050
}
6151
return true, nil
6252
}
63-
if o, e := msg.Event.(*pb.Event_ChaincodeEvent); e {
64-
a.cEvent <- o
65-
return true, nil
66-
}
6753
return false, fmt.Errorf("Receive unkown type event: %v", msg)
6854
}
6955

@@ -78,7 +64,7 @@ func createEventClient(eventAddress string, listenToRejections bool, cid string)
7864

7965
done := make(chan *pb.Event_Block)
8066
reject := make(chan *pb.Event_Rejection)
81-
adapter := &adapter{notfy: done, rejected: reject, listenToRejections: listenToRejections, chaincodeID: cid, cEvent: make(chan *pb.Event_ChaincodeEvent)}
67+
adapter := &adapter{notfy: done, rejected: reject, listenToRejections: listenToRejections}
8268
obcEHClient, _ = consumer.NewEventsClient(eventAddress, 5, adapter)
8369
if err := obcEHClient.Start(); err != nil {
8470
fmt.Printf("could not start chat %s\n", err)
@@ -89,6 +75,48 @@ func createEventClient(eventAddress string, listenToRejections bool, cid string)
8975
return adapter
9076
}
9177

78+
// getChainCodeEvents parses block events for chaincode events associated with individual transactions
79+
func getChainCodeEvents(tdata []byte) (*pb.ChaincodeEvent, error) {
80+
if tdata == nil {
81+
return nil, fmt.Errorf("Cannot extract payload from nil transaction")
82+
}
83+
84+
if env, err := utils.GetEnvelopeFromBlock(tdata); err != nil {
85+
return nil, fmt.Errorf("Error getting tx from block(%s)\n", err)
86+
} else if env != nil {
87+
// get the payload from the envelope
88+
payload, err := utils.GetPayload(env)
89+
if err != nil {
90+
return nil, fmt.Errorf("Could not extract payload from envelope, err %s", err)
91+
}
92+
93+
if common.HeaderType(payload.Header.ChainHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION {
94+
tx, err := utils.GetTransaction(payload.Data)
95+
if err != nil {
96+
return nil, fmt.Errorf("Error unmarshalling transaction payload for block event: %s", err)
97+
}
98+
chaincodeActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload)
99+
if err != nil {
100+
return nil, fmt.Errorf("Error unmarshalling transaction action payload for block event: %s", err)
101+
}
102+
propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload)
103+
if err != nil {
104+
return nil, fmt.Errorf("Error unmarshalling proposal response payload for block event: %s", err)
105+
}
106+
caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension)
107+
if err != nil {
108+
return nil, fmt.Errorf("Error unmarshalling chaincode action for block event: %s", err)
109+
}
110+
ccEvent, err := utils.GetChaincodeEvents(caPayload.Events)
111+
112+
if ccEvent != nil {
113+
return ccEvent, nil
114+
}
115+
}
116+
}
117+
return nil, fmt.Errorf("No events found")
118+
}
119+
92120
func main() {
93121
var eventAddress string
94122
var listenToRejections bool
@@ -115,6 +143,13 @@ func main() {
115143
fmt.Printf("--------------\n")
116144
for _, r := range b.Block.Data.Data {
117145
fmt.Printf("Transaction:\n\t[%v]\n", r)
146+
if event, err := getChainCodeEvents(r); err == nil {
147+
if event.ChaincodeID == chaincodeID {
148+
fmt.Printf("Received chaincode event\n")
149+
fmt.Printf("------------------------\n")
150+
fmt.Printf("Chaincode Event:%+v\n", event)
151+
}
152+
}
118153
}
119154
case r := <-a.rejected:
120155
fmt.Printf("\n")
@@ -124,12 +159,6 @@ func main() {
124159
//TODO get TxID from pb.ChaincodeHeader from TransactionAction's Header
125160
//fmt.Printf("Transaction error:\n%s\t%s\n", r.Rejection.Tx.Txid, r.Rejection.ErrorMsg)
126161
fmt.Printf("Transaction error:\n%s\n", r.Rejection.ErrorMsg)
127-
case ce := <-a.cEvent:
128-
fmt.Printf("\n")
129-
fmt.Printf("\n")
130-
fmt.Printf("Received chaincode event\n")
131-
fmt.Printf("------------------------\n")
132-
fmt.Printf("Chaincode Event:%v\n", ce)
133162
}
134163
}
135164
}

protos/utils/proputils.go

+11
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,17 @@ func GetChaincodeAction(caBytes []byte) (*peer.ChaincodeAction, error) {
9898
return chaincodeAction, nil
9999
}
100100

101+
// GetChaincodeEvents gets the ChaincodeEvents given chaicnode event bytes
102+
func GetChaincodeEvents(eBytes []byte) (*peer.ChaincodeEvent, error) {
103+
chaincodeEvent := &peer.ChaincodeEvent{}
104+
err := proto.Unmarshal(eBytes, chaincodeEvent)
105+
if err != nil {
106+
return nil, err
107+
}
108+
109+
return chaincodeEvent, nil
110+
}
111+
101112
// GetProposalResponsePayload gets the proposal response payload
102113
func GetProposalResponsePayload(prpBytes []byte) (*peer.ProposalResponsePayload, error) {
103114
prp := &peer.ProposalResponsePayload{}

protos/utils/proputils_test.go

+12
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,18 @@ func TestProposalResponse(t *testing.T) {
165165
return
166166
}
167167

168+
event, err := GetChaincodeEvents(act.Events)
169+
if err != nil {
170+
t.Fatalf("Failure while unmarshalling the ChainCodeEvents")
171+
return
172+
}
173+
174+
// sanity check on the event
175+
if string(event.ChaincodeID) != "ccid" {
176+
t.Fatalf("Invalid actions after unmarshalling")
177+
return
178+
}
179+
168180
pr := &pb.ProposalResponse{
169181
Payload: prpBytes,
170182
Endorsement: &pb.Endorsement{Endorser: []byte("endorser"), Signature: []byte("signature")},

0 commit comments

Comments
 (0)