predictor.go at [ffd1ab7b0c]

File src/0dev.org/predictor/predictor.go artifact 1bc0c5d728 part of check-in ffd1ab7b0c


// Package predictor implements the predictor compression/decompression algorithm
// as specified by RFC1978 - PPP Predictor Compression Protocol
package predictor

import (
	bits "0dev.org/bits"
	iou "0dev.org/ioutil"
	"io"
)

// context carries the state a predictor stream keeps between blocks.
type context struct {
	// table is the guess table: indexed by the current hash, it holds the
	// byte that was last recorded for that hash value.
	table [1 << 16]byte
	// input is a small staging buffer used by the stream owning the context.
	input []byte
	// hash is the rolling hash of the most recently processed bytes.
	hash uint16
}

// update folds val into the rolling hash used to index the guess table.
//
// This is the hash prescribed by the predictor algorithm: the previous value
// is shifted left by four bits (older input gradually drops off the top of
// the 16-bit value) and the new byte is mixed in with XOR. A stronger hash
// would compress better, at the expense of time.
func (ctx *context) update(val byte) {
	shifted := ctx.hash << 4
	ctx.hash = shifted ^ uint16(val)
}

// Compressor returns an io.Writer that wraps the provided io.Writer and
// compresses everything written to it according to the predictor algorithm.
//
// The predictor format works on blocks of up to 8 input bytes, each preceded
// by a one-byte flags header, so up to 7 bytes may be held back between calls
// until a block can be completed. Calling Write with no data forces the held
// bytes out as a short block; such a call always reports 0 bytes written.
//
// Write reports how many bytes of its argument were accepted (buffered or
// compressed) and never more than len(data). If the underlying writer fails
// or performs a short write, the error is returned; the compression state has
// already advanced at that point, so the stream should be abandoned.
func Compressor(writer io.Writer) io.Writer {
	const blockSize = 8

	var ctx context
	ctx.input = make([]byte, 0, blockSize)

	// Scratch space for one compressed block: the flags header followed by
	// the bytes that could not be predicted.
	out := make([]byte, 1, blockSize+1)

	// encode compresses a single block of at most blockSize bytes and hands
	// the result to the underlying writer in one call.
	encode := func(block []byte) error {
		out = out[:1]
		out[0] = 0
		for i, current := range block {
			if ctx.table[ctx.hash] == current {
				// Guess was right - set the flag, don't output the byte
				out[0] |= 1 << uint(i)
			} else {
				// Guess was wrong - remember the byte and output it
				ctx.table[ctx.hash] = current
				out = append(out, current)
			}
			ctx.update(current)
		}

		n, err := writer.Write(out)
		if err == nil && n < len(out) {
			err = io.ErrShortWrite
		}
		return err
	}

	// flush compresses whatever is staged in the buffer and empties it, so
	// the buffer never carries a full block across calls.
	flush := func() error {
		err := encode(ctx.input)
		ctx.input = ctx.input[:0]
		return err
	}

	return iou.WriterFunc(func(data []byte) (int, error) {
		// A call with no data is a flush request
		if len(data) == 0 {
			// Nothing to flush if the buffer is empty though
			if len(ctx.input) == 0 {
				return 0, nil
			}
			return 0, flush()
		}

		accepted := 0

		// Complete a previously started block before anything else
		if len(ctx.input) > 0 {
			need := blockSize - len(ctx.input)
			if need > len(data) {
				need = len(data)
			}
			ctx.input = append(ctx.input, data[:need]...)

			// Still short of a block: keep the bytes for the next call
			if len(ctx.input) < blockSize {
				return need, nil
			}
			if err := flush(); err != nil {
				return 0, err
			}
			accepted, data = need, data[need:]
		}

		// Compress whole blocks straight from the caller's data
		for len(data) >= blockSize {
			if err := encode(data[:blockSize]); err != nil {
				return accepted, err
			}
			accepted, data = accepted+blockSize, data[blockSize:]
		}

		// Stage the tail (fewer than blockSize bytes) for the next call;
		// the buffer is guaranteed to be empty at this point
		ctx.input = append(ctx.input, data...)
		return accepted + len(data), nil
	})
}

// Decompressor returns an io.Reader that wraps the provided io.Reader and
// decompresses the data read from it according to the predictor algorithm.
//
// The decoding function is wrapped with iou.SizedReader using a size of 8,
// the length of a full predictor block - presumably so that the output
// buffer always has room for one decoded block (confirm against iou).
func Decompressor(reader io.Reader) io.Reader {
	var ctx context
	ctx.input = make([]byte, 0, 8)

	decode := func(output []byte) (int, error) {
		var (
			produced int
			n        int
			err      error
		)

		for {
			// Fetch the flags header of the next block. Give up on a hard
			// error, or when no byte was delivered (typically at EOF).
			n, err = reader.Read(ctx.input[:1])
			if n == 0 || (err != nil && err != io.EOF) {
				return produced, err
			}

			// Grow the buffer to a full block, take the header out of it and
			// count how many bytes of the block it marks as predicted
			ctx.input = ctx.input[:8]
			header := ctx.input[0]
			guessed := bits.Hamming(header)
			literals := int(8 - guessed)

			// Read the literal bytes into the tail of the buffer, retrying
			// while the reader delivers fewer than the header calls for
			n, err = reader.Read(ctx.input[guessed:])
			for n < literals && err == nil {
				var more int
				more, err = reader.Read(ctx.input[int(guessed)+n:])
				n += more
			}

			// An error does not stop the decoding: whatever arrived is
			// decompressed and the error is returned along with the result.
			// size is the number of decompressed bytes in this block (usually 8)
			size := n + int(guessed)

			// Rebuild the block in place, front to back; src tracks the next
			// literal byte waiting in the tail of the buffer
			src := guessed
			for i := 0; i < size; i++ {
				if header&(1<<uint(i)) != 0 {
					// Guess succeeded, take the byte from the table
					ctx.input[i] = ctx.table[ctx.hash]
				} else {
					// Guess failed: move the next literal into place
					// and record it in the table
					ctx.input[i] = ctx.input[src]
					src++
					ctx.table[ctx.hash] = ctx.input[i]
				}
				ctx.update(ctx.input[i])
			}

			// Hand the block to the caller and reset the buffer
			moved := copy(output, ctx.input[:size])
			produced += moved
			ctx.input = ctx.input[:0]
			output = output[moved:]

			// Go for another block only while there is room left in the
			// output and the source has not reported an error
			if err != nil || len(output) == 0 {
				return produced, err
			}
		}
	}

	return iou.SizedReader(iou.ReaderFunc(decode), 8)
}