@@ -87,40 +87,65 @@ func (s *blockfileStream) nextBlockBytes() ([]byte, error) {
87
87
return blockBytes , err
88
88
}
89
89
90
+ // nextBlockBytesAndPlacementInfo returns bytes for the next block
91
+ // along with the offset information in the block file.
92
+ // An error `ErrUnexpectedEndOfBlockfile` is returned if a partial written data is detected
93
+ // which is possible towards the tail of the file if a crash had taken place during appending of a block
90
94
func (s * blockfileStream ) nextBlockBytesAndPlacementInfo () ([]byte , * blockPlacementInfo , error ) {
91
95
var lenBytes []byte
92
96
var err error
93
- if lenBytes , err = s .reader .Peek (8 ); err != nil {
94
- // reader.Peek raises io.EOF error if enough bytes not available
95
- if err == io .EOF {
96
- if len (lenBytes ) > 0 {
97
- return nil , nil , ErrUnexpectedEndOfBlockfile
98
- }
99
- return nil , nil , nil
100
- }
97
+ var fileInfo os.FileInfo
98
+ moreContentAvailable := true
99
+
100
+ if fileInfo , err = s .file .Stat (); err != nil {
101
101
return nil , nil , err
102
102
}
103
- len , n := proto . DecodeVarint ( lenBytes )
104
- if n == 0 {
105
- panic ( fmt . Errorf ( "Error in decoding varint bytes" ))
103
+ if s . currentOffset == fileInfo . Size () {
104
+ logger . Debugf ( "Finished reading file number [%d]" , s . fileNum )
105
+ return nil , nil , nil
106
106
}
107
- if _ , err = s .reader .Discard (n ); err != nil {
107
+ remainingBytes := fileInfo .Size () - s .currentOffset
108
+ // Peek 8 or smaller number of bytes (if remaining bytes are less than 8)
109
+ // Assumption is that a block size would be small enough to be represented in 8 bytes varint
110
+ peekBytes := 8
111
+ if remainingBytes < int64 (peekBytes ) {
112
+ peekBytes = int (remainingBytes )
113
+ moreContentAvailable = false
114
+ }
115
+ logger .Debugf ("Remaining bytes=[%d], Going to peek [%d] bytes" , remainingBytes , peekBytes )
116
+ if lenBytes , err = s .reader .Peek (peekBytes ); err != nil {
108
117
return nil , nil , err
109
118
}
110
- blockBytes := make ([] byte , len )
111
- if _ , err = io . ReadAtLeast ( s . reader , blockBytes , int ( len )); err != nil {
112
- // io.ReadAtLeast raises io.ErrUnexpectedEOF error if it is able to
113
- // read a fewer (non-zero) bytes and io.EOF is encountered
114
- if err == io . ErrUnexpectedEOF {
119
+ length , n := proto . DecodeVarint ( lenBytes )
120
+ if n == 0 {
121
+ // proto.DecodeVarint did not consume any byte at all which means that the bytes
122
+ // representing the size of the block are partial bytes
123
+ if ! moreContentAvailable {
115
124
return nil , nil , ErrUnexpectedEndOfBlockfile
116
125
}
126
+ panic (fmt .Errorf ("Error in decoding varint bytes [%#v]" , lenBytes ))
127
+ }
128
+ bytesExpected := int64 (n ) + int64 (length )
129
+ if bytesExpected > remainingBytes {
130
+ logger .Debugf ("At least [%d] bytes expected. Remaining bytes = [%d]. Returning with error [%s]" ,
131
+ bytesExpected , remainingBytes , ErrUnexpectedEndOfBlockfile )
132
+ return nil , nil , ErrUnexpectedEndOfBlockfile
133
+ }
134
+ // skip the bytes representing the block size
135
+ if _ , err = s .reader .Discard (n ); err != nil {
136
+ return nil , nil , err
137
+ }
138
+ blockBytes := make ([]byte , length )
139
+ if _ , err = io .ReadAtLeast (s .reader , blockBytes , int (length )); err != nil {
140
+ logger .Debugf ("Error while trying to read [%d] bytes from fileNum [%d]: %s" , length , s .fileNum , err )
117
141
return nil , nil , err
118
142
}
119
143
blockPlacementInfo := & blockPlacementInfo {
120
144
fileNum : s .fileNum ,
121
145
blockStartOffset : s .currentOffset ,
122
146
blockBytesOffset : s .currentOffset + int64 (n )}
123
- s .currentOffset += int64 (n ) + int64 (len )
147
+ s .currentOffset += int64 (n ) + int64 (length )
148
+ logger .Debugf ("Returning blockbytes - length=[%d], placementInfo={%s}" , len (blockBytes ), blockPlacementInfo )
124
149
return blockBytes , blockPlacementInfo , nil
125
150
}
126
151
@@ -179,3 +204,8 @@ func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementI
179
204
func (s * blockStream ) close () error {
180
205
return s .currentFileStream .close ()
181
206
}
207
+
208
+ func (i * blockPlacementInfo ) String () string {
209
+ return fmt .Sprintf ("fileNum=[%d], startOffset=[%d], bytesOffset=[%d]" ,
210
+ i .fileNum , i .blockStartOffset , i .blockBytesOffset )
211
+ }
0 commit comments