forked from couchbase/cbgt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmsg_ring.go
145 lines (122 loc) · 3.46 KB
/
msg_ring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
package cbgt
import (
"fmt"
"io"
"sync"
)
// Tunables that control how a MsgRing classifies and pools the bufs
// it recycles.
var (
	// MsgRingMaxSmallBufSize is the cutoff point, in bytes, at or
	// below which a msg ring categorizes a buf as small (rather than
	// large) for reuse.
	MsgRingMaxSmallBufSize = 1024

	// MsgRingMaxBufPoolSize is the max number of reused bufs that a
	// msg ring keeps in each of its buf pools.
	MsgRingMaxBufPoolSize = 8
)
// A MsgRing wraps an io.Writer, and remembers a ring of previous
// writes to the io.Writer.  It is concurrent safe and is useful, for
// example, for remembering recent log messages.
//
// Write and Messages take the internal mutex while they touch the
// ring state; Write forwards to the wrapped io.Writer only after
// releasing that mutex.
type MsgRing struct {
	m     sync.Mutex // Held by Write and Messages while accessing the fields below.
	inner io.Writer  // The wrapped writer that each Write is forwarded to.

	// Next is the index in Msgs of the slot that the next Write will
	// fill; Messages starts its walk of the ring from this slot.
	Next int `json:"next"`

	// Msgs holds the remembered writes, one buf per ring slot.  A
	// slot is nil until it has been written for the first time.
	Msgs [][]byte `json:"msgs"`

	SmallBufs [][]byte // Pool of small buffers.
	LargeBufs [][]byte // Pool of large buffers.
}
// NewMsgRing returns a MsgRing that wraps the inner io.Writer and has
// ringSize message slots.  An error is returned, along with a nil
// MsgRing, when inner is nil or when ringSize is not positive.
func NewMsgRing(inner io.Writer, ringSize int) (*MsgRing, error) {
	// Validate the arguments up front; the nil writer check comes
	// first, so it wins when both arguments are bad.
	switch {
	case inner == nil:
		return nil, fmt.Errorf("msg_ring: nil inner io.Writer")
	case ringSize <= 0:
		return nil, fmt.Errorf("msg_ring: non-positive ring size")
	}

	// Next is left at its zero value, so the first write lands in
	// slot 0.
	ring := &MsgRing{inner: inner}
	ring.Msgs = make([][]byte, ringSize)

	return ring, nil
}
// Write implements the io.Writer interface.  It stores a copy of p in
// the ring slot at m.Next (evicting whatever message was there),
// advances m.Next with wrap-around, and then forwards p to the
// wrapped io.Writer, returning that writer's result.
//
// The buf of the evicted message is recycled into the small or large
// buf pool so that a later Write can reuse it instead of allocating.
// The ring state is updated under the mutex; the call to the wrapped
// io.Writer happens after the mutex is released.
func (m *MsgRing) Write(p []byte) (n int, err error) {
	m.m.Lock()

	// Recycle the oldMsg into the small-vs-large pools, as long as
	// there's enough pool space.  Bufs are categorized by capacity,
	// since that is what determines which later writes they can hold.
	if oldMsg := m.Msgs[m.Next]; oldMsg != nil {
		if cap(oldMsg) <= MsgRingMaxSmallBufSize {
			if len(m.SmallBufs) < MsgRingMaxBufPoolSize {
				m.SmallBufs = append(m.SmallBufs, oldMsg)
			}
		} else if len(m.LargeBufs) < MsgRingMaxBufPoolSize {
			m.LargeBufs = append(m.LargeBufs, oldMsg)
		}
	}

	// Pick the pool that matches the size of the incoming msg.
	pool := &m.SmallBufs
	if len(p) > MsgRingMaxSmallBufSize {
		pool = &m.LargeBufs
	}

	// Take a recycled buf from the pool, if one is big enough.
	// Although we wastefully throw away any cached bufs that aren't
	// large enough, this simple approach doesn't "learn" the wrong
	// buf size.
	var buf []byte
	for len(*pool) > 0 && buf == nil {
		last := len(*pool) - 1
		candidate := (*pool)[last]
		(*pool)[last] = nil // Don't keep a discarded buf reachable.
		*pool = (*pool)[:last]
		if len(p) <= cap(candidate) {
			// Reslice to exactly len(p) so that no stale bytes from
			// the buf's previous msg remain visible.
			buf = candidate[:len(p)]
		}
	}
	if buf == nil {
		buf = make([]byte, len(p))
	}

	copy(buf, p)

	m.Msgs[m.Next] = buf
	m.Next++
	if m.Next >= len(m.Msgs) {
		m.Next = 0
	}

	m.m.Unlock()

	return m.inner.Write(p)
}
// Messages retrieves the recent writes to the MsgRing, ordered from
// the oldest remembered write to the newest.  Ring slots that have
// never been written are skipped.
//
// The returned slices are copies that do not alias the ring's own
// bufs, so the caller may keep and modify them.  They are all carved
// out of a single allocation, and each one's capacity is clamped to
// its length, so appending to one returned msg reallocates instead of
// overwriting the msg that follows it.
func (m *MsgRing) Messages() [][]byte {
	m.m.Lock()
	defer m.m.Unlock()

	n := len(m.Msgs)

	// Pre-alloc a buf to hold a copy of all msgs.
	bufSize := 0
	for _, msg := range m.Msgs {
		bufSize += len(msg)
	}
	buf := make([]byte, 0, bufSize)

	rv := make([][]byte, 0, n)

	// m.Next is the slot that the next Write will overwrite, which
	// makes it the oldest slot; walk the whole ring once from there.
	idx := m.Next
	for i := 0; i < n; i++ {
		if msg := m.Msgs[idx]; msg != nil {
			start := len(buf)
			buf = append(buf, msg...)
			// Full slice expression: limit the capacity so that the
			// returned msgs cannot grow into each other.
			rv = append(rv, buf[start:len(buf):len(buf)])
		}

		idx++
		if idx >= n {
			idx = 0
		}
	}

	return rv
}