#### libremiliacr
#### Copyright(C) 2020-2024 Remilia Scarlet <remilia@posteo.jp>
####
#### This program is free software: you can redistribute it and/or modify
#### it under the terms of the GNU General Public License as published by
#### the Free Software Foundation, either version 3 of the License, or
#### (at your option) any later version.
####
#### This program is distributed in the hope that it will be useful,
#### but WITHOUT ANY WARRANTY; without even the implied warranty of
#### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#### GNU General Public License for more details.
####
#### You should have received a copy of the GNU General Public License
#### along with this program.  If not, see <http://www.gnu.org/licenses/>.
require "math"
module RemiLib
# Encapsulates an `IO` to enable the reading of individual bits.
class BitReader
# Raised by the byte-oriented `#read` overloads when they are called while the
# `BitReader` is not positioned on a byte boundary.
class NotOnByteError < Exception
end
# The underlying stream that bytes are pulled from.
@io : IO
# Number of bits already consumed from the current `@byte`.
@bitpos : UInt8 = 0
# The byte bits are currently being taken from; `nil` once `IO#read_byte`
# has reported that nothing is left.
@byte : UInt8? = nil
# Builds a `BitReader` that takes its data from `stream`.  Exactly one
# `IO#read_byte` call is made on `stream` to load the initial `#byte`; an
# `IO::EOFError` is raised when the stream has no byte to give.
def initialize(stream : IO)
  @io = stream
  firstByte = stream.read_byte
  raise IO::EOFError.new("No initial byte to read") if firstByte.nil?
  @byte = firstByte
end
# Points this `BitReader` at a different `IO` and resets the bit position.
# Exactly one `IO#read_byte` call is made on `stream` to load the new
# initial `#byte`; an `IO::EOFError` is raised when the stream is empty.
def reinitialize(stream : IO)
  @bitpos = 0u8
  @io = stream
  firstByte = stream.read_byte
  raise IO::EOFError.new("No initial byte to read") if firstByte.nil?
  @byte = firstByte
end
# Moves back to the start of the stream, so the next bit read is the very
# first bit of the stream.  Returns `self`.
def rewind : self
  # `#pos=` seeks the underlying IO, clears the bit offset and reloads the
  # current byte.
  self.pos = 0
  self
end
# Returns the bit offset (0 through 7) inside the current byte.
@[AlwaysInline]
def bitpos
  # Keeping only the low three bits is the same as taking the value modulo 8.
  @bitpos & 7_u8
end
# Returns the position of the current byte within the underlying `IO`.
@[AlwaysInline]
def pos
  # The current byte has already been pulled out of the IO, so the IO's own
  # position is one past it.
  ioPos = @io.pos
  ioPos - 1
end
# Sets the byte position in the underlying `IO`, resets the bit position to
# zero, and loads the byte found at the new position as the current `#byte`.
# Raises an `IO::EOFError` if no byte can be read there.  Returns `self`.
@[AlwaysInline]
def pos=(value) : self
  @io.pos = value
  @bitpos = 0u8
  nextByte = @io.read_byte
  raise IO::EOFError.new("No initial byte to read") if nextByte.nil?
  @byte = nextByte
  self
end
# Returns the byte that bits are currently being taken from.  Raises an
# `IO::EOFError` when there is no current byte.
@[AlwaysInline]
def byte : UInt8
  @byte || raise IO::EOFError.new("No bytes left to read")
end
# Returns the current byte that the `BitReader` is reading bits from, or
# `nil` if there were no more bytes to read.
@[AlwaysInline]
def byte? : UInt8?
@byte
end
# Sets what the `BitReader` considers the last byte read.  Only the stored
# current byte is replaced: the bit position is left as it is and the
# underlying stream is not touched.
@[AlwaysInline]
def byte=(val : UInt8)
@byte = val
end
# Reads `count` bits and returns them as an `Int64`, with the first bit read
# ending up as the most significant bit of the result.  A `count` of zero or
# less reads nothing and returns 0.
#
# Raises an `IO::EOFError` if the stream runs out before `count` bits have
# been read.  Bits consumed before the end was hit are not put back.
def read(count : Int) : Int64
  # Fast path: a whole byte is wanted and we sit on a byte boundary, so the
  # current byte is the answer.  The EOF check here keeps this path raising
  # the same `IO::EOFError` as the general path below.
  if count == 8 && @bitpos == 0
    whole = @byte || raise IO::EOFError.new("End of stream, no more bits")
    @byte = @io.read_byte
    return whole.to_i64!
  end

  ret : Int64 = 0
  while count > 0
    current = @byte || raise IO::EOFError.new("End of stream, no more bits")

    # Take as many bits as are wanted, limited to what is left in this byte.
    bitsToRead = Math.min(count, 8 - @bitpos)
    count -= bitsToRead
    @bitpos += bitsToRead

    # Shift the wanted bits down to the bottom of the byte, mask off anything
    # above them, then slot them into the result above the bits still to come.
    ret |= ((current >> (8 - @bitpos)) & (0xFF_u8 >> (8 - bitsToRead))).to_i64! << count

    # Byte used up: move on to the next one (which may be nil at EOF).
    if @bitpos >= 8
      @bitpos = 0
      @byte = @io.read_byte
    end
  end
  ret
end
# Attempts to read `count` bits and return them as an `Int64`.  Returns `nil`
# when there is no current byte on entry, or when the stream runs out before
# `count` bits have been gathered.  In the latter case the bits consumed up
# to that point are not put back.
def read?(count : Int) : Int64?
  return nil unless @byte

  # Shortcut: a whole byte on a byte boundary is simply the current byte.
  if @bitpos == 0 && count == 8
    whole = @byte.not_nil!.to_i64!
    @byte = @io.read_byte
    return whole
  end

  acc : Int64 = 0
  until count <= 0
    current = @byte
    return nil if current.nil?

    # Take what is wanted, limited to what remains in the current byte.
    take = Math.min(count, 8 - @bitpos)
    count -= take
    @bitpos += take

    # Isolate the bits just consumed and place them above the bits that are
    # still to be read.
    shifted = current >> (8 - @bitpos)
    mask = 0xFF_u8 >> (8 - take)
    acc |= (shifted & mask).to_i64! << count

    if @bitpos >= 8
      @bitpos = 0
      @byte = @io.read_byte
    end
  end
  acc
end
# Reads up to `count` bytes into `dest`, starting at `dest[offset]`, and
# returns the number of bytes stored.  The result is 0 only when `count` is
# 0, and may be less than `count` if the stream ends early.  The current
# `#byte` is always the first byte put into `dest`.
#
# Raises:
# * `NotOnByteError` if the reader is not on a byte boundary.
# * `ArgumentError` if `offset` or `count` is negative, or if
#   `offset + count` does not fit inside `dest`.
# * `IO::EOFError` if there is no current byte.
def read(dest : Bytes, offset : Int, count : Int) : Int
  return 0 if count == 0

  # Bunch of checks
  raise NotOnByteError.new("BitReader is not on a byte boundary") unless @bitpos == 0
  raise ArgumentError.new("Offset cannot be negative") if offset < 0
  # Without this guard a negative count would still store and consume one byte.
  raise ArgumentError.new("Count cannot be negative") if count < 0
  firstByte = @byte || raise IO::EOFError.new("End of stream, no more bits")
  if dest.size < offset + count
    raise ArgumentError.new("offset + count is too large for array")
  end

  # The current byte was already pulled out of the IO, so it goes in first.
  dest[offset] = firstByte
  numRead = 1

  # Fill the rest straight from the IO.  A read of 0 bytes means the stream
  # has ended.
  while numRead < count
    newlyRead = @io.read(dest[offset + numRead, count - numRead])
    break if newlyRead == 0
    numRead += newlyRead
  end

  # Load the byte that follows so bit reads can carry on from here.
  @byte = @io.read_byte
  numRead
end
# Reads up to `count` bytes into `dest`, starting at `dest[offset]`, and
# returns the number of bytes stored.  The `BitReader` must be on a byte
# boundary, or this will raise a `NotOnByteError`.  The current `#byte` is
# always the first byte put into `dest`.
#
# This is the `Array(UInt8)` flavour of the `Bytes` overload: all checks,
# error messages and reading behaviour come from that overload.
def read(dest : Array(UInt8), offset : Int, count : Int) : Int
  # View the array's existing elements as a slice (no copy), so the bytes are
  # written directly into `dest`.
  self.read(Slice.new(dest.to_unsafe, dest.size), offset, count)
end
# Fills `dest` from its first element, reading up to `dest.size` bytes.  The
# `BitReader` must be on a byte boundary, or this will raise a
# `NotOnByteError`.  Returns the number of bytes read, which may be 0 if
# nothing was read.  The current `#byte` is always the first byte put into
# `dest`.
@[AlwaysInline]
def read(dest : Bytes|Array(UInt8)) : Int
  wanted = dest.size
  read(dest, 0, wanted)
end
# Reads up to `count` bytes and returns them in a new array.  The array holds
# fewer than `count` elements if the data ran out first.
def readByteArray(count : Int) : Array(UInt8)
  collected = Array(UInt8).new
  count.times do
    value = self.read?(8)
    # `nil` means no further whole byte could be read.
    break if value.nil?
    collected << value.to_u8!
  end
  collected
end
# Reads exactly `count` bytes and returns them as a new `Bytes`.  Each byte
# is obtained with `#read(8)`, so if the stream runs out before `count` bytes
# have been read, the error raised by `#read` propagates to the caller.
#
# `count` accepts any `Int`, matching `#readByteArray` and `#readString!`.
@[AlwaysInline]
def readBytes(count : Int) : Bytes
  Bytes.new(count) { self.read(8).to_u8! }
end
# Returns the next `count` bits as an `Int64` without consuming them.  Raises
# an `IO::EOFError` if there is no current byte; any error raised by `#read`
# propagates after the reader state has been restored.
#
# Except for the whole-byte-on-a-boundary shortcut, this reads and then seeks
# back, so it uses `pos` and `pos=` on the underlying `IO`.
def peek(count : Int) : Int64
  current = @byte
  raise IO::EOFError.new if current.nil?

  # Shortcut: the current byte is exactly what was asked for.
  return current.to_i64! if count == 8 && @bitpos == 0

  savedIoPos = @io.pos
  savedBitPos = @bitpos
  begin
    self.read(count)
  ensure
    # Put everything back the way it was, whether or not the read succeeded.
    @io.pos = savedIoPos
    @bitpos = savedBitPos
    @byte = current
  end
end
# Returns the next `count` bits as an `Int64` without consuming them, or
# `nil` if there is no current byte or `#read?` could not supply `count`
# bits.
#
# Except for the whole-byte-on-a-boundary shortcut, this reads and then seeks
# back, so it uses `pos` and `pos=` on the underlying `IO`.
def peek?(count : Int) : Int64?
  current = @byte
  return nil if current.nil?

  # Shortcut: the current byte is exactly what was asked for.
  return current.to_i64! if count == 8 && @bitpos == 0

  savedIoPos = @io.pos
  savedBitPos = @bitpos
  begin
    self.read?(count)
  ensure
    # Put everything back the way it was, whether or not the read succeeded.
    @io.pos = savedIoPos
    @bitpos = savedBitPos
    @byte = current
  end
end
# Peeks up to `count` bytes and returns them in a new array, leaving the
# reader exactly where it was.  The first byte returned starts at the
# current bit position, so on a byte boundary it is the current `#byte`.
# The array holds fewer than `count` elements if the data ran out first, and
# is empty if there is no current byte.
#
# This reads and then seeks back, so it uses `pos` and `pos=` on the
# underlying `IO`.
def peekBytes(count : Int) : Array(UInt8)
  ret = [] of UInt8
  return ret if @byte.nil?

  # Remember the full reader state so it can be restored afterwards.
  oldPos = @io.pos
  oldBitPos = @bitpos
  oldByte = @byte
  begin
    count.times do
      byte = self.read?(8)
      # `nil` means no further whole byte could be read.
      break if byte.nil?
      ret << byte.to_u8!
    end
  ensure
    # Always undo the reads, even if one of them raised.
    @io.pos = oldPos
    @bitpos = oldBitPos
    @byte = oldByte
  end
  ret
end
# Consumes `0` bits until a `1` bit shows up and returns how many zeros were
# consumed.
#
# When `discardFirstOne` is `true`, the terminating `1` bit is consumed as
# well.  Otherwise it is left unread.  Errors raised by `#read` or `#peek`
# (for example at the end of the stream) propagate to the caller.
def countZeros(*, discardFirstOne : Bool = false) : Int
  zeros = 0
  if discardFirstOne
    # Reading directly avoids the peek; the read that ends the loop is the
    # one that swallows the `1`.
    while self.read(1) == 0
      zeros += 1
    end
  else
    # Look before consuming so the `1` stays in the stream.
    until self.peek(1) == 1
      zeros += 1
      self.read(1)
    end
  end
  zeros
end
# Consumes `1` bits until a `0` bit shows up and returns how many ones were
# consumed.
#
# When `discardFirstZero` is `true`, the terminating `0` bit is consumed as
# well.  Otherwise it is left unread.  Errors raised by `#read` or `#peek`
# (for example at the end of the stream) propagate to the caller.
def countOnes(*, discardFirstZero : Bool = false) : Int
  ones = 0
  if discardFirstZero
    # Reading directly avoids the peek; the read that ends the loop is the
    # one that swallows the `0`.
    while self.read(1) == 1
      ones += 1
    end
  else
    # Look before consuming so the `0` stays in the stream.
    until self.peek(1) == 0
      ones += 1
      self.read(1)
    end
  end
  ones
end
# Skips forward to the next byte boundary by discarding one bit at a time.
# Does nothing when the reader already sits on a byte boundary.
@[AlwaysInline]
def advanceToNextByte : Nil
  until @bitpos == 0
    self.read(1)
  end
end
# Reads up to `sizeInBytes` bytes via `#readByteArray` and builds a string
# from them.  The string may be shorter than `sizeInBytes` if there was not
# enough data left to read.
@[AlwaysInline]
def readString(sizeInBytes : Int) : String
  rawBytes = self.readByteArray(sizeInBytes)
  # NOTE(review): `toSlice` is not defined in this file; presumably an Array
  # extension provided elsewhere in the library.
  String.new(rawBytes.toSlice)
end
# Reads `sizeInBytes` bytes via `#readBytes` and builds a string from them.
# Unlike `#readString` this does not return a shortened string: if there is
# not enough data left, the error raised by `#readBytes` propagates.
@[AlwaysInline]
def readString!(sizeInBytes : Int) : String
  rawBytes = self.readBytes(sizeInBytes)
  String.new(rawBytes)
end
# Generates the fixed-width integer readers.  For every type listed below, a
# little-endian reader named `read` + type name and a big-endian reader named
# `read` + type name + `BE` are defined.  Each one fetches the type's byte
# width through `#readBytes` and decodes it with `IO::ByteFormat`.
{% begin %}
{% types = [:Int16, :UInt16, :Int32, :UInt32, :Int64, :UInt64, :Int128, :UInt128] %}
{% sizes = [2, 2, 4, 4, 8, 8, 16, 16] %}
{% for i in 0...types.size %}
# Reads one little-endian integer of this type.
@[AlwaysInline]
def read{{types[i].id}} : {{types[i].id}}
IO::ByteFormat::LittleEndian.decode({{types[i].id}}, readBytes({{sizes[i]}}))
end
# Reads one big-endian integer of this type.
@[AlwaysInline]
def read{{types[i].id}}BE : {{types[i].id}}
IO::ByteFormat::BigEndian.decode({{types[i].id}}, readBytes({{sizes[i]}}))
end
{% end %}
{% end %}
end
end