Skip to content

Commit e9d3ac2

Browse files
committed
node-SDK: add support for fabric events(block, chaincode, transactional)
This patch adds support for fabric events to the SDK. It defaults to the same timer based mechanism for transactional events if the app doesn't connect to a peer event source(eventHubConnect). Change-Id: Ib9f9f79f11438eb67d5ada7666ce1d117af511a5 Signed-off-by: Patrick Mullaney <[email protected]>
1 parent b1ea9cd commit e9d3ac2

File tree

3 files changed

+229
-41
lines changed

3 files changed

+229
-41
lines changed

sdk/node/package.json

+2
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
"crypto": "0.0.3",
1313
"debug": "^2.2.0",
1414
"elliptic": "^6.2.3",
15+
"es6-set": "^0.1.4",
1516
"events": "^1.1.0",
1617
"fs": "0.0.2",
1718
"grpc": "^0.13.2-pre1",
19+
"hashtable": "^2.0.2",
1820
"js-sha3": "^0.5.1",
1921
"json-stringify-safe": "^5.0.1",
2022
"jsrsasign": "^5.0.10",

sdk/node/src/hfc.ts

+221-41
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ var jsrsa = require('jsrsasign');
5555
var elliptic = require('elliptic');
5656
var sha3 = require('js-sha3');
5757
var BN = require('bn.js');
58+
var Set = require('es6-set');
59+
var HashTable = require('hashtable');
5860
import * as crypto from "./crypto"
5961
import * as stats from "./stats"
6062
import * as sdk_util from "./sdk_util"
@@ -369,6 +371,7 @@ export interface TransactionProtobuf {
369371
setConfidentialityProtocolVersion(version:string):void;
370372
setNonce(nonce:Buffer):void;
371373
setToValidators(Buffer):void;
374+
getTxid():string;
372375
getChaincodeID():{buffer: Buffer};
373376
setChaincodeID(buffer:Buffer):void;
374377
getMetadata():{buffer: Buffer};
@@ -419,6 +422,9 @@ export class Chain {
419422
// The member services used for this chain
420423
private memberServices:MemberServices;
421424

425+
// The eventHub service used for this chain
426+
private eventHub:EventHub;
427+
422428
// The key-val store used for this chain
423429
private keyValStore:KeyValStore;
424430

@@ -430,14 +436,15 @@ export class Chain {
430436

431437
// Temporary variables to control how long to wait for deploy and invoke to complete before
432438
// emitting events. This will be removed when the SDK is able to receive events from the
433-
private deployWaitTime:number = 20;
439+
private deployWaitTime:number = 30;
434440
private invokeWaitTime:number = 5;
435441

436442
// The crypto primitives object
437443
cryptoPrimitives:crypto.Crypto;
438444

439445
constructor(name:string) {
440446
this.name = name;
447+
this.eventHub = new EventHub();
441448
}
442449

443450
/**
@@ -520,6 +527,29 @@ export class Chain {
520527
}
521528
};
522529

530+
/**
531+
* Get the eventHub service associated this chain.
532+
* @returns {eventHub} Return the current eventHub service, or undefined if not set.
533+
*/
534+
getEventHub():EventHub{
535+
return this.eventHub;
536+
};
537+
538+
/**
539+
* Set and connect to the peer to be used as the event source.
540+
*/
541+
eventHubConnect(peeraddr: string):void {
542+
this.eventHub.setPeerAddr(peeraddr);
543+
this.eventHub.connect();
544+
};
545+
546+
/**
547+
* Set and connect to the peer to be used as the event source.
548+
*/
549+
eventHubDisconnect():void {
550+
this.eventHub.disconnect();
551+
};
552+
523553
/**
524554
* Determine if security is enabled.
525555
*/
@@ -1144,6 +1174,10 @@ export class TransactionContext extends events.EventEmitter {
11441174
private binding: any;
11451175
private tcert:TCert;
11461176
private attrs:string[];
1177+
private complete:boolean;
1178+
private timeoutId:any;
1179+
private waitTime:number;
1180+
private cevent:any;
11471181

11481182
constructor(member:Member, tcert:TCert) {
11491183
super();
@@ -1152,6 +1186,8 @@ export class TransactionContext extends events.EventEmitter {
11521186
this.memberServices = this.chain.getMemberServices();
11531187
this.tcert = tcert;
11541188
this.nonce = this.chain.cryptoPrimitives.generateNonce();
1189+
this.complete = false;
1190+
this.timeoutId = null;
11551191
}
11561192

11571193
/**
@@ -1376,7 +1412,47 @@ export class TransactionContext extends events.EventEmitter {
13761412
});
13771413
self.getChain().sendTransaction(tx, emitter);
13781414
} else {
1379-
self.getChain().sendTransaction(tx, self);
1415+
let txType = tx.pb.getType();
1416+
let uuid = tx.pb.getTxid();
1417+
let eh = self.getChain().getEventHub();
1418+
// async deploy and invokes need to maintain
1419+
// tx context(completion status(self.complete))
1420+
if ( txType == _fabricProto.Transaction.Type.CHAINCODE_DEPLOY) {
1421+
self.cevent = new EventDeployComplete(uuid, tx.chaincodeID);
1422+
self.waitTime = self.getChain().getDeployWaitTime();
1423+
} else if ( txType == _fabricProto.Transaction.Type.CHAINCODE_INVOKE) {
1424+
self.cevent = new EventInvokeComplete("Tx "+uuid+" complete");
1425+
self.waitTime = self.getChain().getInvokeWaitTime();
1426+
}
1427+
eh.registerTxEvent(uuid, function (uuid) {
1428+
self.complete = true;
1429+
if (self.timeoutId) {
1430+
clearTimeout(self.timeoutId);
1431+
}
1432+
eh.unregisterTxEvent(uuid);
1433+
self.emit("complete", self.cevent);
1434+
});
1435+
self.getChain().sendTransaction(tx, self);
1436+
// sync query can be skipped as response
1437+
// is processed and event generated in sendTransaction
1438+
// no timeout processing is necessary
1439+
if ( txType != _fabricProto.Transaction.Type.CHAINCODE_QUERY) {
1440+
debug("waiting %d seconds before emitting complete event", self.waitTime);
1441+
self.timeoutId = setTimeout(function() {
1442+
debug("timeout uuid=", uuid);
1443+
if(!self.complete)
1444+
// emit error if eventhub connect otherwise
1445+
// emit a complete event as done previously
1446+
if(eh.isconnected())
1447+
self.emit("error","timed out waiting for transaction to complete");
1448+
else
1449+
self.emit("complete",self.cevent);
1450+
else
1451+
eh.unregisterTxEvent(uuid);
1452+
},
1453+
self.waitTime * 1000
1454+
);
1455+
}
13801456
}
13811457
} else {
13821458
debug('Missing TCert...');
@@ -2130,7 +2206,6 @@ export class Peer {
21302206
let event = new EventDeploySubmitted(response.msg.toString(), tx.chaincodeID);
21312207
debug("EventDeploySubmitted event: %j", event);
21322208
eventEmitter.emit("submitted", event);
2133-
self.waitForDeployComplete(eventEmitter,event);
21342209
}
21352210
} else {
21362211
// Deploy completed with status "FAILURE" or "UNDEFINED"
@@ -2144,7 +2219,6 @@ export class Peer {
21442219
eventEmitter.emit("error", new EventTransactionError("the invoke response is missing the transaction UUID"));
21452220
} else {
21462221
eventEmitter.emit("submitted", new EventInvokeSubmitted(response.msg.toString()));
2147-
self.waitForInvokeComplete(eventEmitter);
21482222
}
21492223
} else {
21502224
// Invoke completed with status "FAILURE" or "UNDEFINED"
@@ -2166,43 +2240,6 @@ export class Peer {
21662240
});
21672241
};
21682242

2169-
/**
2170-
* TODO: Temporary hack to wait until the deploy event has hopefully completed.
2171-
* This does not detect if an error occurs in the peer or chaincode when deploying.
2172-
* When peer event listening is added to the SDK, this will be implemented correctly.
2173-
*/
2174-
private waitForDeployComplete(eventEmitter:events.EventEmitter, submitted:EventDeploySubmitted): void {
2175-
let waitTime = this.chain.getDeployWaitTime();
2176-
debug("waiting %d seconds before emitting deploy complete event",waitTime);
2177-
setTimeout(
2178-
function() {
2179-
let event = new EventDeployComplete(
2180-
submitted.uuid,
2181-
submitted.chaincodeID,
2182-
"TODO: get actual results; waited "+waitTime+" seconds and assumed deploy was successful"
2183-
);
2184-
eventEmitter.emit("complete",event);
2185-
},
2186-
waitTime * 1000
2187-
);
2188-
}
2189-
2190-
/**
2191-
* TODO: Temporary hack to wait until the deploy event has hopefully completed.
2192-
* This does not detect if an error occurs in the peer or chaincode when deploying.
2193-
* When peer event listening is added to the SDK, this will be implemented correctly.
2194-
*/
2195-
private waitForInvokeComplete(eventEmitter:events.EventEmitter): void {
2196-
let waitTime = this.chain.getInvokeWaitTime();
2197-
debug("waiting %d seconds before emitting invoke complete event",waitTime);
2198-
setTimeout(
2199-
function() {
2200-
eventEmitter.emit("complete",new EventInvokeComplete("waited "+waitTime+" seconds and assumed invoke was successful"));
2201-
},
2202-
waitTime * 1000
2203-
);
2204-
}
2205-
22062243
/**
22072244
* Remove the peer from the chain.
22082245
*/
@@ -2726,3 +2763,146 @@ export function getChain(chainName, create) {
27262763
export function newFileKeyValStore(dir:string):KeyValStore {
27272764
return new FileKeyValStore(dir);
27282765
}
2766+
2767+
/**
2768+
* The ChainCodeCBE is used internal to the EventHub to hold chaincode event registration callbacks.
2769+
*/
2770+
class ChainCodeCBE {
2771+
ccid: string;
2772+
eventname: string;
2773+
payload: Uint8Array;
2774+
cb: Function;
2775+
constructor(ccid: string,eventname: string,payload: Uint8Array, cb: Function) {
2776+
this.ccid = ccid;
2777+
this.eventname = eventname;
2778+
this.payload = payload;
2779+
this.cb = cb;
2780+
}
2781+
}
2782+
2783+
/**
2784+
* The EventHub is used to distribute events from a specific event source(peer)
2785+
*/
2786+
export class EventHub {
2787+
// peer addr to connect to
2788+
private peeraddr: string;
2789+
// grpc events interface
2790+
private events: any;
2791+
// grpc event client interface
2792+
private client: any;
2793+
// grpc chat streaming interface
2794+
private call: any;
2795+
// hashtable of clients registered for chaincode events
2796+
private chaincodeRegistrants: any;
2797+
// set of clients registered for block events
2798+
private blockRegistrants: any;
2799+
// hashtable of clients registered for transactional events
2800+
private txRegistrants: any;
2801+
// fabric connection state of this eventhub
2802+
private connected: boolean;
2803+
constructor() {
2804+
this.chaincodeRegistrants = new HashTable();
2805+
this.blockRegistrants = new Set();
2806+
this.txRegistrants = new HashTable();
2807+
this.peeraddr = "localhost:7053";
2808+
this.connected = false;
2809+
}
2810+
2811+
public setPeerAddr(peeraddr: string) {
2812+
this.peeraddr = peeraddr;
2813+
}
2814+
2815+
public isconnected() {
2816+
return this.connected;
2817+
}
2818+
2819+
public connect() {
2820+
if (this.connected) return;
2821+
this.events = grpc.load(__dirname + "/protos/events.proto" ).protos;
2822+
this.client = new this.events.Events(this.peeraddr,grpc.credentials.createInsecure());
2823+
this.call = this.client.chat();
2824+
this.connected = true;
2825+
this.registerBlockEvent(this.txCallback);
2826+
2827+
let eh = this; // for callback context
2828+
this.call.on('data', function(event) {
2829+
if ( event.Event == "chaincodeEvent" ) {
2830+
var cbe = eh.chaincodeRegistrants.get(event.chaincodeEvent.chaincodeID + "/" + event.chaincodeEvent.eventName);
2831+
if ( cbe ) {
2832+
cbe.payload = event.chaincodeEvent.payload;
2833+
cbe.cb(cbe);
2834+
}
2835+
} else if ( event.Event == "block") {
2836+
eh.blockRegistrants.forEach(function(cb){
2837+
cb(event.block);
2838+
});
2839+
}
2840+
});
2841+
this.call.on('end', function() {
2842+
eh.call.end();
2843+
// clean up Registrants - should app get notified?
2844+
eh.chaincodeRegistrants.clear();
2845+
eh.blockRegistrants.clear();
2846+
});
2847+
}
2848+
2849+
public disconnect() {
2850+
if (!this.connected) return;
2851+
this.unregisterBlockEvent(this.txCallback);
2852+
this.call.end();
2853+
this.connected = false;
2854+
}
2855+
2856+
public registerChaincodeEvent(ccid: string, eventname: string, callback: Function){
2857+
if (!this.connected) return;
2858+
let cb = new ChainCodeCBE(ccid, eventname, null, callback);
2859+
let register = { register: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid , eventName: eventname }} ] }};
2860+
this.chaincodeRegistrants.put(ccid + "/" + eventname, cb);
2861+
this.call.write(register);
2862+
}
2863+
2864+
public unregisterChaincodeEvent(ccid: string, eventname: string){
2865+
if (!this.connected) return;
2866+
var unregister = { unregister: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid, eventName: eventname }} ] }};
2867+
this.chaincodeRegistrants.remove(ccid + "/" + eventname);
2868+
this.call.write(unregister);
2869+
}
2870+
2871+
public registerBlockEvent(callback:Function){
2872+
if (!this.connected) return;
2873+
this.blockRegistrants.add(callback);
2874+
if(this.blockRegistrants.size==1) {
2875+
var register = { register: { events: [ { eventType: "BLOCK"} ] }};
2876+
this.call.write(register);
2877+
}
2878+
}
2879+
2880+
public unregisterBlockEvent(callback:Function){
2881+
if (!this.connected) return;
2882+
if(this.blockRegistrants.size<=1) {
2883+
var unregister = { unregister: { events: [ { eventType: "BLOCK"} ] }};
2884+
this.call.write(unregister);
2885+
}
2886+
this.blockRegistrants.delete(callback);
2887+
}
2888+
2889+
public registerTxEvent(txid:string, callback:Function){
2890+
debug("reg txid "+txid);
2891+
this.txRegistrants.put(txid, callback);
2892+
}
2893+
2894+
public unregisterTxEvent(txid:string){
2895+
this.txRegistrants.remove(txid);
2896+
}
2897+
2898+
private txCallback = (event) => {
2899+
debug("txCallback event=%j", event);
2900+
var eh = this;
2901+
event.transactions.forEach(function(transaction){
2902+
debug("transaction.txid="+transaction.txid);
2903+
var cb = eh.txRegistrants.get(transaction.txid);
2904+
if (cb)
2905+
cb(transaction.txid);
2906+
});
2907+
}
2908+
}

sdk/node/test/unit/chain-tests.js

+6
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ if (fs.existsSync("tlsca.cert")) {
4848
chain.setMemberServicesUrl("grpc://localhost:7054");
4949
}
5050
chain.addPeer("grpc://localhost:7051");
51+
chain.eventHubConnect("localhost:7053");
52+
53+
process.on('exit', function (){
54+
chain.eventHubDisconnect();
55+
});
5156

5257
//
5358
// Set the chaincode deployment mode to either developent mode (user runs chaincode)
@@ -759,6 +764,7 @@ test('Invoke a chaincode by enrolled user', function (t) {
759764
invokeTx.on('submitted', function (results) {
760765
// Invoke transaction submitted successfully
761766
t.pass(util.format("Successfully submitted chaincode invoke transaction: request=%j, response=%j", invokeRequest, results));
767+
chain.eventHubDisconnect();
762768
});
763769
invokeTx.on('error', function (err) {
764770
// Invoke transaction submission failed

0 commit comments

Comments
 (0)