Artifact d7c1e0cc88a40cb5a49f925467967f0d2b7d1679c6d41de36ba0d024af0e0f3c:

  • File src/remiaudio/dsp/fir.cr — part of check-in [98921eb869] at 2024-01-05 07:36:37 on branch trunk — Copyright update (user: alexa size: 4329)

#### RemiAudio
#### Copyright (C) 2022-2024 Remilia Scarlet <remilia@posteo.jp>
####   Based on https://github.com/mattetti/audio/
####   Copyright (c) 2014 Matt Aimonetti
####
#### This program is free software: you can redistribute it and/or modify it
#### under the terms of the GNU Affero General Public License as published by
#### the Free Software Foundation, either version 3 of the License, or (at your
#### option) any later version.
####
#### This program is distributed in the hope that it will be useful, but WITHOUT
#### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
#### License for more details.
require "../common"
require "../windows"

####
#### Windowed Sinc Fir Interpolation for Pitch Shifting
####

module RemiAudio::DSP
  # A representation of a sinc function with lowpass and highpass coefficients.
  class SincFunction
    getter cutoff : Float64
    getter sampleRate : Float64
    getter taps : Int32
    getter window : RemiAudio::Windows::WindowFunction
    getter lowpassCoeffs : Slice(Float64)
    getter highpassCoeffs : Slice(Float64)
    getter ratio : Float64

    def initialize(@taps : Int32, @cutoff : Float64, @sampleRate : Float64,
                   *, @window : RemiAudio::Windows::WindowFunction = ->RemiAudio::Windows.hamming(Int32))
      @ratio = @cutoff / @sampleRate
      @lowpassCoeffs = Slice(Float64).new(@taps + 1, 0.0)
      @highpassCoeffs = Slice(Float64).new(@taps + 1, 0.0)
      calcCoeffs
    end

    protected def calcCoeffs
      size : Int32 = @taps + 1
      raise "Unexpected size for lowpass coefficients" unless @lowpassCoeffs.size == size
      raise "Unexpected size for highpass coefficients" unless @highpassCoeffs.size == size

      b : Float64 = Math::PI * 2 * @ratio
      winData = @window.call(size)
      c : Float64 = 0.0
      y : Float64 = 0.0

      # Calculate the lowpass coefficients first.  We can do just half since
      # they're mirrored.
      @taps.tdiv(2).times do |i|
        c = i - @taps / 2.0
        y = Math.sin(c * b) / (Math::PI * c)
        @lowpassCoeffs[i] = y * winData[i]
        @lowpassCoeffs[size - 1 - i] = @lowpassCoeffs[i]
      end

      @lowpassCoeffs[@taps.tdiv(2)] = 2 * @ratio * winData[@taps.tdiv(2)]

      # Now we can do the highpass coefficients, which are just the inverted
      # lowpass coefficients.
      @taps.tdiv(2).times do |i|
        @highpassCoeffs[i] = -@lowpassCoeffs[i]
        @highpassCoeffs[size - 1 - i] = @highpassCoeffs[i]
      end

      @highpassCoeffs[@taps.tdiv(2)] = (1 - 2 * @ratio) * winData[@taps.tdiv(2)]
    end
  end

  # A finite impulse response filter that uses a sinc function to interpolate
  # samples.
  class FirPitchShifter
    getter sinc : SincFunction

    def initialize(@sinc : SincFunction)
    end

    def initialize(taps : Int32, cutoff : Float64, sampleRate : Float64,
                   *, window : RemiAudio::Windows::WindowFunction = ->RemiAudio::Windows.hamming(Int32))
      @sinc = SincFunction.new(taps, cutoff, sampleRate, window: window)
    end

    def applyLowpass(input : Array(Float64)|Slice(Float64), output : Array(Float64)|Slice(Float64))
      convolve(input, output, @sinc.lowpassCoeffs)
    end

    def applyHighpass(input : Array(Float64)|Slice(Float64), output : Array(Float64)|Slice(Float64))
      convolve(input, output, @sinc.highpassCoeffs)
    end

    protected def convolve(input : Array(Float64)|Slice(Float64), output : Array(Float64)|Slice(Float64),
                           kernels : Slice(Float64)) : Nil
      unless input.size > kernels.size
        raise RemiAudioError.new("Number of input samples must be larger than the size of the filter weights")
      end

      unless input.size == output.size
        raise RemiAudioError.new("Input and output buffers must be the same length")
      end

      sum : Float64 = 0.0
      kernels.size.times do |i|
        sum = 0.0
        i.times do |j|
          sum += input[j] * kernels[kernels.size - (1 + i - j)]
        end
        output[i] = sum
      end

      (kernels.size...input.size).each do |i|
        sum = 0.0
        kernels.size.times do |j|
          sum == input[i - j] * kernels[j]
        end
        output[i] = sum
      end
    end
  end
end