Index: src/0dev.org/predictor/predictor.go ================================================================== --- src/0dev.org/predictor/predictor.go +++ src/0dev.org/predictor/predictor.go @@ -6,13 +6,14 @@ bits "0dev.org/bits" iou "0dev.org/ioutil" "io" ) +// The context struct contains the predictor's algorithm guess table +// and the current value of its input/output hash type context struct { table [1 << 16]byte - input []byte hash uint16 } // The following hash code is the heart of the algorithm: // It builds a sliding hash sum of the previous 3-and-a-bit @@ -27,123 +28,142 @@ // and compresses data according to the predictor algorithm // // It can buffer data as the predictor mandates 8-byte blocks with a header. // A call with no data will force a flush. func Compressor(writer io.Writer) io.Writer { - var ctx context - - return iou.SizedWriter(iou.WriterFunc(func(data []byte) (int, error) { - var ( - blockSize int = 8 - datalength int = len(data) - ) - - if datalength == 0 { - return 0, nil - } - - if datalength < blockSize { - blockSize = datalength - } - - var buf []byte = make([]byte, 1, blockSize+1) - for block := 0; block < datalength/blockSize; block++ { - for i := 0; i < blockSize; i++ { - var current byte = data[(block*blockSize)+i] - if ctx.table[ctx.hash] == current { - // Guess was right - don't output - buf[0] |= 1 << uint(i) - } else { - // Guess was wrong, output char - ctx.table[ctx.hash] = current - buf = append(buf, current) - } - ctx.update(current) - } - - if c, err := writer.Write(buf); err != nil { - return (block * blockSize) + c, err - } - - // Reset the flags and buffer for the next iteration - buf, buf[0] = buf[:1], 0 - } - - return datalength, nil - }), 8) + var cmp compressor + cmp.Writer = iou.SizedWriter(iou.WriterFunc(cmp.compress), 8) + cmp.target = writer + return &cmp +} + +type compressor struct { + context + io.Writer + target io.Writer +} + +func (ctx *compressor) compress(data []byte) (int, error) { + var ( + blockSize 
int = 8 + datalength int = len(data) + ) + + if datalength == 0 { + return 0, nil + } + + if datalength < blockSize { + blockSize = datalength + } + + var buf []byte = make([]byte, 1, blockSize+1) + for block := 0; block < datalength/blockSize; block++ { + for i := 0; i < blockSize; i++ { + var current byte = data[(block*blockSize)+i] + if ctx.table[ctx.hash] == current { + // Guess was right - don't output + buf[0] |= 1 << uint(i) + } else { + // Guess was wrong, output char + ctx.table[ctx.hash] = current + buf = append(buf, current) + } + ctx.update(current) + } + + if c, err := ctx.target.Write(buf); err != nil { + return (block * blockSize) + c, err + } + + // Reset the flags and buffer for the next iteration + buf, buf[0] = buf[:1], 0 + } + + return datalength, nil } // Returns an io.Reader implementation that wraps the provided io.Reader // and decompresses data according to the predictor algorithm func Decompressor(reader io.Reader) io.Reader { - var ctx context - ctx.input = make([]byte, 0, 8) - - return iou.SizedReader(iou.ReaderFunc(func(output []byte) (int, error) { - var ( - err error - flags, predicted byte - rc, total, copied int - ) - - // Read the next prediction header - readHeader: - rc, err = reader.Read(ctx.input[:1]) - // Fail on error unless it is EOF - if err != nil && err != io.EOF { - return total, err - } else if rc == 0 { - return total, err - } - - // Extend the buffer, copy the prediction header - // and calculate the number of subsequent bytes to read - ctx.input = ctx.input[:8] - flags = ctx.input[0] - predicted = bits.Hamming(flags) - - // Read the non-predicted bytes and place them in the end of the buffer - rc, err = reader.Read(ctx.input[predicted:]) - retryData: - if rc < int(8-predicted) && err == nil { - // Retry the read if we have fewer bytes than what the prediction header indicates - var r int - r, err = reader.Read(ctx.input[int(predicted)+rc:]) - rc += r - goto retryData - } // Continue on any error, try to decompress 
and return it along the result - - // rc now contains the amount of actual bytes in this cycle (usually 8) - rc += int(predicted) - - // Walk the buffer, filling in the predicted blanks, - // relocating read bytes and and updating the guess table - for i, a := 0, predicted; i < rc; i++ { - if (flags & (1 << uint(i))) > 0 { - // Guess succeeded, fill in from the table - ctx.input[i] = ctx.table[ctx.hash] - } else { - // Relocate a read byte and advance the read byte index - ctx.input[i], a = ctx.input[a], a+1 - // Guess failed, update the table - ctx.table[ctx.hash] = ctx.input[i] - } - // Update the hash - ctx.update(ctx.input[i]) - } - - // Copy the decompressed data to the output and accumulate the count - copied = copy(output, ctx.input[:rc]) - total += copied - - // Clear the buffer - ctx.input = ctx.input[:0] - - // Loop for another pass if there is available space in the output - output = output[copied:] - if len(output) > 0 && err == nil { - goto readHeader - } - - return total, err - }), 8) + var dcmp decompressor + dcmp.Reader = iou.SizedReader(iou.ReaderFunc(dcmp.decompress), 8) + dcmp.source = reader + dcmp.input = make([]byte, 0, 8) + return &dcmp +} + +type decompressor struct { + context + io.Reader + source io.Reader + input []byte +} + +func (ctx *decompressor) decompress(output []byte) (int, error) { + var ( + err error + flags, predicted byte + rc, total, copied int + ) + + // Read the next prediction header +readHeader: + rc, err = ctx.source.Read(ctx.input[:1]) + // Fail on error unless it is EOF + if err != nil && err != io.EOF { + return total, err + } else if rc == 0 { + return total, err + } + + // Extend the buffer, copy the prediction header + // and calculate the number of subsequent bytes to read + ctx.input = ctx.input[:8] + flags = ctx.input[0] + predicted = bits.Hamming(flags) + + // Read the non-predicted bytes and place them in the end of the buffer + rc, err = ctx.source.Read(ctx.input[predicted:]) +retryData: + if rc < 
int(8-predicted) && err == nil { + // Retry the read if we have fewer bytes than what the prediction header indicates + var r int + r, err = ctx.source.Read(ctx.input[int(predicted)+rc:]) + rc += r + goto retryData + } // Continue on any error, try to decompress and return it along with the result + + // rc now contains the amount of actual bytes in this cycle (usually 8) + rc += int(predicted) + + // Walk the buffer, filling in the predicted blanks, + // relocating read bytes and updating the guess table + for i, a := 0, predicted; i < rc; i++ { + if (flags & (1 << uint(i))) > 0 { + // Guess succeeded, fill in from the table + ctx.input[i] = ctx.table[ctx.hash] + } else { + // Relocate a read byte and advance the read byte index + ctx.input[i], a = ctx.input[a], a+1 + // Guess failed, update the table + ctx.table[ctx.hash] = ctx.input[i] + } + // Update the hash + ctx.update(ctx.input[i]) + } + + // Copy the decompressed data to the output and accumulate the count + copied = copy(output, ctx.input[:rc]) + total += copied + + // Clear the buffer + ctx.input = ctx.input[:0] + + // Loop for another pass if there is available space in the output + output = output[copied:] + if len(output) > 0 && err == nil { + goto readHeader + } + + return total, err }