Skip to content

Commit 177fb34

Browse files
author
Jason Yellick
committed
[FAB-4538] Disconnect deliver clients after revoke
If a client of the Deliver interface requests to seek until max_uint64, then this client will stay connected receiving blocks indefinitely. If that client's access is revoked, either by removing the organization from the channel, or the client's certificate is revoked, then this client is not disconnected, and will continue to receive blocks indefinitely. This CR causes the Deliver code to check the configuration sequence number before each block is delivered. If the configuration sequence has been incremented, then the client's authorization is checked once more and ejected if the new configuration does not permit the client's request. Change-Id: Ie852a9a8d435917ef1e7dce2025122c791fc9248 Signed-off-by: Jason Yellick <[email protected]>
1 parent 88b5bcb commit 177fb34

File tree

3 files changed

+60
-0
lines changed

3 files changed

+60
-0
lines changed

orderer/common/deliver/deliver.go

+16
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ type SupportManager interface {
4545

4646
// Support provides the backing resources needed to support deliver on a chain
4747
type Support interface {
48+
// Sequence returns the current config sequence number, can be used to detect config changes
49+
Sequence() uint64
50+
4851
// PolicyManager returns the current policy manager as specified by the chain configuration
4952
PolicyManager() policies.Manager
5053

@@ -115,6 +118,8 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
115118

116119
}
117120

121+
lastConfigSequence := chain.Sequence()
122+
118123
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
119124
result, _ := sf.Apply(envelope)
120125
if result != filter.Forward {
@@ -166,6 +171,17 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
166171
}
167172
}
168173

174+
currentConfigSequence := chain.Sequence()
175+
if currentConfigSequence > lastConfigSequence {
176+
lastConfigSequence = currentConfigSequence
177+
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
178+
result, _ := sf.Apply(envelope)
179+
if result != filter.Forward {
180+
logger.Warningf("Client authorization revoked for deliver request for channel %s", chdr.ChannelId)
181+
return sendStatusReply(srv, cb.Status_FORBIDDEN)
182+
}
183+
}
184+
169185
block, status := cursor.Next()
170186
if status != cb.Status_SUCCESS {
171187
logger.Errorf("Error reading from channel, cause was: %v", status)

orderer/common/deliver/deliver_test.go

+41
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,17 @@ type mockSupport struct {
113113
ledger ledger.ReadWriter
114114
policyManager *mockpolicies.Manager
115115
erroredChan chan struct{}
116+
configSeq uint64
116117
}
117118

118119
func (mcs *mockSupport) Errored() <-chan struct{} {
119120
return mcs.erroredChan
120121
}
121122

123+
func (mcs *mockSupport) Sequence() uint64 {
124+
return mcs.configSeq
125+
}
126+
122127
func (mcs *mockSupport) PolicyManager() policies.Manager {
123128
return mcs.policyManager
124129
}
@@ -289,6 +294,42 @@ func TestUnauthorizedSeek(t *testing.T) {
289294
}
290295
}
291296

297+
func TestRevokedAuthorizationSeek(t *testing.T) {
298+
mm := newMockMultichainManager()
299+
for i := 1; i < ledgerSize; i++ {
300+
l := mm.chains[systemChainID].ledger
301+
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}))
302+
}
303+
304+
m := newMockD()
305+
defer close(m.recvChan)
306+
ds := NewHandlerImpl(mm)
307+
308+
go ds.Handle(m)
309+
310+
m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})
311+
312+
select {
313+
case deliverReply := <-m.sendChan:
314+
assert.NotNil(t, deliverReply.GetBlock(), "First should succeed")
315+
case <-time.After(time.Second):
316+
t.Fatalf("Timed out waiting to get all blocks")
317+
}
318+
319+
mm.chains[systemChainID].policyManager.Policy.Err = fmt.Errorf("Fail to evaluate policy")
320+
mm.chains[systemChainID].configSeq++
321+
l := mm.chains[systemChainID].ledger
322+
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}}))
323+
324+
select {
325+
case deliverReply := <-m.sendChan:
326+
assert.Equal(t, cb.Status_FORBIDDEN, deliverReply.GetStatus(), "Second should been forbidden ")
327+
case <-time.After(time.Second):
328+
t.Fatalf("Timed out waiting to get all blocks")
329+
}
330+
331+
}
332+
292333
func TestOutOfBoundSeek(t *testing.T) {
293334
m := newMockD()
294335
defer close(m.recvChan)

orderer/multichain/chainsupport.go

+3
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ type ChainSupport interface {
9595
broadcast.Support
9696
ConsenterSupport
9797

98+
// Sequence returns the current config sequence number
99+
Sequence() uint64
100+
98101
// ProposeConfigUpdate applies a CONFIG_UPDATE to an existing config to produce a *cb.ConfigEnvelope
99102
ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error)
100103
}

0 commit comments

Comments
 (0)