1
0
Fork 0
mirror of https://github.com/juce-framework/JUCE.git synced 2026-01-10 23:44:24 +00:00

CIDevice: Improve robustness of subscription API

The old API only allowed cancelling property "get" inquiries and
subscription updates. However, there are use-cases for cancelling other
requests too. e.g. switching between views in a JUCE app might mean that
it's no longer necessary to subscribe to a particular property.

Cancelling subscriptions ends up being quite involved. Different
handling is needed depending on whether the subscription is cancelled
before or after the responder replies to the initial request.
In addition, the responder may ask the initiator to retry a subscription
begin request.
This commit is contained in:
reuk 2024-01-08 20:16:06 +00:00
parent 16d5e4e2a6
commit 60757de2f2
No known key found for this signature in database
GPG key ID: FCB43929F012EE5C
16 changed files with 2255 additions and 532 deletions

View file

@ -2,6 +2,77 @@
# develop # develop
## Change
The signatures of some member functions of ci::Device have been changed:
- sendPropertyGetInquiry
- sendPropertySetInquiry
The signature of ci::PropertyHost::sendSubscriptionUpdate has also changed.
The following member functions of ci::Device have been replaced with new
alternatives:
- sendPropertySubscriptionStart
- sendPropertySubscriptionEnd
- getOngoingSubscriptionsForMuid
- countOngoingPropertyTransactions
The enum field PropertyExchangeResult::Error::invalidPayload has been removed.
**Possible Issues**
Code that uses any of these symbols will fail to compile until it is updated.
**Workaround**
Device::sendPropertyGetInquiry, Device::sendPropertySetInquiry, and
PropertyHost::sendSubscriptionUpdate all now return an optional RequestKey
instead of an ErasedScopeGuard. Requests started via any of these functions may
be cancelled by the request's RequestKey to the new function
Device::abortPropertyRequest. The returned RequestKey may be null, indicating a
failure to send the request.
countOngoingPropertyTransactions has been replaced by getOngoingRequests,
which returns the RequestKeys of all ongoing requests. To find the number of
transactions, use the size of the returned container.
sendPropertySubscriptionStart has been replaced by beginSubscription.
sendPropertySubscriptionEnd has been replaced by endSubscription.
The new functions no longer take callbacks. Instead, to receive notifications
when a subscription starts or ends, override
DeviceListener::propertySubscriptionChanged.
getOngoingSubscriptionsForMuid is replaced by multiple functions.
getOngoingSubscriptions returns SubscriptionKeys for all of the subscriptions
currently in progress, which may be filtered based on SubscriptionKey::getMuid.
The subscribeId assigned to a particular SubscriptionKey can be found using
getSubscribeIdForKey, and the subscribed resource can be found using
getResourceForKey.
It's possible that the initial call to beginSubscription may not be able to
start the subscription, e.g. if the remote device is busy and request a retry.
In this case, the request is cached. If you use subscriptions, then you
should call sendPendingMessages periodically to flush any messages that may
need to be retried.
There is no need to check for the invalidPayload error when processing
property exchange results.
**Rationale**
Keeping track of subscriptions is quite involved, as the initial request to
begin a subscription might not be accepted straight away. The device may not
initially have enough vacant slots to send the request, or responder might
request a retry if it is too busy to process the request. The ci::Device now
caches requests when necessary, allowing them to be retried in the future.
This functionality couldn't be implemented without modifying the old interface.
Replacing ErasedScopeGuards with Keys makes lifetime handling a bit easier.
It's no longer necessary to store or manually release scope guards for requests
that don't need to be cancelled. The new Key types are also a bit more
typesafe, and allow for simple queries of the transaction that created the key.
## Change ## Change
The ListenerList::Iterator class has been removed. The ListenerList::Iterator class has been removed.
@ -45,6 +116,7 @@ fixed white colour was inappropriate for most user interfaces.
## Change ## Change
>>>>>>> c74b2b1058 (CIDevice: Improve robustness of subscription API)
ProfileHost::enableProfile and ProfileHost::disableProfile have been combined ProfileHost::enableProfile and ProfileHost::disableProfile have been combined
into a single function, ProfileHost::setProfileEnablement. into a single function, ProfileHost::setProfileEnablement.

View file

@ -740,14 +740,7 @@ struct Model
DeviceInfo info; DeviceInfo info;
Profiles profiles; Profiles profiles;
Properties properties; Properties properties;
std::map<String, String> subscribeIdForResource; std::map<ci::SubscriptionKey, ci::Subscription> subscriptions;
std::optional<String> getSubscriptionId (const String& resource) const
{
const auto iter = subscribeIdForResource.find (resource);
return iter != subscribeIdForResource.end() ? std::optional (iter->second)
: std::nullopt;
}
template <typename Archive, typename This> template <typename Archive, typename This>
static auto serialise (Archive& archive, This& t) static auto serialise (Archive& archive, This& t)
@ -761,7 +754,7 @@ struct Model
auto tie() const auto tie() const
{ {
return std::tie (muid, info, profiles, properties, subscribeIdForResource); return std::tie (muid, info, profiles, properties, subscriptions);
} }
JUCE_TUPLE_RELATIONAL_OPS (Device) JUCE_TUPLE_RELATIONAL_OPS (Device)
}; };
@ -2472,7 +2465,7 @@ public:
explicit PropertyValuePanel (State<Model::Properties> s) explicit PropertyValuePanel (State<Model::Properties> s)
: PropertyValuePanel (s, {}) {} : PropertyValuePanel (s, {}) {}
PropertyValuePanel (State<Model::Properties> s, State<std::map<String, String>> subState) PropertyValuePanel (State<Model::Properties> s, State<std::map<ci::SubscriptionKey, ci::Subscription>> subState)
: state (s), subscriptions (subState) : state (s), subscriptions (subState)
{ {
addAndMakeVisible (value); addAndMakeVisible (value);
@ -2590,13 +2583,13 @@ private:
if (const auto* selectedProp = state->properties.getSelected()) if (const auto* selectedProp = state->properties.getSelected())
{ {
const auto text = sub.count (selectedProp->name) != 0 ? "Unsubscribe" : "Subscribe"; const auto text = std::any_of (sub.begin(), sub.end(), [&] (const auto& p) { return p.second.resource == selectedProp->name; }) ? "Unsubscribe" : "Subscribe";
subscribe.setButtonText (text); subscribe.setButtonText (text);
} }
} }
State<Model::Properties> state; State<Model::Properties> state;
State<std::map<String, String>> subscriptions; State<std::map<ci::SubscriptionKey, ci::Subscription>> subscriptions;
MonospaceEditor value; MonospaceEditor value;
TextField<editable> format; TextField<editable> format;
@ -2970,9 +2963,9 @@ private:
{ {
const auto selected = (size_t) state->transient.devices.selection; const auto selected = (size_t) state->transient.devices.selection;
return state[&Model::App::transient] return state[&Model::App::transient]
[&Model::Transient::devices] [&Model::Transient::devices]
[&Model::ListWithSelection<Model::Device>::items] [&Model::ListWithSelection<Model::Device>::items]
[selected]; [selected];
} }
State<Model::App> state; State<Model::App> state;
@ -2980,7 +2973,7 @@ private:
PropertyValuePanel<Editable::no> value PropertyValuePanel<Editable::no> value
{ {
getDeviceState()[&Model::Device::properties], getDeviceState()[&Model::Device::properties],
getDeviceState()[&Model::Device::subscribeIdForResource] getDeviceState()[&Model::Device::subscriptions]
}; };
TabbedComponent tabs { TabbedButtonBar::Orientation::TabsAtTop }; TabbedComponent tabs { TabbedButtonBar::Orientation::TabsAtTop };
}; };
@ -3613,7 +3606,8 @@ private:
}); });
}; };
class CapabilityInquiryDemo : public Component class CapabilityInquiryDemo : public Component,
private Timer
{ {
public: public:
CapabilityInquiryDemo() CapabilityInquiryDemo()
@ -3667,10 +3661,14 @@ public:
{ {
setPropertyPartial (bytes); setPropertyPartial (bytes);
}; };
startTimer (2'000);
} }
~CapabilityInquiryDemo() override ~CapabilityInquiryDemo() override
{ {
stopTimer();
// In a production app, it'd be a bit risky to write to a file from a destructor as it's // In a production app, it'd be a bit risky to write to a file from a destructor as it's
// bad karma to throw an exception inside a destructor! // bad karma to throw an exception inside a destructor!
if (auto* userSettings = applicationProperties.getUserSettings()) if (auto* userSettings = applicationProperties.getUserSettings())
@ -3691,6 +3689,12 @@ public:
} }
private: private:
void timerCallback() override
{
if (device.has_value())
device->sendPendingMessages();
}
std::optional<std::tuple<ci::MUID, String>> getPropertyRequestInfo() const std::optional<std::tuple<ci::MUID, String>> getPropertyRequestInfo() const
{ {
auto* selectedDevice = appState->transient.devices.getSelected(); auto* selectedDevice = appState->transient.devices.getSelected();
@ -3860,58 +3864,37 @@ private:
const auto& [muid, propName] = *details; const auto& [muid, propName] = *details;
const auto subId = [&, propNameCopy = propName] // Find the subscription for this resource, if any
const auto existingToken = [&, propNameCopy = propName]() -> std::optional<ci::SubscriptionKey>
{ {
const auto ongoing = device->getOngoingSubscriptionsForMuid (selectedDevice->muid); const auto ongoing = device->getOngoingSubscriptions();
const auto iter = std::find_if (ongoing.begin(), ongoing.end(), [&] (const auto& sub)
{
return sub.resource == propNameCopy;
});
return iter != ongoing.end() ? iter->subscribeId : String(); for (const auto& o : ongoing)
if (propNameCopy == device->getResourceForKey (o))
return o;
return std::nullopt;
}(); }();
ci::PropertySubscriptionHeader header; // If we're already subscribed, end that subscription.
header.resource = propName; // Otherwise, begin a new subscription to this resource.
header.command = subId.isEmpty() ? ci::PropertySubscriptionCommand::start : ci::PropertySubscriptionCommand::end; const auto changedToken = [this, propName = propName, muid = muid, existingToken = existingToken]() -> std::optional<ci::SubscriptionKey>
header.subscribeId = subId;
auto callback = [this,
target = muid,
propertyName = propName,
existingSubscription = subId.isNotEmpty()] (const ci::PropertyExchangeResult& response)
{ {
if (response.getError().has_value()) // We're not subscribed, so begin a new subscription
return; if (! existingToken.has_value())
auto updated = *appState;
auto& knownDevices = updated.transient.devices.items;
const auto deviceIter = std::find_if (knownDevices.begin(),
knownDevices.end(),
[target] (const auto& d) { return d.muid == target; });
if (deviceIter == knownDevices.end())
{ {
// The device has gone away? ci::PropertySubscriptionHeader header;
jassertfalse; header.resource = propName;
return; header.command = ci::PropertySubscriptionCommand::start;
return device->beginSubscription (muid, header);
} }
const auto parsedHeader = response.getHeaderAsSubscriptionHeader(); device->endSubscription (*existingToken);
return existingToken;
}();
if (parsedHeader.subscribeId.isNotEmpty() && ! existingSubscription) if (changedToken.has_value())
deviceIter->subscribeIdForResource.emplace (propertyName, parsedHeader.subscribeId); deviceListener.propertySubscriptionChanged (*changedToken);
else
deviceIter->subscribeIdForResource.erase (propertyName);
appState = std::move (updated);
};
if (subId.isEmpty())
device->sendPropertySubscriptionStart (muid, header, callback);
else
device->sendPropertySubscriptionEnd (muid, subId, callback);
} }
template <typename Transient> template <typename Transient>
@ -3973,11 +3956,8 @@ private:
header.resource = propertyName; header.resource = propertyName;
header.mutualEncoding = *encodingToUse; header.mutualEncoding = *encodingToUse;
const auto it = ongoingGetInquiries.insert (ongoingGetInquiries.end(), ErasedScopeGuard{}); device->sendPropertyGetInquiry (target, header, [this, target, propertyName] (const auto& response)
*it = device->sendPropertyGetInquiry (target, header, [this, it, target, propertyName] (const auto& response)
{ {
ongoingGetInquiries.erase (it);
if (response.getError().has_value()) if (response.getError().has_value())
return; return;
@ -4113,7 +4093,7 @@ private:
header.command = ci::PropertySubscriptionCommand::notify; header.command = ci::PropertySubscriptionCommand::notify;
header.subscribeId = subId; header.subscribeId = subId;
header.resource = propertyName; header.resource = propertyName;
host->sendSubscriptionUpdate (receiver, header, {}, {}).release(); host->sendSubscriptionUpdate (receiver, header, {}, {});
} }
} }
} }
@ -4348,9 +4328,13 @@ private:
{ {
const auto resource = [&] const auto resource = [&]
{ {
for (const auto& [subId, res] : demo.device->getOngoingSubscriptionsForMuid (muid)) const auto ongoing = demo.device->getOngoingSubscriptions();
if (subId == subscription.header.subscribeId)
return res; for (const auto& o : ongoing)
{
if (subscription.header.subscribeId == demo.device->getSubscribeIdForKey (o))
return demo.device->getResourceForKey (o).value_or (String{});
}
return String{}; return String{};
}(); }();
@ -4425,10 +4409,7 @@ private:
} }
case ci::PropertySubscriptionCommand::end: case ci::PropertySubscriptionCommand::end:
{
matchingDevice->subscribeIdForResource.erase (resource);
break; break;
}
case ci::PropertySubscriptionCommand::start: case ci::PropertySubscriptionCommand::start:
jassertfalse; jassertfalse;
@ -4438,6 +4419,35 @@ private:
devicesState = std::move (copiedDevices); devicesState = std::move (copiedDevices);
} }
void propertySubscriptionChanged (ci::SubscriptionKey key, const std::optional<String>&) override
{
propertySubscriptionChanged (key);
}
void propertySubscriptionChanged (ci::SubscriptionKey key)
{
auto updated = *demo.appState;
auto& knownDevices = updated.transient.devices.items;
const auto deviceIter = std::find_if (knownDevices.begin(),
knownDevices.end(),
[target = key.getMuid()] (const auto& d) { return d.muid == target; });
if (deviceIter == knownDevices.end())
{
// The device has gone away?
jassertfalse;
return;
}
if (const auto resource = demo.device->getResourceForKey (key))
deviceIter->subscriptions.emplace (key, ci::Subscription { demo.device->getSubscribeIdForKey (key).value_or (String{}), *resource });
else
deviceIter->subscriptions.erase (key);
demo.appState = std::move (updated);
}
private: private:
void updateProfilesForMuid (ci::MUID muid) void updateProfilesForMuid (ci::MUID muid)
{ {
@ -4843,7 +4853,6 @@ private:
std::unique_ptr<MidiInput> input; std::unique_ptr<MidiInput> input;
std::unique_ptr<MidiOutput> output; std::unique_ptr<MidiOutput> output;
std::optional<ci::Device> device; std::optional<ci::Device> device;
std::list<ErasedScopeGuard> ongoingGetInquiries;
FileChooser fileChooser { "Pick State JSON File", {}, "*.json", true, false, this }; FileChooser fileChooser { "Pick State JSON File", {}, "*.json", true, false, this };
@ -4875,7 +4884,6 @@ private:
.withPropertyDelegate (&propertyDelegate) .withPropertyDelegate (&propertyDelegate)
.withProfileDelegate (&profileDelegate); .withProfileDelegate (&profileDelegate);
ongoingGetInquiries.clear();
device.emplace (options); device.emplace (options);
device->addListener (deviceListener); device->addListener (deviceListener);

View file

@ -799,4 +799,13 @@ namespace TypeHelpers
[[deprecated ("Use std::abs() instead.")]] inline int64 abs64 (int64 n) noexcept { return std::abs (n); } [[deprecated ("Use std::abs() instead.")]] inline int64 abs64 (int64 n) noexcept { return std::abs (n); }
#endif #endif
/** Converts an enum to its underlying integral type.
Similar to std::to_underlying, which is only available in C++23 and above.
*/
template <typename T>
constexpr auto toUnderlyingType (T t) -> std::enable_if_t<std::is_enum_v<T>, std::underlying_type_t<T>>
{
return static_cast<std::underlying_type_t<T>> (t);
}
} // namespace juce } // namespace juce

File diff suppressed because it is too large Load diff

View file

@ -144,86 +144,74 @@ public:
*/ */
void sendPropertyCapabilitiesInquiry (MUID destination); void sendPropertyCapabilitiesInquiry (MUID destination);
/** Sends an inquiry to get a property value from another device, invoking a callback once /** Initiates an inquiry to fetch a property from a particular device.
the full transaction has completed.
@param destination the device whose property will be set @param m the MUID of the device to query
@param header information about the property data that will be sent @param header specifies the resource to query, along with format/encoding options
@param onResult this will be called once the result of the transaction is known. @param onResult called when the transaction completes; not called if the transaction fails to start
If the transaction cannot start for some reason (e.g. the request is @returns a key uniquely identifying this request, if the transaction begins successfully, or nullopt otherwise
malformed, or there are too many simultaneous requests) then the
function will be called immediately. Otherwise, the function will be
called once the destination device has confirmed receipt of the
inquiry.
@return a token bounding the lifetime of the request.
If you need to terminate the transaction before it has completed,
you can call reset() on this token, or cause its destructor to run.
*/ */
ErasedScopeGuard sendPropertyGetInquiry (MUID destination, std::optional<RequestKey> sendPropertyGetInquiry (MUID m,
const PropertyRequestHeader& header, const PropertyRequestHeader& header,
std::function<void (const PropertyExchangeResult&)> onResult); std::function<void (const PropertyExchangeResult&)> onResult);
/** Sends an inquiry to set a property value on another device, invoking a callback once /** Initiates an inquiry to set a property on a particular device.
the full transaction has completed.
@param destination the device whose property will be set @param m the MUID of the device to query
@param header information about the property data that will be sent @param header specifies the resource to query, along with format/encoding options
@param body the property data payload to send. @param body the unencoded body content of the message
If the header specifies 'ascii' encoding, then you are responsible @param onResult called when the transaction completes; not called if the transaction fails to start
for ensuring that no byte of the payload data has its most @returns a key uniquely identifying this request, if the transaction begins successfully, or nullopt otherwise
significant bit set. Sending the message will fail if this is not
the case. Otherwise, if another encoding is specified then the
payload data may contain any byte values. You should not attempt to
encode the data yourself; the payload will be automatically encoded
before being sent.
@param onResult this will be called once the result of the transaction is known.
If the transaction cannot start for some reason (e.g. the
destination does not support property exchange, the request is
malformed, or there are too many simultaneous requests) then the
function will be called immediately. Otherwise, the function will be
called once the destination device has confirmed receipt of the
inquiry.
*/ */
void sendPropertySetInquiry (MUID destination, std::optional<RequestKey> sendPropertySetInquiry (MUID m,
const PropertyRequestHeader& header, const PropertyRequestHeader& header,
Span<const std::byte> body, Span<const std::byte> body,
std::function<void (const PropertyExchangeResult&)> onResult); std::function<void (const PropertyExchangeResult&)> onResult);
/** Sends an inquiry to start a subscription to a property on a device. /** Cancels a request started with sendPropertyGetInquiry() or sendPropertySetInquiry().
The provided callback will be called to indicate whether starting the subscription
succeeded or failed. This sends a property notify message indicating that the responder no longer needs to
When the remote device indicates that its property value has changed, process the initial request.
DeviceListener::propertySubscriptionReceived will be called with information about the
update.
*/ */
void sendPropertySubscriptionStart (MUID, void abortPropertyRequest (RequestKey);
const PropertySubscriptionHeader& header,
std::function<void (const PropertyExchangeResult&)>);
/** Sends an inquiry to end a subscription to a property on a device. /** Returns the request id corresponding to a particular request.
The provided callback will be called to indicate whether the subscriber acknowledged
receipt of the message. If the request could not be found (it never started, or already finished), then this
Note that the remote device may also choose to terminate the subscription of its own returns nullopt.
accord - in this case, the end request will be sent to
DeviceListener::propertySubscriptionReceived.
*/ */
void sendPropertySubscriptionEnd (MUID, std::optional<RequestID> getIdForRequestKey (RequestKey) const;
const String& subscribeId,
std::function<void (const PropertyExchangeResult&)>);
/** Returns all of the subscriptions that we have requested from another device. /** Returns all the ongoing requests. */
std::vector<RequestKey> getOngoingRequests() const;
Does *not* include subscriptions that other devices have requested from us. /** Attempts to begin a subscription with the provided attributes.
Once the subscription is no longer required, cancel it by passing the SubscriptionKey to endSubscription().
*/ */
std::vector<Subscription> getOngoingSubscriptionsForMuid (MUID m) const; SubscriptionKey beginSubscription (MUID m, const PropertySubscriptionHeader& header);
/** Returns the number of transactions initiated by us that are yet to receive complete replies. /** Ends a previously-started subscription. */
void endSubscription (SubscriptionKey);
Does *not* include the count of unfinished requests addressed to us by other devices. /** Returns all the subscriptions that have been initiated by this device. */
std::vector<SubscriptionKey> getOngoingSubscriptions() const;
@see PropertyHost::countOngoingTransactions() /** If the provided subscription has started successfully, this returns the subscribeId assigned
to the subscription by the remote device.
*/ */
int countOngoingPropertyTransactions() const; std::optional<String> getSubscribeIdForKey (SubscriptionKey key) const;
/** If the provided subscription has not been cancelled, this returns the name of the
subscribed resource.
*/
std::optional<String> getResourceForKey (SubscriptionKey key) const;
/** Sends any cached messages that need retrying.
@returns true if there are no more messages to send, or false otherwise
*/
bool sendPendingMessages();
//============================================================================== //==============================================================================
/** Adds a listener that will be notified when particular events occur. /** Adds a listener that will be notified when particular events occur.

View file

@ -141,6 +141,14 @@ struct DeviceListener
*/ */
virtual void propertySubscriptionDataReceived ([[maybe_unused]] MUID x, virtual void propertySubscriptionDataReceived ([[maybe_unused]] MUID x,
[[maybe_unused]] const PropertySubscriptionData& data) {} [[maybe_unused]] const PropertySubscriptionData& data) {}
/** Called when a remote device updates a subscription by accepting or terminating it.
If the subscription was accepted, the subscribeId will be non-null. Otherwise, a null
subscribeId indicates that the subscription was terminated.
*/
virtual void propertySubscriptionChanged ([[maybe_unused]] SubscriptionKey subscription,
[[maybe_unused]] const std::optional<String>& subscribeId) {}
}; };
} // namespace juce::midi_ci } // namespace juce::midi_ci

View file

@ -29,8 +29,7 @@ namespace juce::midi_ci
class PropertyExchangeCache class PropertyExchangeCache
{ {
public: public:
explicit PropertyExchangeCache (std::function<void()> term) PropertyExchangeCache() = default;
: onTerminate (std::move (term)) {}
struct OwningResult struct OwningResult
{ {
@ -66,16 +65,20 @@ public:
const auto headerJson = JSON::parse (String (headerStorage.data(), headerStorage.size())); const auto headerJson = JSON::parse (String (headerStorage.data(), headerStorage.size()));
onTerminate = nullptr; terminate();
const auto encodingString = headerJson.getProperty ("mutualEncoding", "ASCII").toString(); const auto encodingString = headerJson.getProperty ("mutualEncoding", "ASCII").toString();
if (chunk.thisChunkNum != chunk.totalNumChunks) if (chunk.thisChunkNum != chunk.totalNumChunks)
return std::optional<OwningResult> { std::in_place, PropertyExchangeResult::Error::partial }; return std::optional<OwningResult> { std::in_place, PropertyExchangeResult::Error::partial };
const int status = headerJson.getProperty ("status", 200);
if (status == 343)
return std::optional<OwningResult> { std::in_place, PropertyExchangeResult::Error::tooManyTransactions };
return std::optional<OwningResult> { std::in_place, return std::optional<OwningResult> { std::in_place,
headerJson, headerJson,
Encodings::decode (bodyStorage, EncodingUtils::toEncoding (encodingString.toRawUTF8()).value_or (Encoding::ascii)) }; Encodings::decode (bodyStorage, EncodingUtils::toEncoding (encodingString.toRawUTF8()).value_or (Encoding::ascii)) };
} }
std::optional<OwningResult> notify (Span<const std::byte> header) std::optional<OwningResult> notify (Span<const std::byte> header)
@ -90,21 +93,20 @@ public:
if (! status.isInt() || (int) status == 100) if (! status.isInt() || (int) status == 100)
return {}; return {};
onTerminate = nullptr; terminate();
return std::optional<OwningResult> { std::in_place, PropertyExchangeResult::Error::notify }; return std::optional<OwningResult> { std::in_place, PropertyExchangeResult::Error::notify };
} }
void terminate() bool terminate()
{ {
if (auto t = std::exchange (onTerminate, nullptr)) return std::exchange (ongoing, false);
t();
} }
private: private:
std::vector<char> headerStorage; std::vector<char> headerStorage;
std::vector<std::byte> bodyStorage; std::vector<std::byte> bodyStorage;
std::function<void()> onTerminate;
uint16_t lastChunk = 0; uint16_t lastChunk = 0;
bool ongoing = true;
}; };
//============================================================================== //==============================================================================
@ -113,52 +115,104 @@ class PropertyExchangeCacheArray
public: public:
PropertyExchangeCacheArray() = default; PropertyExchangeCacheArray() = default;
ErasedScopeGuard primeCacheForRequestId (std::byte id, Token64 primeCacheForRequestId (uint8_t id, std::function<void (const PropertyExchangeResult&)> onDone)
std::function<void (const PropertyExchangeResult&)> onDone,
std::function<void()> onTerminate)
{ {
auto& entry = caches[(uint8_t) id]; jassert (id < caches.size());
entry = std::make_shared<Transaction> (std::move (onDone), std::move (onTerminate));
auto weak = std::weak_ptr<Transaction> (entry);
return ErasedScopeGuard { [&entry, weak] ++lastKey;
auto& entry = caches[id];
if (entry.has_value())
{ {
// If this fails, then the transaction finished before the ErasedScopeGuard was destroyed. // Trying to start a new message with the same id as another in-progress message
if (auto locked = weak.lock()) jassertfalse;
{ ids.erase (entry->key);
entry->cache.terminate(); }
entry = nullptr;
} const auto& item = entry.emplace (id, std::move (onDone), Token64 { lastKey });
} }; ids.emplace (item.key, id);
return item.key;
} }
void addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk) bool terminate (Token64 key)
{
const auto iter = ids.find (key);
// If the key isn't found, then the transaction must have completed already
if (iter == ids.end())
return false;
// We're about to terminate this transaction, so we don't need to retain this record
auto index = iter->second;
ids.erase (iter);
auto& entry = caches[index];
// If the entry is null, something's gone wrong. The ids map should only contain elements for
// non-null cache entries.
if (! entry.has_value())
{
jassertfalse;
return false;
}
const auto result = entry->cache.terminate();
entry.reset();
return result;
}
void addChunk (RequestID b, const Message::DynamicSizePropertyExchange& chunk)
{ {
updateCache (b, [&] (PropertyExchangeCache& c) { return c.addChunk (chunk); }); updateCache (b, [&] (PropertyExchangeCache& c) { return c.addChunk (chunk); });
} }
void notify (std::byte b, Span<const std::byte> header) void notify (RequestID b, Span<const std::byte> header)
{ {
updateCache (b, [&] (PropertyExchangeCache& c) { return c.notify (header); }); updateCache (b, [&] (PropertyExchangeCache& c) { return c.notify (header); });
} }
bool hasTransaction (std::byte id) const std::optional<Token64> getKeyForId (RequestID id) const
{ {
return caches[(uint8_t) id] != nullptr; if (auto& c = caches[id.asInt()])
return c->key;
return {};
} }
uint8_t countOngoingTransactions() const bool hasTransaction (RequestID id) const
{ {
return (uint8_t) std::count_if (caches.begin(), caches.end(), [] (auto& c) { return c != nullptr; }); return getKeyForId (id).has_value();
} }
/** MSB of result is set on failure. */ std::optional<RequestID> getIdForKey (Token64 key) const
std::byte findUnusedId (uint8_t maxSimultaneousTransactions) const {
const auto iter = ids.find (key);
return iter != ids.end() ? RequestID::create (iter->second) : std::nullopt;
}
auto countOngoingTransactions() const
{
jassert (ids.size() == (size_t) std::count_if (caches.begin(), caches.end(), [] (auto& c) { return c.has_value(); }));
return (int) ids.size();
}
auto getOngoingTransactions() const
{
jassert (ids.size() == (size_t) std::count_if (caches.begin(), caches.end(), [] (auto& c) { return c.has_value(); }));
std::vector<Token64> result (ids.size());
std::transform (ids.begin(), ids.end(), result.begin(), [] (const auto& p) { return Token64 { p.first }; });
return result;
}
std::optional<RequestID> findUnusedId (uint8_t maxSimultaneousTransactions) const
{ {
if (countOngoingTransactions() >= maxSimultaneousTransactions) if (countOngoingTransactions() >= maxSimultaneousTransactions)
return std::byte { 0xff }; return {};
return (std::byte) std::distance (caches.begin(), std::find (caches.begin(), caches.end(), nullptr)); return RequestID::create ((uint8_t) std::distance (caches.begin(), std::find (caches.begin(), caches.end(), std::nullopt)));
} }
// Instances must stay at the same location to ensure that references captured in the // Instances must stay at the same location to ensure that references captured in the
@ -172,57 +226,66 @@ private:
class Transaction class Transaction
{ {
public: public:
Transaction (std::function<void (const PropertyExchangeResult&)> onSuccess, Transaction (uint8_t i, std::function<void (const PropertyExchangeResult&)> onSuccess, Token64 k)
std::function<void()> onTerminate) : onFinish (std::move (onSuccess)), key (k), id (i) {}
: cache (std::move (onTerminate)), onFinish (std::move (onSuccess)) {}
PropertyExchangeCache cache; PropertyExchangeCache cache;
std::function<void (const PropertyExchangeResult&)> onFinish; std::function<void (const PropertyExchangeResult&)> onFinish;
Token64 key{};
uint8_t id = 0;
}; };
template <typename WithCache> template <typename WithCache>
void updateCache (std::byte b, WithCache&& withCache) void updateCache (RequestID b, WithCache&& withCache)
{ {
if (auto& entry = caches[(uint8_t) b]) if (auto& entry = caches[b.asInt()])
{ {
if (const auto result = withCache (entry->cache)) if (const auto result = withCache (entry->cache))
{ {
const auto tmp = std::move (entry->onFinish); const auto tmp = std::move (*entry);
entry = nullptr; ids.erase (tmp.key);
NullCheckedInvocation::invoke (tmp, result->result); entry.reset();
NullCheckedInvocation::invoke (tmp.onFinish, result->result);
} }
} }
} }
std::array<std::shared_ptr<Transaction>, numCaches> caches; std::array<std::optional<Transaction>, numCaches> caches;
std::map<Token64, uint8_t> ids;
uint64_t lastKey = 0;
}; };
//============================================================================== //==============================================================================
class InitiatorPropertyExchangeCache::Impl class InitiatorPropertyExchangeCache::Impl
{ {
public: public:
TokenAndId primeCache (uint8_t maxSimultaneousRequests, std::optional<Token64> primeCache (uint8_t maxSimultaneousRequests,
std::function<void (const PropertyExchangeResult&)> onDone, std::function<void (const PropertyExchangeResult&)> onDone)
std::function<void (std::byte)> onTerminate)
{ {
const auto id = array.findUnusedId (maxSimultaneousRequests); const auto id = array.findUnusedId (maxSimultaneousRequests);
if ((id & std::byte { 0x80 }) != std::byte{}) return id.has_value() ? std::optional<Token64> (array.primeCacheForRequestId (id->asInt(), std::move (onDone)))
{ : std::nullopt;
NullCheckedInvocation::invoke (onDone, PropertyExchangeResult { PropertyExchangeResult::Error::tooManyTransactions });
return {};
}
auto token = array.primeCacheForRequestId (id,
std::move (onDone),
[id, term = std::move (onTerminate)] { NullCheckedInvocation::invoke (term, id); });
return { std::move (token), id };
} }
void addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk) { array.addChunk (b, chunk); } bool terminate (Token64 token)
void notify (std::byte b, Span<const std::byte> header) { array.notify (b, header); } {
int countOngoingTransactions() const { return array.countOngoingTransactions(); } return array.terminate (token);
bool isAwaitingResponse() const { return countOngoingTransactions() != 0; } }
std::optional<Token64> getTokenForRequestId (RequestID id) const
{
return array.getKeyForId (id);
}
std::optional<RequestID> getRequestIdForToken (Token64 token) const
{
return array.getIdForKey (token);
}
void addChunk (RequestID b, const Message::DynamicSizePropertyExchange& chunk) { array.addChunk (b, chunk); }
void notify (RequestID b, Span<const std::byte> header) { array.notify (b, header); }
auto getOngoingTransactions() const { return array.getOngoingTransactions(); }
private: private:
PropertyExchangeCacheArray array; PropertyExchangeCacheArray array;
@ -234,17 +297,18 @@ InitiatorPropertyExchangeCache::InitiatorPropertyExchangeCache (InitiatorPropert
InitiatorPropertyExchangeCache& InitiatorPropertyExchangeCache::operator= (InitiatorPropertyExchangeCache&&) noexcept = default; InitiatorPropertyExchangeCache& InitiatorPropertyExchangeCache::operator= (InitiatorPropertyExchangeCache&&) noexcept = default;
InitiatorPropertyExchangeCache::~InitiatorPropertyExchangeCache() = default; InitiatorPropertyExchangeCache::~InitiatorPropertyExchangeCache() = default;
InitiatorPropertyExchangeCache::TokenAndId InitiatorPropertyExchangeCache::primeCache (uint8_t maxSimultaneousTransactions, std::optional<Token64> InitiatorPropertyExchangeCache::primeCache (uint8_t maxSimultaneousTransactions,
std::function<void (const PropertyExchangeResult&)> onDone, std::function<void (const PropertyExchangeResult&)> onDone)
std::function<void (std::byte)> onTerminate)
{ {
return pimpl->primeCache (maxSimultaneousTransactions, std::move (onDone), std::move (onTerminate)); return pimpl->primeCache (maxSimultaneousTransactions, std::move (onDone));
} }
void InitiatorPropertyExchangeCache::addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk) { pimpl->addChunk (b, chunk); } bool InitiatorPropertyExchangeCache::terminate (Token64 token) { return pimpl->terminate (token); }
void InitiatorPropertyExchangeCache::notify (std::byte b, Span<const std::byte> header) { pimpl->notify (b, header); } std::optional<Token64> InitiatorPropertyExchangeCache::getTokenForRequestId (RequestID id) const { return pimpl->getTokenForRequestId (id); }
int InitiatorPropertyExchangeCache::countOngoingTransactions() const { return pimpl->countOngoingTransactions(); } std::optional<RequestID> InitiatorPropertyExchangeCache::getRequestIdForToken (Token64 token) const { return pimpl->getRequestIdForToken (token); }
bool InitiatorPropertyExchangeCache::isAwaitingResponse() const { return pimpl->isAwaitingResponse(); } void InitiatorPropertyExchangeCache::addChunk (RequestID b, const Message::DynamicSizePropertyExchange& chunk) { pimpl->addChunk (b, chunk); }
void InitiatorPropertyExchangeCache::notify (RequestID b, Span<const std::byte> header) { pimpl->notify (b, header); }
std::vector<Token64> InitiatorPropertyExchangeCache::getOngoingTransactions() const { return pimpl->getOngoingTransactions(); }
//============================================================================== //==============================================================================
class ResponderPropertyExchangeCache::Impl class ResponderPropertyExchangeCache::Impl
@ -252,7 +316,7 @@ class ResponderPropertyExchangeCache::Impl
public: public:
void primeCache (uint8_t maxSimultaneousTransactions, void primeCache (uint8_t maxSimultaneousTransactions,
std::function<void (const PropertyExchangeResult&)> onDone, std::function<void (const PropertyExchangeResult&)> onDone,
std::byte id) RequestID id)
{ {
if (array.hasTransaction (id)) if (array.hasTransaction (id))
return; return;
@ -260,11 +324,11 @@ public:
if (array.countOngoingTransactions() >= maxSimultaneousTransactions) if (array.countOngoingTransactions() >= maxSimultaneousTransactions)
NullCheckedInvocation::invoke (onDone, PropertyExchangeResult { PropertyExchangeResult::Error::tooManyTransactions }); NullCheckedInvocation::invoke (onDone, PropertyExchangeResult { PropertyExchangeResult::Error::tooManyTransactions });
else else
array.primeCacheForRequestId (id, std::move (onDone), nullptr).release(); array.primeCacheForRequestId (id.asInt(), std::move (onDone));
} }
void addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk) { array.addChunk (b, chunk); } void addChunk (RequestID b, const Message::DynamicSizePropertyExchange& chunk) { array.addChunk (b, chunk); }
void notify (std::byte b, Span<const std::byte> header) { array.notify (b, header); } void notify (RequestID b, Span<const std::byte> header) { array.notify (b, header); }
int countOngoingTransactions() const { return array.countOngoingTransactions(); } int countOngoingTransactions() const { return array.countOngoingTransactions(); }
private: private:
@ -279,13 +343,13 @@ ResponderPropertyExchangeCache::~ResponderPropertyExchangeCache() = default;
void ResponderPropertyExchangeCache::primeCache (uint8_t maxSimultaneousTransactions, void ResponderPropertyExchangeCache::primeCache (uint8_t maxSimultaneousTransactions,
std::function<void (const PropertyExchangeResult&)> onDone, std::function<void (const PropertyExchangeResult&)> onDone,
std::byte id) RequestID id)
{ {
return pimpl->primeCache (maxSimultaneousTransactions, std::move (onDone), id); return pimpl->primeCache (maxSimultaneousTransactions, std::move (onDone), id);
} }
void ResponderPropertyExchangeCache::addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk) { pimpl->addChunk (b, chunk); } void ResponderPropertyExchangeCache::addChunk (RequestID b, const Message::DynamicSizePropertyExchange& chunk) { pimpl->addChunk (b, chunk); }
void ResponderPropertyExchangeCache::notify (std::byte b, Span<const std::byte> header) { pimpl->notify (b, header); } void ResponderPropertyExchangeCache::notify (RequestID b, Span<const std::byte> header) { pimpl->notify (b, header); }
int ResponderPropertyExchangeCache::countOngoingTransactions() const { return pimpl->countOngoingTransactions(); } int ResponderPropertyExchangeCache::countOngoingTransactions() const { return pimpl->countOngoingTransactions(); }
} // namespace juce::midi_ci } // namespace juce::midi_ci

View file

@ -26,6 +26,81 @@
namespace juce::midi_ci namespace juce::midi_ci
{ {
/**
A strongly-typed identifier for a 7-bit request ID with a nullable state.
@tags{Audio}
*/
class RequestID
{
public:
/** Constructs a RequestID if the provided value is valid, i.e. its most significant bit is
not set. Otherwise, returns nullopt.
*/
static std::optional<RequestID> create (uint8_t v)
{
if (v < 128)
return RequestID { v };
return {};
}
/** Constructs a RequestID if the provided value is valid, i.e. its most significant bit is
not set. Otherwise, returns nullopt.
*/
static std::optional<RequestID> create (std::byte value)
{
return create (static_cast<uint8_t> (value));
}
/** Returns the byte corresponding to this ID. */
std::byte asByte() const
{
return std::byte { value };
}
/** Returns the int value of this ID. */
uint8_t asInt() const
{
return value;
}
/** Equality operator. */
bool operator== (RequestID other) const
{
return value == other.value;
}
/** Inequality operator. */
bool operator!= (RequestID other) const
{
return ! operator== (other);
}
private:
/* Constructs a non-null request ID.
The argument must not have its most significant bit set.
*/
explicit RequestID (uint8_t index)
: value (index)
{
// IDs must only use the lowest 7 bits
jassert (value < 128);
}
uint8_t value{};
};
/** A strongly-typed 64-bit identifier. */
enum class Token64 : uint64_t {};
/** Compares Token64 instances. */
constexpr bool operator< (Token64 a, Token64 b)
{
return toUnderlyingType (a) < toUnderlyingType (b);
}
/** /**
Accumulates message chunks that have been sent by another device in response Accumulates message chunks that have been sent by another device in response
to a transaction initiated by a local device. to a transaction initiated by a local device.
@ -43,44 +118,38 @@ public:
JUCE_DECLARE_NON_COPYABLE (InitiatorPropertyExchangeCache) JUCE_DECLARE_NON_COPYABLE (InitiatorPropertyExchangeCache)
/** Holds a token that can be used to stop waiting for a reply, along with
an identifier byte that uniquely identifies an ongoing transaction.
@tags{Audio}
*/
struct TokenAndId
{
TokenAndId() = default;
TokenAndId (ErasedScopeGuard tokenIn, std::byte idIn)
: token (std::move (tokenIn)), id (idIn) {}
bool isValid() const { return (id & std::byte { 0x80 }) == std::byte{}; }
ErasedScopeGuard token{};
std::byte id { 0x80 };
};
/** Picks an unused request ID, and prepares the cache for that ID to accumulate message chunks. /** Picks an unused request ID, and prepares the cache for that ID to accumulate message chunks.
Incoming chunks added with addChunk are generated by another device acting as a responder. Incoming chunks added with addChunk are generated by another device acting as a responder.
*/ */
TokenAndId primeCache (uint8_t maxSimultaneousRequests, std::optional<Token64> primeCache (uint8_t maxSimultaneousRequests,
std::function<void (const PropertyExchangeResult&)> onDone, std::function<void (const PropertyExchangeResult&)> onDone);
std::function<void (std::byte)> onTerminate);
/** Terminates/cancels an ongoing transaction.
Returns true if the termination had an effect (i.e. the transaction was still ongoing), or
false otherwise (the transaction already ended or never started).
*/
bool terminate (Token64);
/** If there's a transaction ongoing with the given request id, returns the token uniquely
identifying that transaction, otherwise returns nullopt.
*/
std::optional<Token64> getTokenForRequestId (RequestID) const;
/** If the token refers to an ongoing transaction, returns the request id of that transaction.
Otherwise, returns an invalid request id.
*/
std::optional<RequestID> getRequestIdForToken (Token64) const;
/** Adds a message chunk for the provided transaction id. */ /** Adds a message chunk for the provided transaction id. */
void addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk); void addChunk (RequestID, const Message::DynamicSizePropertyExchange& chunk);
/** Updates the transaction state based on the contents of the provided notification. */ /** Updates the transaction state based on the contents of the provided notification. */
void notify (std::byte b, Span<const std::byte> header); void notify (RequestID, Span<const std::byte> header);
/** Returns the number of transactions that have been started but not finished. */ /** Returns all ongoing transactions. */
int countOngoingTransactions() const; std::vector<Token64> getOngoingTransactions() const;
/** Returns true if there are any transactions in progress that
haven't yet received replies.
*/
bool isAwaitingResponse() const;
private: private:
class Impl; class Impl;
@ -104,19 +173,19 @@ public:
JUCE_DECLARE_NON_COPYABLE (ResponderPropertyExchangeCache) JUCE_DECLARE_NON_COPYABLE (ResponderPropertyExchangeCache)
/** Prepares the cache for the given requestId to accumulate message chunks. /** Prepares the cache for the given requestID to accumulate message chunks.
Incoming chunks added with addChunk are generated by another device acting as an initiator. Incoming chunks added with addChunk are generated by another device acting as an initiator.
*/ */
void primeCache (uint8_t maxSimultaneousTransactions, void primeCache (uint8_t maxSimultaneousTransactions,
std::function<void (const PropertyExchangeResult&)> onDone, std::function<void (const PropertyExchangeResult&)> onDone,
std::byte id); RequestID id);
/** Adds a message chunk for the provided transaction id. */ /** Adds a message chunk for the provided transaction id. */
void addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk); void addChunk (RequestID, const Message::DynamicSizePropertyExchange& chunk);
/** Updates the transaction state based on the contents of the provided notification. */ /** Updates the transaction state based on the contents of the provided notification. */
void notify (std::byte b, Span<const std::byte> header); void notify (RequestID, Span<const std::byte> header);
/** Returns the number of transactions that have been started but not finished. */ /** Returns the number of transactions that have been started but not finished. */
int countOngoingTransactions() const; int countOngoingTransactions() const;

View file

@ -49,10 +49,6 @@ public:
tooManyTransactions, ///< Unable to send the request because doing so would tooManyTransactions, ///< Unable to send the request because doing so would
///< exceed the number of simultaneous inquiries that were declared. ///< exceed the number of simultaneous inquiries that were declared.
///< @see PropertyDelegate::getNumSimultaneousRequestsSupported(). ///< @see PropertyDelegate::getNumSimultaneousRequestsSupported().
invalidPayload, ///< The payload couldn't be encoded for transmission. If you're
///< using the ASCII encoding, maybe some bytes have their most
///< significant bit set.
}; };
/** Creates a result denoting an error state. */ /** Creates a result denoting an error state. */

View file

@ -36,7 +36,6 @@ public:
void visit (const Message::PropertyGetData& body) const override { visitImpl (body); } void visit (const Message::PropertyGetData& body) const override { visitImpl (body); }
void visit (const Message::PropertySetData& body) const override { visitImpl (body); } void visit (const Message::PropertySetData& body) const override { visitImpl (body); }
void visit (const Message::PropertySubscribe& body) const override { visitImpl (body); } void visit (const Message::PropertySubscribe& body) const override { visitImpl (body); }
void visit (const Message::PropertyNotify& body) const override { visitImpl (body); }
using MessageVisitor::visit; using MessageVisitor::visit;
private: private:
@ -88,43 +87,56 @@ private:
const auto source = output->getIncomingHeader().source; const auto source = output->getIncomingHeader().source;
const auto dest = output->getIncomingHeader().destination; const auto dest = output->getIncomingHeader().destination;
const auto group = output->getIncomingGroup(); const auto group = output->getIncomingGroup();
const auto request = data.requestID; const auto request = RequestID::create (data.requestID);
caches->primeCache (host->delegate.getNumSimultaneousRequestsSupported(), [this, source, dest, group, request] (const PropertyExchangeResult& result)
if (! request.has_value())
return false;
caches->primeCache (host->delegate.getNumSimultaneousRequestsSupported(), [hostPtr = host, source, dest, group, request] (const PropertyExchangeResult& result)
{ {
const auto send = [&] (const PropertyReplyHeader& header) const auto send = [&] (const PropertyReplyHeader& header)
{ {
detail::MessageTypeUtils::send (host->output, detail::MessageTypeUtils::send (hostPtr->output,
group, group,
Message::Header { ChannelInGroup::wholeBlock, Message::Header { ChannelInGroup::wholeBlock,
detail::MessageMeta::Meta<Message::PropertySetDataResponse>::subID2, detail::MessageMeta::Meta<Message::PropertySetDataResponse>::subID2,
detail::MessageMeta::implementationVersion, detail::MessageMeta::implementationVersion,
dest, dest,
source }, source },
Message::PropertySetDataResponse { { request, Encodings::jsonTo7BitText (header.toVarCondensed()) } }); Message::PropertySetDataResponse { { request->asByte(), Encodings::jsonTo7BitText (header.toVarCondensed()) } });
}; };
if (result.getError() == PropertyExchangeResult::Error::tooManyTransactions) const auto sendStatus = [&] (int status, StringRef message)
{ {
PropertyReplyHeader header; PropertyReplyHeader header;
header.status = 343; header.status = status;
header.message = TRANS ("The device has initiated too many simultaneous requests"); header.message = message;
send (header); send (header);
};
if (const auto error = result.getError())
{
switch (*error)
{
case PropertyExchangeResult::Error::tooManyTransactions:
sendStatus (343, TRANS ("The device has initiated too many simultaneous requests"));
break;
case PropertyExchangeResult::Error::partial:
sendStatus (400, TRANS ("Request was incomplete"));
break;
case PropertyExchangeResult::Error::notify:
break;
}
return; return;
} }
if (result.getError().has_value()) send (hostPtr->delegate.propertySetDataRequested (source, { result.getHeaderAsRequestHeader(), result.getBody() }));
{ }, *request);
PropertyReplyHeader header;
header.status = 400;
header.message = TRANS ("Request was incomplete");
send (header);
return;
}
send (host->delegate.propertySetDataRequested (source, { result.getHeaderAsRequestHeader(), result.getBody() })); caches->addChunk (*request, data);
}, request);
caches->addChunk (data.requestID, data);
return true; return true;
} }
@ -206,19 +218,6 @@ private:
return false; return false;
} }
bool messageReceived (const Message::PropertyNotify& n) const
{
const auto m = output->getIncomingHeader().source;
if (auto* it = host->cacheProvider.getCacheForMuidAsResponder (m))
it->notify (n.requestID, n.header);
if (auto* it = host->cacheProvider.getCacheForMuidAsInitiator (m))
it->notify (n.requestID, n.header);
return true;
}
PropertyHost* host = nullptr; PropertyHost* host = nullptr;
ResponderOutput* output = nullptr; ResponderOutput* output = nullptr;
bool* handled = nullptr; bool* handled = nullptr;
@ -263,10 +262,10 @@ bool PropertyHost::tryRespond (ResponderOutput& responderOutput, const Message::
return result; return result;
} }
ErasedScopeGuard PropertyHost::sendSubscriptionUpdate (MUID device, std::optional<RequestKey> PropertyHost::sendSubscriptionUpdate (MUID device,
const PropertySubscriptionHeader& header, const PropertySubscriptionHeader& header,
Span<const std::byte> body, Span<const std::byte> body,
std::function<void (const PropertyExchangeResult&)> cb) std::function<void (const PropertyExchangeResult&)> cb)
{ {
const auto deviceIter = registry.find (device); const auto deviceIter = registry.find (device);
@ -309,7 +308,6 @@ ErasedScopeGuard PropertyHost::sendSubscriptionUpdate (MUID device,
if (caches == nullptr) if (caches == nullptr)
return {}; return {};
const auto terminator = detail::PropertyHostUtils::getTerminator (output, functionBlock, device);
auto wrappedCallback = [&]() -> std::function<void (const PropertyExchangeResult&)> auto wrappedCallback = [&]() -> std::function<void (const PropertyExchangeResult&)>
{ {
if (header.command != PropertySubscriptionCommand::end) if (header.command != PropertySubscriptionCommand::end)
@ -331,27 +329,32 @@ ErasedScopeGuard PropertyHost::sendSubscriptionUpdate (MUID device,
if (! encoded.has_value()) if (! encoded.has_value())
{ {
NullCheckedInvocation::invoke (wrappedCallback, PropertyExchangeResult { PropertyExchangeResult::Error::invalidPayload }); // The data could not be encoded successfully
jassertfalse;
return {}; return {};
} }
auto primed = caches->primeCache (delegate.getNumSimultaneousRequestsSupported(), const auto primed = caches->primeCache (delegate.getNumSimultaneousRequestsSupported(),
std::move (wrappedCallback), std::move (wrappedCallback));
std::move (terminator));
if (! primed.isValid()) if (! primed.has_value())
return {};
const auto id = caches->getRequestIdForToken (*primed);
if (! id.has_value())
return {}; return {};
detail::PropertyHostUtils::send (output, detail::PropertyHostUtils::send (output,
functionBlock.firstGroup, functionBlock.firstGroup,
detail::MessageMeta::Meta<Message::PropertySubscribe>::subID2, detail::MessageMeta::Meta<Message::PropertySubscribe>::subID2,
device, device,
primed.id, id->asByte(),
Encodings::jsonTo7BitText (header.toVarCondensed()), Encodings::jsonTo7BitText (header.toVarCondensed()),
*encoded, *encoded,
cacheProvider.getMaxSysexSizeForMuid (device)); cacheProvider.getMaxSysexSizeForMuid (device));
return std::move (primed.token); return RequestKey { device, *primed };
} }
void PropertyHost::terminateSubscription (MUID device, const String& subscribeId) void PropertyHost::terminateSubscription (MUID device, const String& subscribeId)
@ -380,7 +383,7 @@ void PropertyHost::terminateSubscription (MUID device, const String& subscribeId
header.subscribeId = subscribeId; header.subscribeId = subscribeId;
header.resource = subIter->second; header.resource = subIter->second;
sendSubscriptionUpdate (device, header, {}, nullptr).release(); sendSubscriptionUpdate (device, header, {}, nullptr);
} }
PropertyHost::SubscriptionToken PropertyHost::uidFromSubscribeId (String id) PropertyHost::SubscriptionToken PropertyHost::uidFromSubscribeId (String id)

View file

@ -26,6 +26,39 @@
namespace juce::midi_ci namespace juce::midi_ci
{ {
/**
A key used to uniquely identify ongoing transactions initiated by a ci::Device.
@tags{Audio}
*/
class RequestKey
{
auto tie() const { return std::tuple (m, v); }
public:
/** Constructor. */
RequestKey (MUID muid, Token64 key) : m (muid), v (key) {}
/** Returns the muid of the device to which we are subscribed. */
MUID getMuid() const { return m; }
/** Returns an identifier unique to this subscription. */
Token64 getKey() const { return v; }
/** Equality operator. */
bool operator== (const RequestKey& other) const { return tie() == other.tie(); }
/** Inequality operator. */
bool operator!= (const RequestKey& other) const { return tie() != other.tie(); }
/** Less-than operator. */
bool operator< (const RequestKey& other) const { return tie() < other.tie(); }
private:
MUID m;
Token64 v{};
};
/** /**
Acting as a ResponderListener, instances of this class can formulate Acting as a ResponderListener, instances of this class can formulate
appropriate replies to property transactions initiated by remote devices. appropriate replies to property transactions initiated by remote devices.
@ -67,13 +100,12 @@ public:
The provided callback will be called once the remote device has confirmed The provided callback will be called once the remote device has confirmed
receipt of the subscription update. If the state of your application receipt of the subscription update. If the state of your application
changes such that you no longer need to respond/wait for confirmation, changes such that you no longer need to respond/wait for confirmation,
you can allow the returned Guard to fall out of scope, or reset it you can pass the request key to Device::abortPropertyRequest().
manually.
*/ */
ErasedScopeGuard sendSubscriptionUpdate (MUID device, std::optional<RequestKey> sendSubscriptionUpdate (MUID device,
const PropertySubscriptionHeader& header, const PropertySubscriptionHeader& header,
Span<const std::byte> body, Span<const std::byte> body,
std::function<void (const PropertyExchangeResult&)> callback); std::function<void (const PropertyExchangeResult&)> callback);
/** Terminates a subscription that was started by a remote device. /** Terminates a subscription that was started by a remote device.

View file

@ -0,0 +1,812 @@
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2022 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
By using JUCE, you agree to the terms of both the JUCE 7 End-User License
Agreement and JUCE Privacy Policy.
End User License Agreement: www.juce.com/juce-7-licence
Privacy Policy: www.juce.com/juce-privacy-policy
Or: You may also use this code under the terms of the GPL v3 (see
www.gnu.org/licenses).
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce::midi_ci
{
struct RequestRetryQueueEntry
{
PropertySubscriptionHeader msg;
Token64 key{}; ///< A unique identifier for this message
bool inFlight = false; ///< True if the message has been sent and we're waiting for a reply, false otherwise
};
/*
A queue to store pending property exchange messages.
A property exchange message may fail to send because the initiator doesn't have enough vacant
property exchange IDs.
Similarly, if the responder doesn't have enough vacant IDs, then it may tell us to retry the
request.
We store messages that we're planning to send, and mark them as in-flight once we've attempted
to send them.
We always try to send the first not-in-flight message in the queue.
If the responder informs us that the message was actioned, or there was an unrecoverable error,
then we can remove the message from the queue. We can also remove the message if the user
decides that the message is no longer important.
Otherwise, if the message wasn't sent successfully, we leave the message at its current
position in the queue, and mark it as not-in-flight again.
*/
class RequestRetryQueue
{
using Entry = RequestRetryQueueEntry;
private:
auto getIter (Token64 k)
{
const auto iter = std::lower_bound (entries.begin(),
entries.end(),
(uint64_t) k,
[] (const Entry& e, uint64_t v) { return (uint64_t) e.key < v; });
return iter != entries.end() && iter->key == k ? iter : entries.end();
}
public:
/* Add a new message at the end of the queue, and return the entry for that message. */
Entry* add (PropertySubscriptionHeader msg)
{
const auto key = ++lastKey;
entries.push_back (Entry { std::move (msg), Token64 { key }, false });
return &entries.back();
}
/* Erase the entry for a given key. */
std::optional<Entry> erase (Token64 k)
{
const auto iter = getIter (k);
if (iter == entries.end())
return {};
auto result = std::move (*iter);
entries.erase (iter);
return result;
}
/* Find the next entry that should be sent, and return it after marking it as in-flight. */
const Entry* markNextInFlight()
{
const auto iter = std::find_if (entries.begin(), entries.end(), [] (const Entry& e) { return ! e.inFlight; });
if (iter == entries.end())
return nullptr;
iter->inFlight = true;
return &*iter;
}
void markNotInFlight (Token64 k)
{
const auto iter = getIter (k);
if (iter != entries.end())
iter->inFlight = false;
}
private:
std::vector<Entry> entries;
uint64_t lastKey = 0;
};
/**
Info about a particular subscription.
You can think of this as a subscription agreement as identified by a subscribeId, but this
also holds state that is necessary to negotiate the subscribeId.
*/
struct SubscriptionState
{
// If we're waiting to send this subscription request, this is monostate
// If the request has been sent, but we haven't received a reply, this is the id of the request
// If the subscription started successfully, this is the subscribeId for the subscription
std::variant<std::monostate, Token64, String> state;
String resource;
};
/**
Info about all the subscriptions requested of a particular device/MUID.
This keeps track of the order in which subscription requests are made, so that requests can
be re-tried in order if the initial sending of a request fails.
*/
class DeviceSubscriptionStates
{
public:
Token64 postToQueue (const PropertySubscriptionHeader& header)
{
return queue.add (header)->key;
}
Token64 beginSubscription (const PropertySubscriptionHeader& header)
{
jassert (header.command == PropertySubscriptionCommand::start);
auto headerCopy = header;
headerCopy.command = PropertySubscriptionCommand::start;
const auto key = postToQueue (headerCopy);
stateForSubscription[key].resource = headerCopy.resource;
return key;
}
std::optional<SubscriptionState> endSubscription (Token64 key)
{
queue.erase (key);
const auto iter = stateForSubscription.find (key);
if (iter == stateForSubscription.end())
return {};
auto subInfo = iter->second;
stateForSubscription.erase (iter);
return { std::move (subInfo) };
}
std::vector<Token64> endSubscription (String subscribeId)
{
std::vector<Token64> ended;
for (auto it = stateForSubscription.begin(); it != stateForSubscription.end();)
{
if (const auto* id = std::get_if<String> (&it->second.state))
{
if (*id == subscribeId)
{
ended.push_back (it->first);
queue.erase (it->first);
it = stateForSubscription.erase (it);
continue;
}
}
++it;
}
return ended;
}
void endAll()
{
for (auto& item : stateForSubscription)
queue.erase (item.first);
stateForSubscription.clear();
}
void resetKey (Token64 key)
{
const auto iter = stateForSubscription.find (key);
if (iter != stateForSubscription.end())
iter->second.state = std::monostate{};
queue.markNotInFlight (key);
}
void setRequestIdForKey (Token64 key, Token64 request)
{
const auto iter = stateForSubscription.find (key);
if (iter != stateForSubscription.end())
iter->second.state = request;
}
void setSubscribeIdForKey (Token64 key, String subscribeId)
{
const auto iter = stateForSubscription.find (key);
if (iter != stateForSubscription.end())
iter->second.state = subscribeId;
queue.erase (key);
}
auto* markNextInFlight()
{
return queue.markNextInFlight();
}
std::optional<SubscriptionState> getInfoForSubscriptionKey (Token64 key) const
{
const auto iter = stateForSubscription.find (key);
if (iter != stateForSubscription.end())
return iter->second;
return {};
}
auto begin() const { return stateForSubscription.begin(); }
auto end() const { return stateForSubscription.end(); }
private:
RequestRetryQueue queue;
std::map<Token64, SubscriptionState> stateForSubscription;
};
class SubscriptionManager::Impl : public std::enable_shared_from_this<Impl>,
private DeviceListener
{
public:
explicit Impl (SubscriptionManagerDelegate& d)
: delegate (d) {}
SubscriptionKey beginSubscription (MUID m, const PropertySubscriptionHeader& header)
{
const auto key = infoForMuid[m].beginSubscription (header);
sendPendingMessages();
return SubscriptionKey { m, key };
}
void endSubscription (SubscriptionKey key)
{
const auto iter = infoForMuid.find (key.getMuid());
if (iter == infoForMuid.end())
return;
const auto ended = iter->second.endSubscription (key.getKey());
if (! ended.has_value())
return;
if (auto* request = std::get_if<Token64> (&ended->state))
{
delegate.abortPropertyRequest ({ key.getMuid(), *request });
}
else if (auto* subscribeId = std::get_if<String> (&ended->state))
{
PropertySubscriptionHeader header;
header.command = PropertySubscriptionCommand::end;
header.subscribeId = *subscribeId;
iter->second.postToQueue (header);
sendPendingMessages();
}
}
void endSubscriptionFromResponder (MUID m, String sub)
{
const auto iter = infoForMuid.find (m);
if (iter != infoForMuid.end())
for (const auto& ended : iter->second.endSubscription (sub))
delegate.propertySubscriptionChanged ({ m, ended }, std::nullopt);
}
void endSubscriptionsFromResponder (MUID m)
{
const auto iter = infoForMuid.find (m);
if (iter == infoForMuid.end())
return;
std::vector<Token64> tokens;
std::transform (iter->second.begin(),
iter->second.end(),
std::back_inserter (tokens),
[] (const auto& p) { return p.first; });
iter->second.endAll();
for (const auto& ended : tokens)
delegate.propertySubscriptionChanged ({ m, ended }, std::nullopt);
}
std::vector<SubscriptionKey> getOngoingSubscriptions() const
{
std::vector<SubscriptionKey> result;
for (const auto& pair : infoForMuid)
for (const auto& info : pair.second)
result.emplace_back (pair.first, info.first);
return result;
}
std::optional<SubscriptionState> getInfoForSubscriptionKey (SubscriptionKey key) const
{
const auto iter = infoForMuid.find (key.getMuid());
if (iter != infoForMuid.end())
return iter->second.getInfoForSubscriptionKey (key.getKey());
return {};
}
bool sendPendingMessages()
{
// Note: not using any_of here because we don't want the early-exit behaviour
bool result = true;
for (auto& pair : infoForMuid)
result &= sendPendingMessages (pair.first, pair.second);
return result;
}
private:
void handleReply (SubscriptionKey subscriptionKey, PropertySubscriptionCommand command, const PropertyExchangeResult& r)
{
const auto iter = infoForMuid.find (subscriptionKey.getMuid());
if (iter == infoForMuid.end())
return;
auto& second = iter->second;
if (const auto error = r.getError())
{
// If the responder requested a retry, keep the message in the queue so that
// it can be re-sent
if (*error == PropertyExchangeResult::Error::tooManyTransactions)
{
second.resetKey (subscriptionKey.getKey());
return;
}
// We tried to begin or end a subscription, but the responder said no!
// If the responder declined to start a subscription, we can just
// mark the subscription as ended.
// If the responder declined to end a subscription, that's a bit trickier.
// Hopefully this won't happen in practice, because all the options to resolve are pretty bad:
// - One option is to ignore the failure. The remote device can carry on sending us updates.
// This might be a bit dangerous if we repeatedly subscribe and then fail to unsubscribe, as this
// would result in lots of redundant subscription messages that could clog the connection.
// - Another option is to store the subscription-end request and to attempt to send it again later.
// This also has the potential to clog up the connection, depending on how frequently we attempt
// to re-send failed messages. Given that unsubscribing has already failed once, there's no
// guarantee that any future attempts will succeed, so we might end up in a loop, sending the
// same message over and over.
// On balance, I think the former option is best for now. If this ends up being an issue in
// practice, perhaps we could add a mechanism to do exponential back-off, but that would
// add complexity that isn't necessarily required.
jassert (*error != PropertyExchangeResult::Error::notify);
// If we failed to begin a subscription, then the subscription never started,
// and we should remove it from the set of ongoing subscriptions.
second.endSubscription (subscriptionKey.getKey());
// We only need to alert the delegate if the subscription failed to start.
// If the subscription fails to end, we'll treat the subscription as ended anyway.
if (command == PropertySubscriptionCommand::start)
delegate.propertySubscriptionChanged (subscriptionKey, std::nullopt);
return;
}
if (command == PropertySubscriptionCommand::start)
{
second.setSubscribeIdForKey (subscriptionKey.getKey(), r.getHeaderAsSubscriptionHeader().subscribeId);
delegate.propertySubscriptionChanged (subscriptionKey, r.getHeaderAsSubscriptionHeader().subscribeId);
}
}
bool sendPendingMessages (MUID m, DeviceSubscriptionStates& info)
{
while (auto* entry = info.markNextInFlight())
{
const auto requestKind = entry->msg.command;
const SubscriptionKey subscriptionKey { m, entry->key };
auto cb = [weak = weak_from_this(), requestKind, subscriptionKey] (const PropertyExchangeResult& r)
{
if (const auto locked = weak.lock())
locked->handleReply (subscriptionKey, requestKind, r);
};
if (const auto request = delegate.sendPropertySubscribe (m, entry->msg, std::move (cb)))
{
if (entry->msg.command == PropertySubscriptionCommand::start)
info.setRequestIdForKey (entry->key, request->getKey());
}
else
{
// Couldn't find a valid ID to use, so we must have exhausted all message slots.
// There's no point trying to send the rest of the messages that are queued for this
// MUID, so give up. It's probably a good idea to try again in a bit.
info.resetKey (entry->key);
return false;
}
}
return true;
}
SubscriptionManagerDelegate& delegate;
std::map<MUID, DeviceSubscriptionStates> infoForMuid;
};
//==============================================================================
SubscriptionManager::SubscriptionManager (SubscriptionManagerDelegate& delegate)
: pimpl (std::make_shared<Impl> (delegate)) {}
SubscriptionKey SubscriptionManager::beginSubscription (MUID m, const PropertySubscriptionHeader& header)
{
return pimpl->beginSubscription (m, header);
}
void SubscriptionManager::endSubscription (SubscriptionKey key)
{
pimpl->endSubscription (key);
}
void SubscriptionManager::endSubscriptionFromResponder (MUID m, String sub)
{
pimpl->endSubscriptionFromResponder (m, sub);
}
void SubscriptionManager::endSubscriptionsFromResponder (MUID m)
{
pimpl->endSubscriptionsFromResponder (m);
}
std::vector<SubscriptionKey> SubscriptionManager::getOngoingSubscriptions() const
{
return pimpl->getOngoingSubscriptions();
}
std::optional<String> SubscriptionManager::getSubscribeIdForKey (SubscriptionKey key) const
{
if (const auto info = pimpl->getInfoForSubscriptionKey (key))
if (const auto* subscribeId = std::get_if<String> (&info->state))
return *subscribeId;
return {};
}
std::optional<String> SubscriptionManager::getResourceForKey (SubscriptionKey key) const
{
if (const auto info = pimpl->getInfoForSubscriptionKey (key))
return info->resource;
return {};
}
bool SubscriptionManager::sendPendingMessages()
{
return pimpl->sendPendingMessages();
}
//==============================================================================
//==============================================================================
#if JUCE_UNIT_TESTS
class SubscriptionTests : public UnitTest
{
public:
SubscriptionTests() : UnitTest ("Subscription", UnitTestCategories::midi) {}
void runTest() override
{
auto random = getRandom();
class Delegate : public SubscriptionManagerDelegate
{
public:
std::optional<RequestKey> sendPropertySubscribe (MUID m,
const PropertySubscriptionHeader&,
std::function<void (const PropertyExchangeResult&)> cb) override
{
++sendCount;
if (! sendShouldSucceed)
return {};
const RequestKey key { m, Token64 { ++lastKey } };
callbacks[key] = std::move (cb);
return key;
}
void abortPropertyRequest (RequestKey k) override
{
++abortCount;
callbacks.erase (k);
}
void propertySubscriptionChanged (SubscriptionKey, const std::optional<String>&) override
{
subChanged = true;
}
void setSendShouldSucceed (bool b) { sendShouldSucceed = b; }
void sendResult (RequestKey key, const PropertyExchangeResult& r)
{
const auto iter = callbacks.find (key);
if (iter != callbacks.end())
NullCheckedInvocation::invoke (iter->second, r);
callbacks.erase (key);
}
std::vector<RequestKey> getOngoingRequests() const
{
std::vector<RequestKey> result;
result.reserve (callbacks.size());
std::transform (callbacks.begin(), callbacks.end(), std::back_inserter (result), [] (const auto& p) { return p.first; });
return result;
}
uint64_t getAndClearSendCount() { return std::exchange (sendCount, 0); }
uint64_t getAndClearAbortCount() { return std::exchange (abortCount, 0); }
bool getAndClearSubChanged() { return std::exchange (subChanged, false); }
private:
std::map<RequestKey, std::function<void (const PropertyExchangeResult&)>> callbacks;
uint64_t sendCount = 0, abortCount = 0;
uint64_t lastKey = 0;
bool sendShouldSucceed = true, subChanged = false;
};
Delegate delegate;
SubscriptionManager manager { delegate };
const auto inquiryMUID = MUID::makeRandom (random);
beginTest ("Beginning a subscription and ending it before the remote device replies aborts the request");
{
PropertySubscriptionHeader header;
header.command = PropertySubscriptionCommand::start;
header.resource = "X-CustomProp";
const auto a = manager.beginSubscription (inquiryMUID, header);
expect (manager.getOngoingSubscriptions() == std::vector { a });
expect (delegate.getAndClearSendCount() == 1);
// Sending a subscription request uses a request slot
expect (delegate.getOngoingRequests().size() == 1);
const auto request = delegate.getOngoingRequests().back();
// subscription id is empty until the responder confirms the subscription
expect (manager.getResourceForKey (a) == header.resource);
manager.endSubscription (a);
expect (delegate.getOngoingRequests().empty());
expect (delegate.getAndClearAbortCount() == 1);
expect (manager.getOngoingSubscriptions().empty());
const auto successHeader = []
{
auto ptr = std::make_unique<DynamicObject>();
ptr->setProperty ("status", 200);
ptr->setProperty ("subscribeId", "anId");
return var { ptr.release() };
}();
delegate.sendResult (request, PropertyExchangeResult { successHeader, {} });
// Already ended, the confirmation shouldn't do anything
expect (! delegate.getAndClearSubChanged());
expect (delegate.getOngoingRequests().empty());
expect (delegate.getAndClearSendCount() == 0);
expect (delegate.getAndClearAbortCount() == 0);
// There shouldn't be any queued messages.
expect (manager.sendPendingMessages());
expect (! delegate.getAndClearSubChanged());
expect (delegate.getOngoingRequests().empty());
expect (delegate.getAndClearSendCount() == 0);
expect (delegate.getAndClearAbortCount() == 0);
}
beginTest ("Starting a new subscription while the device is waiting for a previous subscription to be confirmed queues further requests");
{
PropertySubscriptionHeader header;
header.command = PropertySubscriptionCommand::start;
header.resource = "X-CustomProp";
delegate.setSendShouldSucceed (false);
const auto a = manager.beginSubscription (inquiryMUID, header);
expect (delegate.getAndClearSendCount() == 1);
expect (! manager.sendPendingMessages());
expect (delegate.getAndClearSendCount() == 1);
delegate.setSendShouldSucceed (true);
expect (manager.sendPendingMessages());
expect (delegate.getAndClearSendCount() == 1);
delegate.setSendShouldSucceed (false);
const auto b = manager.beginSubscription (inquiryMUID, header);
const auto c = manager.beginSubscription (inquiryMUID, header);
expect (manager.getOngoingSubscriptions() == std::vector { a, b, c });
expect (delegate.getOngoingRequests().size() == 1);
// subscription id is empty until the responder confirms the subscription
expect (manager.getResourceForKey (a) == header.resource);
expect (manager.getResourceForKey (b) == header.resource);
expect (manager.getResourceForKey (c) == header.resource);
expect (delegate.getAndClearSendCount() == 2);
expect (delegate.getAndClearAbortCount() == 0);
delegate.setSendShouldSucceed (true);
// The device has sent a subscription start for a, but not for c,
// so it should send a notify to end subscription a, but shouldn't emit any
// messages related to subscription c.
manager.endSubscription (a);
manager.endSubscription (c);
expect (manager.getOngoingSubscriptions() == std::vector { b });
expect (delegate.getOngoingRequests().empty());
expect (delegate.getAndClearSendCount() == 0);
expect (delegate.getAndClearAbortCount() == 1);
// There should still be requests related to subscription b pending
expect (manager.sendPendingMessages());
expect (delegate.getOngoingRequests().size() == 1);
expect (delegate.getAndClearSendCount() == 1);
expect (delegate.getAndClearAbortCount() == 0);
// Now, we should send a terminate request for subscription b
manager.endSubscription (b);
expect (delegate.getOngoingRequests().empty());
expect (delegate.getAndClearSendCount() == 0);
expect (delegate.getAndClearAbortCount() == 1);
// The manager never received any replies, so it shouldn't have notified listeners about
// changed subscriptions
expect (! delegate.getAndClearSubChanged());
}
beginTest ("If the device receives a retry or notify in response to a subscription start request, the subscription is retried or terminated as necessary");
{
PropertySubscriptionHeader header;
header.command = PropertySubscriptionCommand::start;
header.resource = "X-CustomProp";
const auto a = manager.beginSubscription (inquiryMUID, header);
expect (manager.getOngoingSubscriptions() == std::vector { a });
expect (delegate.getOngoingRequests().size() == 1);
expect (delegate.getAndClearSendCount() == 1);
expect (delegate.getAndClearAbortCount() == 0);
delegate.sendResult (delegate.getOngoingRequests().back(), PropertyExchangeResult { PropertyExchangeResult::Error::tooManyTransactions });
// The subscription is still active from the perspective of the manager, but the
// first request is over and should be retried
expect (manager.getOngoingSubscriptions() == std::vector { a });
expect (! delegate.getAndClearSubChanged());
expect (delegate.getOngoingRequests().empty());
expect (delegate.getAndClearSendCount() == 0);
expect (delegate.getAndClearAbortCount() == 0);
expect (manager.sendPendingMessages());
expect (manager.getOngoingSubscriptions() == std::vector { a });
expect (delegate.getOngoingRequests().size() == 1);
expect (delegate.getAndClearSendCount() == 1);
expect (delegate.getAndClearAbortCount() == 0);
delegate.sendResult (delegate.getOngoingRequests().back(), PropertyExchangeResult { PropertyExchangeResult::Error::notify });
// The request was terminated by the responder, so the delegate should get a sub-changed message
expect (delegate.getAndClearSubChanged());
expect (manager.getOngoingSubscriptions().empty());
expect (delegate.getOngoingRequests().empty());
expect (delegate.getAndClearSendCount() == 0);
expect (delegate.getAndClearAbortCount() == 0);
expect (manager.sendPendingMessages());
expect (! delegate.getAndClearSubChanged());
expect (manager.getOngoingSubscriptions().empty());
expect (delegate.getOngoingRequests().empty());
expect (delegate.getAndClearSendCount() == 0);
expect (delegate.getAndClearAbortCount() == 0);
}
beginTest ("If the device receives a retry or notify in response to a subscription end request, the subscription is retried as necessary");
{
PropertySubscriptionHeader header;
header.command = PropertySubscriptionCommand::start;
header.resource = "X-CustomProp";
const auto a = manager.beginSubscription (inquiryMUID, header);
expect (manager.getOngoingSubscriptions() == std::vector { a });
expect (manager.getResourceForKey (a) == header.resource);
expect (! manager.getSubscribeIdForKey (a).has_value());
const auto subscriptionResponseHeader = []
{
auto ptr = std::make_unique<DynamicObject>();
ptr->setProperty ("status", 200);
ptr->setProperty ("subscribeId", "newId");
return ptr.release();
}();
// Accept the subscription
delegate.sendResult (delegate.getOngoingRequests().back(), PropertyExchangeResult { subscriptionResponseHeader, {} });
// The subscription is still active from the perspective of the device, but the
// request is over and should be retried
expect (manager.getOngoingSubscriptions() == std::vector { a });
expect (delegate.getAndClearSubChanged());
// Now that the subscription was accepted, the subscription id should be non-empty
expect (manager.getResourceForKey (a) == header.resource);
expect (manager.getSubscribeIdForKey (a) == "newId");
expect (delegate.getOngoingRequests().empty());
expect (delegate.getAndClearSendCount() == 1);
expect (delegate.getAndClearAbortCount() == 0);
manager.endSubscription (a);
expect (manager.getOngoingSubscriptions().empty());
expect (! delegate.getAndClearSubChanged());
expect (delegate.getOngoingRequests().size() == 1);
expect (delegate.getAndClearSendCount() == 1);
expect (delegate.getAndClearAbortCount() == 0);
// The responder is busy, can't process the subscription end
delegate.sendResult (delegate.getOngoingRequests().back(), PropertyExchangeResult { PropertyExchangeResult::Error::tooManyTransactions });
expect (manager.getOngoingSubscriptions().empty());
expect (delegate.getOngoingRequests().empty());
expect (delegate.getAndClearSendCount() == 0);
expect (delegate.getAndClearAbortCount() == 0);
expect (manager.sendPendingMessages());
expect (manager.getOngoingSubscriptions().empty());
expect (delegate.getOngoingRequests().size() == 1);
expect (delegate.getAndClearSendCount() == 1);
expect (delegate.getAndClearAbortCount() == 0);
// The responder told us to immediately terminate our request to end the subscription!
// It's unclear how this should behave, so we'll just ignore the failure and assume
// the subscription is really over.
delegate.sendResult (delegate.getOngoingRequests().back(), PropertyExchangeResult { PropertyExchangeResult::Error::notify });
expect (manager.getOngoingSubscriptions().empty());
expect (delegate.getOngoingRequests().empty());
expect (delegate.getAndClearSendCount() == 0);
expect (delegate.getAndClearAbortCount() == 0);
expect (manager.sendPendingMessages());
expect (delegate.getAndClearSendCount() == 0);
expect (delegate.getAndClearAbortCount() == 0);
expect (! delegate.getAndClearSubChanged());
}
}
};
static SubscriptionTests subscriptionTests;
#endif
} // namespace juce::midi_ci

View file

@ -0,0 +1,167 @@
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2022 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
By using JUCE, you agree to the terms of both the JUCE 7 End-User License
Agreement and JUCE Privacy Policy.
End User License Agreement: www.juce.com/juce-7-licence
Privacy Policy: www.juce.com/juce-privacy-policy
Or: You may also use this code under the terms of the GPL v3 (see
www.gnu.org/licenses).
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce::midi_ci
{
/**
A key used to uniquely identify ongoing property subscriptions initiated by a ci::Device.
@tags{Audio}
*/
class SubscriptionKey
{
auto tie() const { return std::tuple (m, v); }
public:
/** Constructor */
SubscriptionKey() = default;
/** Constructor */
SubscriptionKey (MUID muid, Token64 key) : m (muid), v (key) {}
/** Returns the muid of the device to which we are subscribed. */
MUID getMuid() const { return m; }
/** Returns an identifier unique to this subscription. */
Token64 getKey() const { return v; }
/** Equality operator. */
bool operator== (const SubscriptionKey& other) const { return tie() == other.tie(); }
/** Inequality operator. */
bool operator!= (const SubscriptionKey& other) const { return tie() != other.tie(); }
/** Less-than operator. */
bool operator< (const SubscriptionKey& other) const { return tie() < other.tie(); }
private:
MUID m = MUID::getBroadcast();
Token64 v{};
};
/**
Functions used by a SubscriptionManager to negotiate subscriptions.
@tags{Audio}
*/
struct SubscriptionManagerDelegate
{
virtual ~SubscriptionManagerDelegate() = default;
/** Called when the manager wants to send an update. */
virtual std::optional<RequestKey> sendPropertySubscribe (MUID m,
const PropertySubscriptionHeader& header,
std::function<void (const PropertyExchangeResult&)> onResult) = 0;
/** Called by the manager to cancel a previous request. */
virtual void abortPropertyRequest (RequestKey) = 0;
/** Called by the manager when the remote device provides a subscribeId, or when it
terminates a subscription.
*/
virtual void propertySubscriptionChanged (SubscriptionKey, const std::optional<String>&) = 0;
};
/**
Manages subscriptions to properties on remote devices.
Occasionally, sending a subscription-begin request may fail, in which case the request will be
cached. Cached requests will be sent during a future call to sendPendingMessages().
To use this:
- pass a SubscriptionManagerDelegate (such as a ci::Device) to the constructor
- call sendPendingMessages() periodically, e.g. in a timer callback
@tags{Audio}
*/
class SubscriptionManager
{
public:
/** Constructor.
The delegate functions will be called when necessary to start and cancel property requests.
*/
explicit SubscriptionManager (SubscriptionManagerDelegate& delegate);
/** Attempts to begin a subscription using the provided details.
@returns a token that uniquely identifies this subscription. This token can be passed to
endSubscription to terminate an ongoing subscription.
*/
SubscriptionKey beginSubscription (MUID m, const PropertySubscriptionHeader& header);
/** Ends an ongoing subscription by us.
If the subscription begin request hasn't been sent yet, then this will just cancel the cached request.
If a subscription begin request has been sent, but no response has been received, this will
send a notification cancelling the initial request via SubscriptionManagerDelegate::abortPropertyRequest().
If the subscription has started successfully, then this will send a subscription end request
via SubscriptionManagerDelegate::sendPropertySubscribe().
*/
void endSubscription (SubscriptionKey);
/** Ends an ongoing subscription as requested from the remote device.
Unlike the other overload of endSubscription, this won't notify the delegate. It will only
update the internal record of active subscriptions.
Calls Delegate::propertySubscriptionChanged().
*/
void endSubscriptionFromResponder (MUID, String);
/** Ends all ongoing subscriptions as requested from a remote device.
Calls Delegate::propertySubscriptionChanged().
*/
void endSubscriptionsFromResponder (MUID);
/** Returns all of the subscriptions that have been initiated by this manager. */
std::vector<SubscriptionKey> getOngoingSubscriptions() const;
/** If the provided subscription has started successfully, this returns the subscribeId assigned
to the subscription by the remote device.
*/
std::optional<String> getSubscribeIdForKey (SubscriptionKey key) const;
/** If the provided subscription has not been cancelled, this returns the name of the
subscribed resource.
*/
std::optional<String> getResourceForKey (SubscriptionKey key) const;
/** Sends any cached messages that need retrying.
@returns true if there are no more messages to send, or false otherwise
*/
bool sendPendingMessages();
private:
class Impl;
std::shared_ptr<Impl> pimpl;
};
} // namespace juce::midi_ci

View file

@ -51,28 +51,5 @@ struct PropertyHostUtils
std::for_each (chunker.begin(), chunker.end(), [&] (auto) { output.send (group); }); std::for_each (chunker.begin(), chunker.end(), [&] (auto) { output.send (group); });
} }
static auto getTerminator (BufferOutput& output, FunctionBlock fb, MUID them)
{
const auto us = output.getMuid();
return [&output, fb, us, them] (std::byte id)
{
const Message::Header notifyHeader
{
ChannelInGroup::wholeBlock,
detail::MessageMeta::Meta<Message::PropertyNotify>::subID2,
detail::MessageMeta::implementationVersion,
us,
them,
};
const auto jsonHeader = Encodings::jsonTo7BitText (JSONUtils::makeObjectWithKeyFirst ({ { "status", 144 } }, "status"));
detail::MessageTypeUtils::send (output,
fb.firstGroup,
notifyHeader,
Message::PropertyNotify { { id, jsonHeader, 1, 1, {} } });
};
}
}; };
} // namespace juce::midi_ci::detail } // namespace juce::midi_ci::detail

View file

@ -53,3 +53,4 @@
#include <juce_midi_ci/ci/juce_CIPropertyExchangeCache.cpp> #include <juce_midi_ci/ci/juce_CIPropertyExchangeCache.cpp>
#include <juce_midi_ci/ci/juce_CIPropertyHost.cpp> #include <juce_midi_ci/ci/juce_CIPropertyHost.cpp>
#include <juce_midi_ci/ci/juce_CIResponderOutput.cpp> #include <juce_midi_ci/ci/juce_CIResponderOutput.cpp>
#include <juce_midi_ci/ci/juce_CISubscriptionManager.cpp>

View file

@ -76,6 +76,7 @@
#include <juce_midi_ci/ci/juce_CIDeviceFeatures.h> #include <juce_midi_ci/ci/juce_CIDeviceFeatures.h>
#include <juce_midi_ci/ci/juce_CIDeviceMessageHandler.h> #include <juce_midi_ci/ci/juce_CIDeviceMessageHandler.h>
#include <juce_midi_ci/ci/juce_CIDeviceOptions.h> #include <juce_midi_ci/ci/juce_CIDeviceOptions.h>
#include <juce_midi_ci/ci/juce_CISubscriptionManager.h>
#include <juce_midi_ci/ci/juce_CIDeviceListener.h> #include <juce_midi_ci/ci/juce_CIDeviceListener.h>
#include <juce_midi_ci/ci/juce_CIDevice.h> #include <juce_midi_ci/ci/juce_CIDevice.h>