@@ -34,6 +34,7 @@ var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")
34
34
// blockfileStream reads blocks sequentially from a single file.
35
35
// It starts from the given offset and can traverse till the end of the file
36
36
type blockfileStream struct {
37
+ fileNum int
37
38
file * os.File
38
39
reader * bufio.Reader
39
40
currentOffset int64
@@ -49,10 +50,19 @@ type blockStream struct {
49
50
currentFileStream * blockfileStream
50
51
}
51
52
53
+ // blockPlacementInfo captures the information related
54
+ // to block's placement in the file.
55
+ type blockPlacementInfo struct {
56
+ fileNum int
57
+ blockStartOffset int64
58
+ blockBytesOffset int64
59
+ }
60
+
52
61
///////////////////////////////////
53
62
// blockfileStream functions
54
63
////////////////////////////////////
55
- func newBlockfileStream (filePath string , startOffset int64 ) (* blockfileStream , error ) {
64
+ func newBlockfileStream (rootDir string , fileNum int , startOffset int64 ) (* blockfileStream , error ) {
65
+ filePath := deriveBlockfilePath (rootDir , fileNum )
56
66
logger .Debugf ("newBlockfileStream(): filePath=[%s], startOffset=[%d]" , filePath , startOffset )
57
67
var file * os.File
58
68
var err error
@@ -68,41 +78,50 @@ func newBlockfileStream(filePath string, startOffset int64) (*blockfileStream, e
68
78
panic (fmt .Sprintf ("Could not seek file [%s] to given startOffset [%d]. New position = [%d]" ,
69
79
filePath , startOffset , newPosition ))
70
80
}
71
- s := & blockfileStream {file , bufio .NewReader (file ), startOffset }
81
+ s := & blockfileStream {fileNum , file , bufio .NewReader (file ), startOffset }
72
82
return s , nil
73
83
}
74
84
75
85
func (s * blockfileStream ) nextBlockBytes () ([]byte , error ) {
86
+ blockBytes , _ , err := s .nextBlockBytesAndPlacementInfo ()
87
+ return blockBytes , err
88
+ }
89
+
90
+ func (s * blockfileStream ) nextBlockBytesAndPlacementInfo () ([]byte , * blockPlacementInfo , error ) {
76
91
var lenBytes []byte
77
92
var err error
78
93
if lenBytes , err = s .reader .Peek (8 ); err != nil {
79
94
// reader.Peek raises io.EOF error if enough bytes not available
80
95
if err == io .EOF {
81
96
if len (lenBytes ) > 0 {
82
- return nil , ErrUnexpectedEndOfBlockfile
97
+ return nil , nil , ErrUnexpectedEndOfBlockfile
83
98
}
84
- return nil , nil
99
+ return nil , nil , nil
85
100
}
86
- return nil , err
101
+ return nil , nil , err
87
102
}
88
103
len , n := proto .DecodeVarint (lenBytes )
89
104
if n == 0 {
90
105
panic (fmt .Errorf ("Error in decoding varint bytes" ))
91
106
}
92
107
if _ , err = s .reader .Discard (n ); err != nil {
93
- return nil , err
108
+ return nil , nil , err
94
109
}
95
110
blockBytes := make ([]byte , len )
96
111
if _ , err = io .ReadAtLeast (s .reader , blockBytes , int (len )); err != nil {
97
112
// io.ReadAtLeast raises io.ErrUnexpectedEOF error if it is able to
98
113
// read a fewer (non-zero) bytes and io.EOF is encountered
99
114
if err == io .ErrUnexpectedEOF {
100
- return nil , ErrUnexpectedEndOfBlockfile
115
+ return nil , nil , ErrUnexpectedEndOfBlockfile
101
116
}
102
- return nil , err
117
+ return nil , nil , err
103
118
}
119
+ blockPlacementInfo := & blockPlacementInfo {
120
+ fileNum : s .fileNum ,
121
+ blockStartOffset : s .currentOffset ,
122
+ blockBytesOffset : s .currentOffset + int64 (n )}
104
123
s .currentOffset += int64 (n ) + int64 (len )
105
- return blockBytes , nil
124
+ return blockBytes , blockPlacementInfo , nil
106
125
}
107
126
108
127
func (s * blockfileStream ) close () error {
@@ -113,8 +132,7 @@ func (s *blockfileStream) close() error {
113
132
// blockStream functions
114
133
////////////////////////////////////
115
134
func newBlockStream (rootDir string , startFileNum int , startOffset int64 , endFileNum int ) (* blockStream , error ) {
116
- startFile := deriveBlockfilePath (rootDir , startFileNum )
117
- startFileStream , err := newBlockfileStream (startFile , startOffset )
135
+ startFileStream , err := newBlockfileStream (rootDir , startFileNum , startOffset )
118
136
if err != nil {
119
137
return nil , err
120
138
}
@@ -127,30 +145,35 @@ func (s *blockStream) moveToNextBlockfileStream() error {
127
145
return err
128
146
}
129
147
s .currentFileNum ++
130
- nextFile := deriveBlockfilePath (s .rootDir , s .currentFileNum )
131
- if s .currentFileStream , err = newBlockfileStream (nextFile , 0 ); err != nil {
148
+ if s .currentFileStream , err = newBlockfileStream (s .rootDir , s .currentFileNum , 0 ); err != nil {
132
149
return err
133
150
}
134
151
return nil
135
152
}
136
153
137
154
func (s * blockStream ) nextBlockBytes () ([]byte , error ) {
155
+ blockBytes , _ , err := s .nextBlockBytesAndPlacementInfo ()
156
+ return blockBytes , err
157
+ }
158
+
159
+ func (s * blockStream ) nextBlockBytesAndPlacementInfo () ([]byte , * blockPlacementInfo , error ) {
138
160
var blockBytes []byte
161
+ var blockPlacementInfo * blockPlacementInfo
139
162
var err error
140
- if blockBytes , err = s .currentFileStream .nextBlockBytes (); err != nil {
163
+ if blockBytes , blockPlacementInfo , err = s .currentFileStream .nextBlockBytesAndPlacementInfo (); err != nil {
141
164
logger .Debugf ("current file [%d]" , s .currentFileNum )
142
165
logger .Debugf ("blockbytes [%d]. Err:%s" , len (blockBytes ), err )
143
- return nil , err
166
+ return nil , nil , err
144
167
}
145
168
logger .Debugf ("blockbytes [%d] read from file [%d]" , len (blockBytes ), s .currentFileNum )
146
169
if blockBytes == nil && s .currentFileNum < s .endFileNum {
147
170
logger .Debugf ("current file [%d] exhausted. Moving to next file" , s .currentFileNum )
148
171
if err = s .moveToNextBlockfileStream (); err != nil {
149
- return nil , err
172
+ return nil , nil , err
150
173
}
151
- return s .nextBlockBytes ()
174
+ return s .nextBlockBytesAndPlacementInfo ()
152
175
}
153
- return blockBytes , nil
176
+ return blockBytes , blockPlacementInfo , nil
154
177
}
155
178
156
179
func (s * blockStream ) close () error {
0 commit comments