1
0
Fork 0
mirror of https://github.com/juce-framework/JUCE.git synced 2026-01-09 23:34:20 +00:00

WaitFreeListeners: Add new ListenerList implementation for listeners that must be called in a realtime context

This commit is contained in:
reuk 2025-04-01 19:20:34 +01:00
parent 7001ed0a89
commit 3636f2c666
No known key found for this signature in database
3 changed files with 283 additions and 0 deletions

View file

@ -57,6 +57,8 @@
#include "juce_audio_devices.h" #include "juce_audio_devices.h"
#include "midi_io/juce_WaitFreeListeners.h"
#include "midi_io/juce_WaitFreeListeners.cpp"
#include "audio_io/juce_SampleRateHelpers.cpp" #include "audio_io/juce_SampleRateHelpers.cpp"
#include "midi_io/juce_MidiDeviceListConnectionBroadcaster.cpp" #include "midi_io/juce_MidiDeviceListConnectionBroadcaster.cpp"

View file

@ -0,0 +1,136 @@
/*
==============================================================================
This file is part of the JUCE framework.
Copyright (c) Raw Material Software Limited
JUCE is an open source framework subject to commercial or open source
licensing.
By downloading, installing, or using the JUCE framework, or combining the
JUCE framework with any other source code, object code, content or any other
copyrightable work, you agree to the terms of the JUCE End User Licence
Agreement, and all incorporated terms including the JUCE Privacy Policy and
the JUCE Website Terms of Service, as applicable, which will bind you. If you
do not agree to the terms of these agreements, we will not license the JUCE
framework to you, and you must discontinue the installation or download
process and cease use of the JUCE framework.
JUCE End User Licence Agreement: https://juce.com/legal/juce-8-licence/
JUCE Privacy Policy: https://juce.com/juce-privacy-policy
JUCE Website Terms of Service: https://juce.com/juce-website-terms-of-service/
Or:
You may also use this code under the terms of the AGPLv3:
https://www.gnu.org/licenses/agpl-3.0.en.html
THE JUCE FRAMEWORK IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL
WARRANTIES, WHETHER EXPRESSED OR IMPLIED, INCLUDING WARRANTY OF
MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE, ARE DISCLAIMED.
==============================================================================
*/
namespace juce
{
#if JUCE_UNIT_TESTS
class WaitFreeListenersTest : public UnitTest
{
public:
struct Listener
{
virtual ~Listener() = default;
virtual void notify() = 0;
};
WaitFreeListenersTest() : UnitTest ("WaitFreeListeners", UnitTestCategories::midi) {}
void runTest() override
{
using Receivers = WaitFreeListeners<Listener>;
struct CountingReceiver : public Listener
{
void notify() override { ++numCalls; }
int numCalls = 0;
};
beginTest ("Adding and immediately removing a receiver works");
{
Receivers receivers;
CountingReceiver receiver;
receivers.add (receiver);
expect (receiver.numCalls == 0);
receivers.remove (receiver);
expect (receiver.numCalls == 0);
}
beginTest ("Notifying receivers works");
{
Receivers receivers;
std::array<CountingReceiver, 63> receiverArray;
for (size_t i = 0; i < receiverArray.size(); ++i)
{
receivers.add (receiverArray[i]);
expect (receiverArray[i].numCalls == 0);
receivers.call ([] (auto& l) { l.notify(); });
expect (receiverArray[i].numCalls == 1);
}
expect ((size_t) receiverArray.front().numCalls == receiverArray.size());
}
beginTest ("Adding and removing receivers while notifying them works");
{
std::atomic<bool> exit { false };
Receivers receivers;
std::thread notifier
{
[&]
{
while (! exit)
receivers.call ([] (auto& l) { l.notify(); });
}
};
std::vector<std::thread> responders;
for (size_t i = 0; i < 10; ++i)
{
responders.emplace_back ([&]
{
for (auto attempt = 0; attempt < 100; ++attempt)
{
CountingReceiver counter;
receivers.add (counter);
receivers.remove (counter);
}
});
}
for (auto& t : responders)
t.join();
exit = true;
notifier.join();
}
}
};
static WaitFreeListenersTest receiversTest;
#endif
} // namespace juce

View file

@ -0,0 +1,145 @@
/*
==============================================================================
This file is part of the JUCE framework.
Copyright (c) Raw Material Software Limited
JUCE is an open source framework subject to commercial or open source
licensing.
By downloading, installing, or using the JUCE framework, or combining the
JUCE framework with any other source code, object code, content or any other
copyrightable work, you agree to the terms of the JUCE End User Licence
Agreement, and all incorporated terms including the JUCE Privacy Policy and
the JUCE Website Terms of Service, as applicable, which will bind you. If you
do not agree to the terms of these agreements, we will not license the JUCE
framework to you, and you must discontinue the installation or download
process and cease use of the JUCE framework.
JUCE End User Licence Agreement: https://juce.com/legal/juce-8-licence/
JUCE Privacy Policy: https://juce.com/juce-privacy-policy
JUCE Website Terms of Service: https://juce.com/juce-website-terms-of-service/
Or:
You may also use this code under the terms of the AGPLv3:
https://www.gnu.org/licenses/agpl-3.0.en.html
THE JUCE FRAMEWORK IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL
WARRANTIES, WHETHER EXPRESSED OR IMPLIED, INCLUDING WARRANTY OF
MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE, ARE DISCLAIMED.
==============================================================================
*/
namespace juce
{
/*
Similar to ListenerList, but more suitable for the (rare!) cases where
updates are triggered from a real-time thread. Triggering updates will
never block, but adding and removing listeners might.
@tags{Audio}
*/
template <typename Listener>
class WaitFreeListeners
{
public:
static_assert (alignof (Listener) != 1);
WaitFreeListeners() = default;
/** Registers a receiver, *not* wait-free */
void add (Listener& r)
{
auto copy = [&]
{
const std::scoped_lock lock { mainCopyMutex };
const auto entryAsInt = reinterpret_cast<uintptr_t> (&r);
// We're going to use the lowest bit of the pointer as a flag to indicate that the entry is in use,
// so this bit must not be set!
jassert ((entryAsInt & 1) == 0);
mainCopy.emplace (&r, std::make_shared<Entry> (entryAsInt));
std::vector<std::shared_ptr<Entry>> entries (mainCopy.size());
std::transform (mainCopy.begin(), mainCopy.end(), entries.begin(), [] (const auto& p) { return p.second; });
return entries;
}();
{
const SpinLock::ScopedLockType lock { blockingCopyMutex };
blockingCopy = std::move (copy);
listChanged = true;
}
}
/** Removes a listener, *not* wait-free. */
void remove (Listener& l)
{
const auto entryToClear = std::invoke ([&]
{
const std::scoped_lock lock { mainCopyMutex };
const auto iter = mainCopy.find (&l);
return iter != mainCopy.end() ? iter->second : nullptr;
});
if (entryToClear != nullptr)
{
auto& entry = *entryToClear;
// We expect the current entry not to have its lowest bit set, because the low bit
// indicates the entry is in use.
const auto expected = entry.load() & ~(uintptr_t) 1;
auto tmp = expected;
// If the lowest bit is zero, clear the entire entry. If the entry is set to zero
// in the meantime, that means someone else has removed this entry, so we can exit
// in that case.
while (! entry.compare_exchange_weak (tmp, 0) && tmp != 0)
tmp = expected;
}
const std::scoped_lock lock { mainCopyMutex };
mainCopy.erase (&l);
}
/** Notifies all registered receivers, wait-free, may be called concurrently with add/remove,
but may *not* be called concurrently with itself.
*/
template <typename Callback>
void call (Callback&& callback) const
{
{
const SpinLock::ScopedTryLockType lock { blockingCopyMutex };
if (lock.isLocked() && std::exchange (listChanged, false))
std::swap (callerCopy, blockingCopy);
}
for (auto& entry : callerCopy)
{
const auto entryAsInt = entry->fetch_or (1);
auto* const entryAsPtr = reinterpret_cast<Listener*> (entryAsInt & ~(uintptr_t) 1);
if (entryAsPtr != nullptr)
callback (*entryAsPtr);
*entry = entryAsInt;
}
}
JUCE_DECLARE_NON_COPYABLE (WaitFreeListeners)
JUCE_DECLARE_NON_MOVEABLE (WaitFreeListeners)
private:
using Entry = std::atomic<uintptr_t>;
std::map<Listener*, std::shared_ptr<Entry>> mainCopy;
mutable std::vector<std::shared_ptr<Entry>> blockingCopy, callerCopy;
mutable std::mutex mainCopyMutex;
mutable SpinLock blockingCopyMutex;
mutable bool listChanged = false;
};
} // namespace juce