Check-in [38f8e62c81]
Overview
SHA1:38f8e62c81fb3daca174e4a8e6c6a71bea9c5e0b
Date: 2014-12-23 18:52:46
User: spaskalev
Comment:Renamed MinReader to BlockReader. The latter is now used by predictor's decompressor to simplify the code and do away with the need for internal buffering.
Timelines: family | ancestors | descendants | both | trunk
Downloads: Tarball | ZIP archive
Other Links: files | file ages | folders | manifest
Tags And Properties
Context
2014-12-23
19:18
[b0ff11dfcd] Fixing ioutil tests to compile :) (user: spaskalev, tags: trunk)
18:52
[38f8e62c81] Renamed MinReader to BlockReader. The latter is now used by predictor's decompressor to simplify the code and do away with the need for internal buffering. (user: spaskalev, tags: trunk)
18:39
[70896f73e9] Additional fixes and code simplification for MinReader (user: spaskalev, tags: trunk)
Changes

Modified src/0dev.org/ioutil/ioutil.go from [3d22f83cbe] to [f63ac1dff8].

     1      1   // Package ioutil contains various constructs for io operations
     2      2   package ioutil
     3      3   
     4      4   import (
     5         -	//"fmt"
     6      5   	"io"
     7      6   )
     8      7   
     9      8   // A function alias type that implements io.Writer
    10      9   type WriterFunc func([]byte) (int, error)
    11     10   
    12     11   // Delegates the call to the WriterFunc while implementing io.Writer
................................................................................
    20     19   // Delegates the call to the WriterFunc while implementing io.Reader
    21     20   func (r ReaderFunc) Read(b []byte) (int, error) {
    22     21   	return r(b)
    23     22   }
    24     23   
    25     24   // Returns a reader that will delegate calls to Read(...) while ensuring
    26     25   // that the output buffer will never be smaller than the required size
    27         -func MinReader(reader io.Reader, size int) io.Reader {
           26  +// and will be downsized to a multiple of the required size if larger
           27  +func BlockReader(reader io.Reader, size int) io.Reader {
    28     28   	var buffer []byte = make([]byte, 0, size)
    29     29   
    30     30   	return ReaderFunc(func(output []byte) (int, error) {
    31     31   		var (
    32     32   			readCount int
    33     33   			err       error
    34     34   		)
    35     35   	start:
    36         -		//fmt.Println("Requesting read with length ", len(output), "buffer's length is ", len(buffer))
    37         -
    38     36   		// Reply with the buffered data if there is any
    39     37   		if len(buffer) > 0 {
    40     38   			readCount = copy(output, buffer)
    41     39   
    42     40   			// Advance the data in the buffer
    43     41   			buffer = buffer[:copy(buffer, buffer[readCount:])]
    44     42   
    45         -			//fmt.Println("After buffer read - buffer lenght is", len(buffer))
    46         -
           43  +			// Return count and error if we have read the whole buffer
    47     44   			if len(buffer) == 0 {
    48     45   				return readCount, err
    49     46   			}
    50     47   
    51     48   			// Do not propagate an error until the buffer is exhausted
    52     49   			return readCount, nil
    53     50   		}
    54     51   
    55     52   		// Delegate if the buffer is empty and the destination buffer is large enough
    56     53   		if len(output) >= size {
    57         -			//fmt.Println("Delegating read for output length ", len(output), " and size ", size)
    58     54   			return reader.Read(output[:(len(output)/size)*size])
    59     55   		}
    60     56   
    61     57   		// Perform a read into the buffer
    62     58   		readCount, err = reader.Read(buffer[:size])
    63     59   
    64         -		// Size the buffer down to the read data size and restart
           60  +		// Size the buffer down to the read data size
           61  +		// and restart if we have successfully read some bytes
    65     62   		buffer = buffer[:readCount]
    66         -
    67         -		//fmt.Println("Read into buffer: ", len(buffer), "bytes")
    68         -
    69     63   		if len(buffer) > 0 {
    70     64   			goto start
    71     65   		}
           66  +
           67  +		// Returning on err/misbehaving noop reader
    72     68   		return 0, err
    73     69   	})
    74     70   }

Modified src/0dev.org/predictor/predictor.go from [7a1990432a] to [625ed7e8bc].

   117    117   
   118    118   // Returns an io.Reader implementation that wraps the provided io.Reader
   119    119   // and decompresses data according to the predictor algorithm
   120    120   func Decompressor(reader io.Reader) io.Reader {
   121    121   	var ctx context
   122    122   	ctx.input = make([]byte, 0, 8)
   123    123   
   124         -	return iou.ReaderFunc(func(output []byte) (int, error) {
          124  +	return iou.BlockReader(iou.ReaderFunc(func(output []byte) (int, error) {
   125    125   		var (
   126    126   			err               error
   127    127   			flags, predicted  byte
   128    128   			rc, total, copied int
   129    129   		)
   130    130   
   131         -		// Sanity check for space to read into
   132         -		if len(output) == 0 {
   133         -			return 0, nil
   134         -		}
   135         -
   136         -		// Check whether we have leftover data in the buffer
   137         -		if len(ctx.input) > 0 {
   138         -			rc = copy(output, ctx.input)
   139         -
   140         -			// Check whether we still have leftover data in the buffer :)
   141         -			if rc < len(ctx.input) {
   142         -				// Shift the remaining bytes at the start of the buffer
   143         -				//  and resize the buffer accordingly
   144         -				ctx.input = ctx.input[:copy(ctx.input, ctx.input[rc:])]
   145         -			}
   146         -			return rc, nil
   147         -		}
   148         -
   149    131   		// Read the next prediction header
   150    132   	readHeader:
   151    133   		rc, err = reader.Read(ctx.input[:1])
   152    134   		// Fail on error unless it is EOF
   153    135   		if err != nil && err != io.EOF {
   154    136   			return total, err
   155    137   		} else if rc == 0 {
................................................................................
   192    174   			ctx.update(ctx.input[i])
   193    175   		}
   194    176   
   195    177   		// Copy the decompressed data to the output and accumulate the count
   196    178   		copied = copy(output, ctx.input[:rc])
   197    179   		total += copied
   198    180   
   199         -		// Check for remaining bytes that dont fit in the output buffer
   200         -		if copied < rc {
   201         -			// Shift the remaining bytes at the start of the buffer
   202         -			//  and resize the buffer accordingly
   203         -			ctx.input = ctx.input[:copy(ctx.input, ctx.input[copied:rc])]
   204         -		} else {
   205         -			// Clear the buffer
   206         -			ctx.input = ctx.input[:0]
          181  +		// Clear the buffer
          182  +		ctx.input = ctx.input[:0]
   207    183   
   208         -			// Loop for another pass if there is available space in the output
   209         -			output = output[copied:]
   210         -			if len(output) > 0 && err == nil {
   211         -				goto readHeader
   212         -			}
          184  +		// Loop for another pass if there is available space in the output
          185  +		output = output[copied:]
          186  +		if len(output) > 0 && err == nil {
          187  +			goto readHeader
   213    188   		}
   214    189   
   215    190   		return total, err
   216         -	})
          191  +	}), 8)
   217    192   }