Changes On Branch 52e14c83da72b193

Changes In Branch decompressor2 Through [52e14c83da] Excluding Merge-Ins

This is equivalent to a diff from 1a4bdf36e2 to 52e14c83da

2014-12-22
14:23
Read available bytes and the end of the buffer and decompress in a single pass check-in: ae0940d072 user: spaskalev tags: decompressor2
2014-12-21
23:26
Closing the decompressor2 branch as this implementation is slower than the naive one. check-in: 52e14c83da user: spaskalev tags: decompressor2
22:52
Removed TODOs, renamed readCount->rc, wrapped->reader check-in: 630530df49 user: spaskalev tags: trunk
22:12
Check in the new decompressor implementation in a separate branch check-in: bd1368b81f user: spaskalev tags: decompressor2
19:38
Added debug/pprof to ease basic cpu profiling check-in: 1a4bdf36e2 user: spaskalev tags: trunk
17:23
Fixed a rare case of losing data from the decompressor's internal result buffer. check-in: 7b74fd57f8 user: spaskalev tags: trunk

Modified src/0dev.org/predictor/predictor.go from [d2a3bd9d21] to [84146b7c8c].

     1      1   // Package predictor implements the predictor compression/decompression algorithm
     2      2   // as specified by RFC1978 - PPP Predictor Compression Protocol
     3      3   package predictor
     4      4   
     5      5   import (
            6  +	bits "0dev.org/bits"
     6      7   	"io"
     7      8   )
     8      9   
     9     10   type context struct {
    10     11   	table [1 << 16]byte
    11     12   	input []byte
    12     13   	hash  uint16
................................................................................
    68     69   				return err
    69     70   			}
    70     71   			// ... and stage the rest of the data in the buffer
    71     72   			ctx.input = append(ctx.input, data[blockSize-bufferLength:]...)
    72     73   			return nil
    73     74   		}
    74     75   
    75         -		// TODO allocate this on ctx.buffer ...
    76     76   		var buf []byte = make([]byte, 1, blockSize+1)
    77     77   		for block := 0; block < len(data)/blockSize; block++ {
    78     78   			for i := 0; i < blockSize; i++ {
    79     79   				var current byte = data[(block*blockSize)+i]
    80     80   				if ctx.table[ctx.hash] == current {
    81     81   					// Guess was right - don't output
    82     82   					buf[0] |= 1 << uint(i)
................................................................................
   115    115   // Required to implement io.Reader
   116    116   func (r decompressor) Read(output []byte) (int, error) {
   117    117   	return r(output)
   118    118   }
   119    119   
   120    120   // Returns an io.Reader implementation that wraps the provided io.Reader
   121    121   // and decompresses data according to the predictor algorithm
   122         -func Decompressor(wrapped io.Reader) io.Reader {
          122  +func Decompressor(reader io.Reader) io.Reader {
   123    123   	var ctx context
   124    124   	ctx.input = make([]byte, 0, 8)
   125    125   
   126    126   	return decompressor(func(output []byte) (int, error) {
   127    127   		var (
   128         -			err       error
   129         -			flags     byte
   130         -			readCount int
          128  +			err                  error
          129  +			flags                byte
          130  +			readCount, available int
   131    131   		)
   132    132   
   133    133   		// Sanity check for space to read into
   134    134   		if len(output) == 0 {
   135    135   			return 0, nil
   136    136   		}
   137    137   
................................................................................
   142    142   			// Check whether we still have leftover data in the buffer :)
   143    143   			if readCount < len(ctx.input) {
   144    144   				ctx.input = ctx.input[:copy(ctx.input, ctx.input[readCount:])]
   145    145   			}
   146    146   			return readCount, nil
   147    147   		}
   148    148   
   149         -		// This is single-iteration only but it is fine according to io.Reader's contract ?!
   150         -		// TODO - read all bytes from a block based on the hamming weight of the flag
   151         -		// and just shuffle them for predictions instead of bite-sized reads ;)
   152         -
   153         -		// Read the flags
   154         -		readCount, err = wrapped.Read(ctx.input[:1])
   155         -		if readCount == 0 || err != nil {
   156         -			return readCount, err
          149  +		// Read the next prediction header
          150  +		readCount, err = reader.Read(ctx.input[:1])
          151  +		// Fail on error unless it is EOF
          152  +		if err != nil && err != io.EOF {
          153  +			return 0, err
          154  +		} else if readCount == 0 {
          155  +			return 0, err
   157    156   		}
   158    157   
          158  +		// Extend the buffer, copy the prediction header
          159  +		//  and calculate the number of subsequent bytes to read
   159    160   		ctx.input = ctx.input[:8]
   160    161   		flags = ctx.input[0]
          162  +		available = 8 - int(bits.Hamming(flags))
   161    163   
   162         -		var i uint = 0
   163         -		for ; i < 8; i++ {
   164         -			if flags&(1<<i) > 0 {
   165         -				// Guess was right
   166         -				ctx.input[i] = ctx.table[ctx.hash]
   167         -			} else {
   168         -				readCount, err = wrapped.Read(ctx.input[i:(i + 1)])
          164  +		// Read the non-predicted bytes according to header.
          165  +		readCount, err = reader.Read(ctx.input[:available])
          166  +	retryData:
          167  +		if readCount < int(available) && err == nil {
          168  +			// Retry the read if we have fewer bytes than what the prediction header indicates
          169  +			var rc int
          170  +			rc, err = reader.Read(ctx.input[readCount:available])
          171  +			readCount += rc
          172  +			goto retryData
          173  +		} // Continue on any error, try to decompress and return it along the result
   169    174   
   170         -				if err == io.EOF {
   171         -					break
   172         -				}
          175  +		// Spread the read bytes right to left to avoid overlapping
          176  +		for i, a := 7, available-1; i >= 0; i-- {
          177  +			if ((flags >> uint(i)) & 1) == 0 {
          178  +				ctx.input[i] = ctx.input[a]
          179  +				a--
          180  +			}
          181  +		}
   173    182   
   174         -				if err != nil {
   175         -					return readCount, err
   176         -				}
   177         -
   178         -				if readCount == 0 { // treat as EoF
   179         -					break
   180         -				}
   181         -
          183  +		// Walk the buffer, fill in the predicted blanks and update the guess table
          184  +		for i := uint(0); i < 8; i++ {
          185  +			if (flags & (1 << i)) > 0 {
          186  +				// Guess succeeded, fill in from the table
          187  +				ctx.input[i] = ctx.table[ctx.hash]
          188  +				readCount++
          189  +			} else {
          190  +				// Guess failed, update the table
   182    191   				ctx.table[ctx.hash] = ctx.input[i]
   183    192   			}
   184         -
          193  +			// Update the hash
   185    194   			ctx.hash = (ctx.hash << 4) ^ uint16(ctx.input[i])
   186    195   		}
   187    196   
   188         -		readCount = copy(output, ctx.input[:i])
          197  +		// readCount now contains the precise amount of populated data
          198  +		ctx.input = ctx.input[:readCount]
          199  +		available = copy(output, ctx.input)
   189    200   
   190         -		// Place any remaining bytes in the buffer
   191         -		if uint(readCount) < i {
   192         -			ctx.input = ctx.input[readCount:i]
          201  +		// Check for remaining bytes that dont fit in the output buffer
          202  +		if available < readCount {
          203  +			ctx.input = ctx.input[:copy(ctx.input, ctx.input[available:])]
   193    204   		} else {
          205  +			// Clear the buffer
   194    206   			ctx.input = ctx.input[:0]
   195    207   		}
   196    208   
   197         -		return readCount, nil
          209  +		return available, err
   198    210   	})
   199    211   }