From ae35ebd5bcfd741a7c6a355b07290e9891b435c7 Mon Sep 17 00:00:00 2001 From: reuk Date: Mon, 8 Jun 2020 17:04:27 +0100 Subject: [PATCH] DSP: Allow Convolution instances to share a single background thread --- examples/Plugins/DSPModulePluginDemo.h | 5 +- .../juce_dsp/frequency/juce_Convolution.cpp | 232 +++++++++++------- modules/juce_dsp/frequency/juce_Convolution.h | 72 +++++- 3 files changed, 212 insertions(+), 97 deletions(-) diff --git a/examples/Plugins/DSPModulePluginDemo.h b/examples/Plugins/DSPModulePluginDemo.h index 874983c1d5..afb0d72e5c 100644 --- a/examples/Plugins/DSPModulePluginDemo.h +++ b/examples/Plugins/DSPModulePluginDemo.h @@ -1129,8 +1129,9 @@ private: return latency; } - dsp::Convolution cabinet { dsp::Convolution::NonUniform { 512 } }; - dsp::Convolution reverb { dsp::Convolution::NonUniform { 512 } }; + dsp::ConvolutionMessageQueue queue; + dsp::Convolution cabinet { dsp::Convolution::NonUniform { 512 }, queue }; + dsp::Convolution reverb { dsp::Convolution::NonUniform { 512 }, queue }; dsp::DryWetMixer mixer; bool cabEnabled = false, reverbEnabled = false; diff --git a/modules/juce_dsp/frequency/juce_Convolution.cpp b/modules/juce_dsp/frequency/juce_Convolution.cpp index af9c45d705..674629d3de 100644 --- a/modules/juce_dsp/frequency/juce_Convolution.cpp +++ b/modules/juce_dsp/frequency/juce_Convolution.cpp @@ -21,6 +21,105 @@ namespace juce namespace dsp { +template +class Queue +{ +public: + explicit Queue (int size) + : fifo (size), storage (static_cast (size)) {} + + bool push (Element& element) noexcept + { + if (fifo.getFreeSpace() == 0) + return false; + + const auto writer = fifo.write (1); + + if (writer.blockSize1 != 0) + storage[static_cast (writer.startIndex1)] = std::move (element); + else if (writer.blockSize2 != 0) + storage[static_cast (writer.startIndex2)] = std::move (element); + + return true; + } + + template + void pop (Fn&& fn) { popN (1, std::forward (fn)); } + + template + void popAll (Fn&& fn) { popN (fifo.getNumReady(), std::forward (fn)); } + + bool hasPendingMessages() const noexcept { return fifo.getNumReady() > 0; } + +private: + template + void popN (int n, Fn&& fn) + { + fifo.read (n).forEach ([&] (int index) + { + fn (storage[static_cast (index)]); + }); + } + + AbstractFifo fifo; + std::vector storage; +}; + +class BackgroundMessageQueue : private Thread +{ +public: + explicit BackgroundMessageQueue (int entries) + : Thread ("Convolution background loader"), queue (entries) + { + startThread(); + } + + ~BackgroundMessageQueue() override + { + stopThread (-1); + } + + using IncomingCommand = FixedSizeFunction<400, void()>; + + // Push functions here, and they'll be called later on a background thread. + // This function is wait-free. + // This function is only safe to call from a single thread at a time. + bool push (IncomingCommand& command) { return queue.push (command); } + +private: + void run() override + { + while (! threadShouldExit()) + { + if (queue.hasPendingMessages()) + queue.pop ([] (IncomingCommand& command) { command(); command = nullptr;}); + else + sleep (10); + } + } + + Queue queue; +}; + +struct ConvolutionMessageQueue::Impl : public BackgroundMessageQueue +{ + using BackgroundMessageQueue::BackgroundMessageQueue; +}; + +ConvolutionMessageQueue::ConvolutionMessageQueue() + : ConvolutionMessageQueue (1000) +{} + +ConvolutionMessageQueue::ConvolutionMessageQueue (int entries) + : pimpl (std::make_unique (entries)) +{} + +ConvolutionMessageQueue::~ConvolutionMessageQueue() noexcept = default; + +ConvolutionMessageQueue::ConvolutionMessageQueue (ConvolutionMessageQueue&&) noexcept = default; +ConvolutionMessageQueue& ConvolutionMessageQueue::operator= (ConvolutionMessageQueue&&) noexcept = default; + +//============================================================================== struct ConvolutionEngine { ConvolutionEngine (const float* samples, @@ -314,51 +413,6 @@ struct ConvolutionEngine std::vector> buffersInputSegments, buffersImpulseSegments; }; -//============================================================================== -template -class Queue -{ -public: - explicit Queue (int size) - : fifo (size), storage (static_cast (size)) {} - - bool push (Element& element) noexcept - { - if (fifo.getFreeSpace() == 0) - return false; - - const auto writer = fifo.write (1); - - if (writer.blockSize1 != 0) - storage[static_cast (writer.startIndex1)] = std::move (element); - else if (writer.blockSize2 != 0) - storage[static_cast (writer.startIndex2)] = std::move (element); - - return true; - } - - template - void pop (Fn&& fn) { popN (1, std::forward (fn)); } - - template - void popAll (Fn&& fn) { popN (fifo.getNumReady(), std::forward (fn)); } - - bool hasPendingMessages() const noexcept { return fifo.getNumReady() > 0; } - -private: - template - void popN (int n, Fn&& fn) - { - fifo.read (n).forEach ([&] (int index) - { - fn (storage[static_cast (index)]); - }); - } - - AbstractFifo fifo; - std::vector storage; -}; - //============================================================================== class MultichannelEngine { @@ -414,7 +468,7 @@ public: void processSamples (const AudioBlock& input, AudioBlock& output) { - const auto numChannels = juce::jmin (head.size(), input.getNumChannels(), output.getNumChannels()); + const auto numChannels = jmin (head.size(), input.getNumChannels(), output.getNumChannels()); const auto numSamples = jmin (input.getNumSamples(), output.getNumSamples()); const AudioBlock fullTailBlock (tailBuffer); @@ -518,7 +572,7 @@ static AudioBuffer trimImpulseResponse (const AudioBuffer& buf) return result; } - const auto newLength = juce::jmax (1, numSamples - static_cast (offsetBegin + offsetEnd)); + const auto newLength = jmax (1, numSamples - static_cast (offsetBegin + offsetEnd)); AudioBuffer result (numChannels, newLength); @@ -549,7 +603,7 @@ static void normaliseImpulseResponse (AudioBuffer& buf) const auto maxSumSquaredMag = std::accumulate (channelPtrs, channelPtrs + numChannels, 0.0f, [&] (auto max, auto* channel) { - return juce::jmax (max, std::accumulate (channel, channel + numSamples, 0.0f, [] (auto sum, auto samp) + return jmax (max, std::accumulate (channel, channel + numSamples, 0.0f, [] (auto sum, auto samp) { return sum + (samp * samp); })); @@ -576,7 +630,7 @@ static AudioBuffer resampleImpulseResponse (const AudioBuffer& buf MemoryAudioSource memorySource (original, false); ResamplingAudioSource resamplingSource (&memorySource, false, buf.getNumChannels()); - const auto finalSize = roundToInt (juce::jmax (1.0, buf.getNumSamples() / factorReading)); + const auto finalSize = roundToInt (jmax (1.0, buf.getNumSamples() / factorReading)); resamplingSource.setResamplingRatio (factorReading); resamplingSource.prepareToPlay (finalSize, srcSampleRate); @@ -587,42 +641,6 @@ static AudioBuffer resampleImpulseResponse (const AudioBuffer& buf } //============================================================================== -class BackgroundMessageQueue : private Thread -{ -public: - BackgroundMessageQueue() - : Thread ("Convolution background loader"), queue (1000) - { - startThread(); - } - - ~BackgroundMessageQueue() override - { - stopThread (-1); - } - - using IncomingCommand = FixedSizeFunction<400, void()>; - - // Push functions here, and they'll be called later on a background thread. - // This function is wait-free. - // This function is only safe to call from a single thread at a time. - bool push (IncomingCommand& command) { return queue.push (command); } - -private: - void run() override - { - while (! threadShouldExit()) - { - if (queue.hasPendingMessages()) - queue.pop ([] (IncomingCommand& command) { command(); command = nullptr;}); - else - sleep (10); - } - } - - Queue queue; -}; - template class TryLockedPtr { @@ -963,11 +981,18 @@ private: AudioBuffer mixBuffer; }; +using OptionalQueue = OptionalScopedPointer; + class Convolution::Impl { public: - Impl (Latency requiredLatency, NonUniform requiredHeadSize) - : engineQueue (std::make_shared (messageQueue, requiredLatency, requiredHeadSize)) + Impl (Latency requiredLatency, + NonUniform requiredHeadSize, + OptionalQueue&& queue) + : messageQueue (std::move (queue)), + engineQueue (std::make_shared (*messageQueue->pimpl, + requiredLatency, + requiredHeadSize)) {} void reset() @@ -1048,7 +1073,7 @@ private: { // If the queue is full, we'll destroy this straight away BackgroundMessageQueue::IncomingCommand command = [p = std::move (previousEngine)]() mutable { p = nullptr; }; - messageQueue.push (command); + messageQueue->pimpl->push (command); } void installNewEngine (std::unique_ptr newEngine) @@ -1065,7 +1090,7 @@ private: installNewEngine (std::move (newEngine)); } - BackgroundMessageQueue messageQueue; + OptionalQueue messageQueue; std::shared_ptr engineQueue; std::unique_ptr previousEngine, currentEngine; CrossoverMixer mixer; @@ -1140,14 +1165,37 @@ void Convolution::Mixer::reset() { dryBlock.clear(); } //============================================================================== Convolution::Convolution() - : Convolution (Latency { 0 }) {} + : Convolution (Latency { 0 }) +{} + +Convolution::Convolution (ConvolutionMessageQueue& queue) + : Convolution (Latency { 0 }, queue) +{} Convolution::Convolution (const Latency& requiredLatency) - : pimpl (std::make_unique (requiredLatency, NonUniform{})) + : Convolution (requiredLatency, + {}, + OptionalQueue { std::make_unique() }) {} Convolution::Convolution (const NonUniform& nonUniform) - : pimpl (std::make_unique (Latency{}, nonUniform)) + : Convolution ({}, + nonUniform, + OptionalQueue { std::make_unique() }) +{} + +Convolution::Convolution (const Latency& requiredLatency, ConvolutionMessageQueue& queue) + : Convolution (requiredLatency, {}, OptionalQueue { queue }) +{} + +Convolution::Convolution (const NonUniform& nonUniform, ConvolutionMessageQueue& queue) + : Convolution ({}, nonUniform, OptionalQueue { queue }) +{} + +Convolution::Convolution (const Latency& latency, + const NonUniform& nonUniform, + OptionalQueue&& queue) + : pimpl (std::make_unique (latency, nonUniform, std::move (queue))) {} Convolution::~Convolution() noexcept = default; diff --git a/modules/juce_dsp/frequency/juce_Convolution.h b/modules/juce_dsp/frequency/juce_Convolution.h index d07210521c..1a102f7fd0 100644 --- a/modules/juce_dsp/frequency/juce_Convolution.h +++ b/modules/juce_dsp/frequency/juce_Convolution.h @@ -21,6 +21,45 @@ namespace juce namespace dsp { +/** + Used by the Convolution to dispatch engine-update messages on a background + thread. + + May be shared between multiple Convolution instances. +*/ +class JUCE_API ConvolutionMessageQueue +{ +public: + /** Initialises the queue to a default size. + + If your Convolution is updated very frequently, or you are sharing + this queue between multiple Convolutions, consider using the alternative + constructor taking an explicit size argument. + */ + ConvolutionMessageQueue(); + ~ConvolutionMessageQueue() noexcept; + + /** Initialises the queue with the specified number of entries. + + In general, the number of required entries scales with the number + of Convolutions sharing the same Queue, and the frequency of updates + to those Convolutions. + */ + explicit ConvolutionMessageQueue (int numEntries); + + ConvolutionMessageQueue (ConvolutionMessageQueue&&) noexcept; + ConvolutionMessageQueue& operator= (ConvolutionMessageQueue&&) noexcept; + + ConvolutionMessageQueue (const ConvolutionMessageQueue&) = delete; + ConvolutionMessageQueue& operator= (const ConvolutionMessageQueue&) = delete; + +private: + struct Impl; + std::unique_ptr pimpl; + + friend class Convolution; +}; + /** Performs stereo partitioned convolution of an input signal with an impulse response in the frequency domain, using the JUCE FFT class. @@ -60,6 +99,13 @@ public: /** Initialises an object for performing convolution in the frequency domain. */ Convolution(); + /** Initialises a convolution engine using a shared background message queue. + + IMPORTANT: the queue *must* remain alive throughout the lifetime of the + Convolution. + */ + explicit Convolution (ConvolutionMessageQueue& queue); + /** Contains configuration information for a convolution with a fixed latency. */ struct Latency { int latencyInSamples; }; @@ -90,6 +136,22 @@ public: */ explicit Convolution (const NonUniform& requiredHeadSize); + /** Behaves the same as the constructor taking a single Latency argument, + but with a shared background message queue. + + IMPORTANT: the queue *must* remain alive throughout the lifetime of the + Convolution. + */ + Convolution (const Latency&, ConvolutionMessageQueue&); + + /** Behaves the same as the constructor taking a single NonUniform argument, + but with a shared background message queue. + + IMPORTANT: the queue *must* remain alive throughout the lifetime of the + Convolution. + */ + Convolution (const NonUniform&, ConvolutionMessageQueue&); + ~Convolution() noexcept; //============================================================================== @@ -112,9 +174,9 @@ public: } //============================================================================== - enum class Stereo { yes, no }; - enum class Trim { yes, no }; - enum class Normalise { yes, no }; + enum class Stereo { no, yes }; + enum class Trim { no, yes }; + enum class Normalise { no, yes }; //============================================================================== /** This function loads an impulse response audio file from memory, added in a @@ -188,6 +250,10 @@ public: private: //============================================================================== + Convolution (const Latency&, + const NonUniform&, + OptionalScopedPointer&&); + void processSamples (const AudioBlock&, AudioBlock&, bool isBypassed) noexcept; class Mixer