Skip to content

Commit 6daedfd

Browse files
author
Srinivasan Muralidharan
committed
re-enable sending of chaincode events
Chaincode events were originally stored in TransactionResults in NonHashData(see fabric.proto). TransactionResults was removed as part of removing error transactions from ledger which broke sending of chaincode events. This is fixed in the patch which stores chaincode events directly in NonHashData. Sample is also enhanced. Change-Id: Ifee8facb2e4760e878f15716af0e48346536c476 Signed-off-by: Srinivasan Muralidharan <[email protected]>
1 parent 6be9353 commit 6daedfd

File tree

6 files changed

+117
-9
lines changed

6 files changed

+117
-9
lines changed

core/ledger/ledger.go

+42-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,31 @@ func (ledger *Ledger) CommitTxBatch(id interface{}, transactions []*protos.Trans
159159
writeBatch := gorocksdb.NewWriteBatch()
160160
defer writeBatch.Destroy()
161161
block := protos.NewBlock(transactions, metadata)
162-
block.NonHashData = &protos.NonHashData{}
162+
163+
ccEvents := []*protos.ChaincodeEvent{}
164+
165+
if transactionResults != nil {
166+
ccEvents = make([]*protos.ChaincodeEvent, len(transactionResults))
167+
for i := 0; i < len(transactionResults); i++ {
168+
if transactionResults[i].ChaincodeEvent != nil {
169+
ccEvents[i] = transactionResults[i].ChaincodeEvent
170+
} else {
171+
//We need the index so we can map the chaincode
172+
//event to the transaction that generated it.
173+
//Hence need an entry for cc event even if one
174+
//wasn't generated for the transaction. We cannot
175+
//use a nil cc event as protobuf does not like
176+
//elements of a repeated array to be nil.
177+
//
178+
//We should discard empty events without chaincode
179+
//ID when sending out events.
180+
ccEvents[i] = &protos.ChaincodeEvent{}
181+
}
182+
}
183+
}
184+
185+
//store chaincode events directly in NonHashData. This will likely change in New Consensus where we can move them to Transaction
186+
block.NonHashData = &protos.NonHashData{ChaincodeEvents: ccEvents}
163187
newBlockNumber, err := ledger.blockchain.addPersistenceChangesForNewBlock(context.TODO(), block, stateHash, writeBatch)
164188
if err != nil {
165189
ledger.resetForNextTxGroup(false)
@@ -180,6 +204,10 @@ func (ledger *Ledger) CommitTxBatch(id interface{}, transactions []*protos.Trans
180204
ledger.blockchain.blockPersistenceStatus(true)
181205

182206
sendProducerBlockEvent(block)
207+
208+
//send chaincode events from transaction results
209+
sendChaincodeEvents(transactionResults)
210+
183211
if len(transactionResults) != 0 {
184212
ledgerLogger.Debug("There were some erroneous transactions. We need to send a 'TX rejected' message here.")
185213
}
@@ -491,3 +519,16 @@ func sendProducerBlockEvent(block *protos.Block) {
491519

492520
producer.Send(producer.CreateBlockEvent(block))
493521
}
522+
523+
//send chaincode events created by transactions
524+
func sendChaincodeEvents(trs []*protos.TransactionResult) {
525+
if trs != nil {
526+
for _, tr := range trs {
527+
//we store empty chaincode events in the protobuf repeated array to make protobuf happy.
528+
//when we replay off a block ignore empty events
529+
if tr.ChaincodeEvent != nil && tr.ChaincodeEvent.ChaincodeID != "" {
530+
producer.Send(producer.CreateChaincodeEvent(tr.ChaincodeEvent))
531+
}
532+
}
533+
}
534+
}

events/producer/handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (d *handler) register(iMsg []*pb.Interest) error {
6464
//if successfully done, continue....
6565
for _, v := range iMsg {
6666
if err := registerHandler(v, d); err != nil {
67-
producerLogger.Errorf("could not register %s", v)
67+
producerLogger.Errorf("could not register %s: %s", v, err)
6868
continue
6969
}
7070
d.addInterest(v)

examples/events/block-listener/README.md

+34-4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
# What is block-listener
2-
block-listener.go will connect to a peer and receive blocks. For every transaction in the block, it will test the ErrorCode field and print success/failure.
2+
block-listener.go will connect to a peer and receive blocks events, transaction rejection events and chaincode events (if a chaincode emits events).
33

44
# To Run
55
```sh
66
1. go build
77

8-
2. ./block-listener -events-address=< event address >
8+
2. ./block-listener -events-address=< event address > -listen-to-rejections=< true | false > -events-from-chaincode=< chaincode ID >
99
```
1010

1111
# Example with PBFT
@@ -21,9 +21,11 @@ docker run --rm -it -e CORE_VM_ENDPOINT=http://172.17.0.1:2375 -e CORE_PEER_ID=v
2121
docker run --rm -it -e CORE_VM_ENDPOINT=http://172.17.0.1:2375 -e CORE_PEER_ID=vp3 -e CORE_PEER_ADDRESSAUTODETECT=true -e CORE_PEER_DISCOVERY_ROOTNODE=172.17.0.2:7051 -e CORE_PEER_VALIDATOR_CONSENSUS_PLUGIN=pbft hyperledger/fabric-peer peer node start
2222

2323
## Attach event client to a Peer
24-
./block-listener -events-address=172.17.0.2:7054
24+
```sh
25+
./block-listener -events-address=172.17.0.2:7053 -listen-to-rejections=true
26+
```
2527

26-
Event client should output "Event Address: 172.17.0.2:7054" and wait for events.
28+
Event client should output "Event Address: 172.17.0.2:7053" and wait for events.
2729

2830
## Create a deploy transaction
2931
Submit a transaction to deploy chaincode_example02.
@@ -50,3 +52,31 @@ CORE_PEER_ADDRESS=172.17.0.2:7051 peer chaincode invoke -n 1edd7021ab71b766f4928
5052
```
5153

5254
Notice error transaction in events client.
55+
56+
# Tesing chaincode events
57+
Chaincode github.com/hyperledger/fabric/examples/chaincode/go/eventsender can be used to test event sender.
58+
## Deploy eventsender chaincode
59+
Stop the event listener and restart it as follows
60+
61+
```
62+
CORE_PEER_ADDRESS=172.17.0.2:7051 ./peer chaincode deploy -p github.com/hyperledger/fabric/examples/chaincode/go/eventsender -c '{"Function":"init", "Args":[]}'
63+
```
64+
65+
```
66+
Note the chaincode ID of the eventsender chaincode. This will be used in the commands below.
67+
```
68+
## Restart event listener
69+
Stop the event listener if running and restart it with `-events-from-chaincode` option
70+
71+
```sh
72+
./block-listener -events-address=172.17.0.2:7053 -listen-to-rejections=true -events-from-chaincode=< event sender chaincode ID>
73+
```
74+
75+
76+
##Send an invoke request to event sender
77+
78+
```sh
79+
CORE_PEER_ADDRESS=172.17.0.2:7051 ./peer chaincode invoke -n < eventsender chaincode ID > -c '{"Function":"greet", "Args":["hello","world"]}'
80+
```
81+
82+
Note the output from the event listener terminal showing a chaincode event from the event sender chaincode in addition to the block event generated by the transaction.

examples/events/block-listener/block-listener.go

+27-3
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,23 @@ import (
3131
type adapter struct {
3232
notfy chan *pb.Event_Block
3333
rejected chan *pb.Event_Rejection
34+
cEvent chan *pb.Event_ChaincodeEvent
3435
listenToRejections bool
36+
chaincodeID string
3537
}
3638

3739
//GetInterestedEvents implements consumer.EventAdapter interface for registering interested events
3840
func (a *adapter) GetInterestedEvents() ([]*pb.Interest, error) {
41+
if a.chaincodeID != "" {
42+
return []*pb.Interest{
43+
{EventType: pb.EventType_BLOCK},
44+
{EventType: pb.EventType_REJECTION},
45+
{EventType: pb.EventType_CHAINCODE,
46+
RegInfo: &pb.Interest_ChaincodeRegInfo{
47+
ChaincodeRegInfo: &pb.ChaincodeReg{
48+
ChaincodeID: a.chaincodeID,
49+
EventName: ""}}}}, nil
50+
}
3951
return []*pb.Interest{{EventType: pb.EventType_BLOCK}, {EventType: pb.EventType_REJECTION}}, nil
4052
}
4153

@@ -49,6 +61,10 @@ func (a *adapter) Recv(msg *pb.Event) (bool, error) {
4961
a.rejected <- o
5062
return true, nil
5163
}
64+
if o, e := msg.Event.(*pb.Event_ChaincodeEvent); e {
65+
a.cEvent <- o
66+
return true, nil
67+
}
5268
a.notfy <- nil
5369
return false, nil
5470
}
@@ -59,12 +75,12 @@ func (a *adapter) Disconnected(err error) {
5975
os.Exit(1)
6076
}
6177

62-
func createEventClient(eventAddress string, listenToRejections bool) *adapter {
78+
func createEventClient(eventAddress string, listenToRejections bool, cid string) *adapter {
6379
var obcEHClient *consumer.EventsClient
6480

6581
done := make(chan *pb.Event_Block)
6682
reject := make(chan *pb.Event_Rejection)
67-
adapter := &adapter{notfy: done, rejected: reject, listenToRejections: listenToRejections}
83+
adapter := &adapter{notfy: done, rejected: reject, listenToRejections: listenToRejections, chaincodeID: cid, cEvent: make(chan *pb.Event_ChaincodeEvent)}
6884
obcEHClient = consumer.NewEventsClient(eventAddress, adapter)
6985
if err := obcEHClient.Start(); err != nil {
7086
fmt.Printf("could not start chat %s\n", err)
@@ -78,13 +94,15 @@ func createEventClient(eventAddress string, listenToRejections bool) *adapter {
7894
func main() {
7995
var eventAddress string
8096
var listenToRejections bool
97+
var chaincodeID string
8198
flag.StringVar(&eventAddress, "events-address", "0.0.0.0:7053", "address of events server")
8299
flag.BoolVar(&listenToRejections, "listen-to-rejections", false, "whether to listen to rejection events")
100+
flag.StringVar(&chaincodeID, "events-from-chaincode", "", "listen to events from given chaincode")
83101
flag.Parse()
84102

85103
fmt.Printf("Event Address: %s\n", eventAddress)
86104

87-
a := createEventClient(eventAddress, listenToRejections)
105+
a := createEventClient(eventAddress, listenToRejections, chaincodeID)
88106
if a == nil {
89107
fmt.Printf("Error creating event client\n")
90108
return
@@ -106,6 +124,12 @@ func main() {
106124
fmt.Printf("Received rejected transaction\n")
107125
fmt.Printf("--------------\n")
108126
fmt.Printf("Transaction error:\n%s\t%s\n", r.Rejection.Tx.Uuid, r.Rejection.ErrorMsg)
127+
case ce := <-a.cEvent:
128+
fmt.Printf("\n")
129+
fmt.Printf("\n")
130+
fmt.Printf("Received chaincode event\n")
131+
fmt.Printf("------------------------\n")
132+
fmt.Printf("Chaincode Event:%v\n", ce)
109133
}
110134
}
111135
}

protos/fabric.pb.go

+10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protos/fabric.proto

+3
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,11 @@ message BlockchainInfo {
110110
// the block hash when verifying the blockchain.
111111
// localLedgerCommitTimestamp - The time at which the block was added
112112
// to the ledger on the local peer.
113+
// chaincodeEvent - is an array ChaincodeEvents, one per transaction in the
114+
// block
113115
message NonHashData {
114116
google.protobuf.Timestamp localLedgerCommitTimestamp = 1;
117+
repeated ChaincodeEvent chaincodeEvents = 2;
115118
}
116119

117120
// Interface exported by the server.

0 commit comments

Comments
 (0)