diff --git a/BREAKING_CHANGES.md b/BREAKING_CHANGES.md index c583808231..7a3331f157 100644 --- a/BREAKING_CHANGES.md +++ b/BREAKING_CHANGES.md @@ -2,6 +2,77 @@ # develop +## Change + +The signatures of some member functions of ci::Device have been changed: +- sendPropertyGetInquiry +- sendPropertySetInquiry + +The signature of ci::PropertyHost::sendSubscriptionUpdate has also changed. + +The following member functions of ci::Device have been replaced with new +alternatives: +- sendPropertySubscriptionStart +- sendPropertySubscriptionEnd +- getOngoingSubscriptionsForMuid +- countOngoingPropertyTransactions + +The enum field PropertyExchangeResult::Error::invalidPayload has been removed. + +**Possible Issues** + +Code that uses any of these symbols will fail to compile until it is updated. + +**Workaround** + +Device::sendPropertyGetInquiry, Device::sendPropertySetInquiry, and +PropertyHost::sendSubscriptionUpdate all now return an optional RequestKey +instead of an ErasedScopeGuard. Requests started via any of these functions may +be cancelled by the request's RequestKey to the new function +Device::abortPropertyRequest. The returned RequestKey may be null, indicating a +failure to send the request. + +countOngoingPropertyTransactions has been replaced by getOngoingRequests, +which returns the RequestKeys of all ongoing requests. To find the number of +transactions, use the size of the returned container. + +sendPropertySubscriptionStart has been replaced by beginSubscription. +sendPropertySubscriptionEnd has been replaced by endSubscription. +The new functions no longer take callbacks. Instead, to receive notifications +when a subscription starts or ends, override +DeviceListener::propertySubscriptionChanged. + +getOngoingSubscriptionsForMuid is replaced by multiple functions. +getOngoingSubscriptions returns SubscriptionKeys for all of the subscriptions +currently in progress, which may be filtered based on SubscriptionKey::getMuid. +The subscribeId assigned to a particular SubscriptionKey can be found using +getSubscribeIdForKey, and the subscribed resource can be found using +getResourceForKey. + +It's possible that the initial call to beginSubscription may not be able to +start the subscription, e.g. if the remote device is busy and request a retry. +In this case, the request is cached. If you use subscriptions, then you +should call sendPendingMessages periodically to flush any messages that may +need to be retried. + +There is no need to check for the invalidPayload error when processing +property exchange results. + +**Rationale** + +Keeping track of subscriptions is quite involved, as the initial request to +begin a subscription might not be accepted straight away. The device may not +initially have enough vacant slots to send the request, or responder might +request a retry if it is too busy to process the request. The ci::Device now +caches requests when necessary, allowing them to be retried in the future. +This functionality couldn't be implemented without modifying the old interface. + +Replacing ErasedScopeGuards with Keys makes lifetime handling a bit easier. +It's no longer necessary to store or manually release scope guards for requests +that don't need to be cancelled. The new Key types are also a bit more +typesafe, and allow for simple queries of the transaction that created the key. + + ## Change The ListenerList::Iterator class has been removed. @@ -45,6 +116,7 @@ fixed white colour was inappropriate for most user interfaces. ## Change +>>>>>>> c74b2b1058 (CIDevice: Improve robustness of subscription API) ProfileHost::enableProfile and ProfileHost::disableProfile have been combined into a single function, ProfileHost::setProfileEnablement. diff --git a/examples/Audio/CapabilityInquiryDemo.h b/examples/Audio/CapabilityInquiryDemo.h index 6fd1bc0742..1ab8eaf5d8 100644 --- a/examples/Audio/CapabilityInquiryDemo.h +++ b/examples/Audio/CapabilityInquiryDemo.h @@ -740,14 +740,7 @@ struct Model DeviceInfo info; Profiles profiles; Properties properties; - std::map subscribeIdForResource; - - std::optional getSubscriptionId (const String& resource) const - { - const auto iter = subscribeIdForResource.find (resource); - return iter != subscribeIdForResource.end() ? std::optional (iter->second) - : std::nullopt; - } + std::map subscriptions; template static auto serialise (Archive& archive, This& t) @@ -761,7 +754,7 @@ struct Model auto tie() const { - return std::tie (muid, info, profiles, properties, subscribeIdForResource); + return std::tie (muid, info, profiles, properties, subscriptions); } JUCE_TUPLE_RELATIONAL_OPS (Device) }; @@ -2472,7 +2465,7 @@ public: explicit PropertyValuePanel (State s) : PropertyValuePanel (s, {}) {} - PropertyValuePanel (State s, State> subState) + PropertyValuePanel (State s, State> subState) : state (s), subscriptions (subState) { addAndMakeVisible (value); @@ -2590,13 +2583,13 @@ private: if (const auto* selectedProp = state->properties.getSelected()) { - const auto text = sub.count (selectedProp->name) != 0 ? "Unsubscribe" : "Subscribe"; + const auto text = std::any_of (sub.begin(), sub.end(), [&] (const auto& p) { return p.second.resource == selectedProp->name; }) ? "Unsubscribe" : "Subscribe"; subscribe.setButtonText (text); } } State state; - State> subscriptions; + State> subscriptions; MonospaceEditor value; TextField format; @@ -2970,9 +2963,9 @@ private: { const auto selected = (size_t) state->transient.devices.selection; return state[&Model::App::transient] - [&Model::Transient::devices] - [&Model::ListWithSelection::items] - [selected]; + [&Model::Transient::devices] + [&Model::ListWithSelection::items] + [selected]; } State state; @@ -2980,7 +2973,7 @@ private: PropertyValuePanel value { getDeviceState()[&Model::Device::properties], - getDeviceState()[&Model::Device::subscribeIdForResource] + getDeviceState()[&Model::Device::subscriptions] }; TabbedComponent tabs { TabbedButtonBar::Orientation::TabsAtTop }; }; @@ -3613,7 +3606,8 @@ private: }); }; -class CapabilityInquiryDemo : public Component +class CapabilityInquiryDemo : public Component, + private Timer { public: CapabilityInquiryDemo() @@ -3667,10 +3661,14 @@ public: { setPropertyPartial (bytes); }; + + startTimer (2'000); } ~CapabilityInquiryDemo() override { + stopTimer(); + // In a production app, it'd be a bit risky to write to a file from a destructor as it's // bad karma to throw an exception inside a destructor! if (auto* userSettings = applicationProperties.getUserSettings()) @@ -3691,6 +3689,12 @@ public: } private: + void timerCallback() override + { + if (device.has_value()) + device->sendPendingMessages(); + } + std::optional> getPropertyRequestInfo() const { auto* selectedDevice = appState->transient.devices.getSelected(); @@ -3860,58 +3864,37 @@ private: const auto& [muid, propName] = *details; - const auto subId = [&, propNameCopy = propName] + // Find the subscription for this resource, if any + const auto existingToken = [&, propNameCopy = propName]() -> std::optional { - const auto ongoing = device->getOngoingSubscriptionsForMuid (selectedDevice->muid); - const auto iter = std::find_if (ongoing.begin(), ongoing.end(), [&] (const auto& sub) - { - return sub.resource == propNameCopy; - }); + const auto ongoing = device->getOngoingSubscriptions(); - return iter != ongoing.end() ? iter->subscribeId : String(); + for (const auto& o : ongoing) + if (propNameCopy == device->getResourceForKey (o)) + return o; + + return std::nullopt; }(); - ci::PropertySubscriptionHeader header; - header.resource = propName; - header.command = subId.isEmpty() ? ci::PropertySubscriptionCommand::start : ci::PropertySubscriptionCommand::end; - header.subscribeId = subId; - - auto callback = [this, - target = muid, - propertyName = propName, - existingSubscription = subId.isNotEmpty()] (const ci::PropertyExchangeResult& response) + // If we're already subscribed, end that subscription. + // Otherwise, begin a new subscription to this resource. + const auto changedToken = [this, propName = propName, muid = muid, existingToken = existingToken]() -> std::optional { - if (response.getError().has_value()) - return; - - auto updated = *appState; - - auto& knownDevices = updated.transient.devices.items; - const auto deviceIter = std::find_if (knownDevices.begin(), - knownDevices.end(), - [target] (const auto& d) { return d.muid == target; }); - - if (deviceIter == knownDevices.end()) + // We're not subscribed, so begin a new subscription + if (! existingToken.has_value()) { - // The device has gone away? - jassertfalse; - return; + ci::PropertySubscriptionHeader header; + header.resource = propName; + header.command = ci::PropertySubscriptionCommand::start; + return device->beginSubscription (muid, header); } - const auto parsedHeader = response.getHeaderAsSubscriptionHeader(); + device->endSubscription (*existingToken); + return existingToken; + }(); - if (parsedHeader.subscribeId.isNotEmpty() && ! existingSubscription) - deviceIter->subscribeIdForResource.emplace (propertyName, parsedHeader.subscribeId); - else - deviceIter->subscribeIdForResource.erase (propertyName); - - appState = std::move (updated); - }; - - if (subId.isEmpty()) - device->sendPropertySubscriptionStart (muid, header, callback); - else - device->sendPropertySubscriptionEnd (muid, subId, callback); + if (changedToken.has_value()) + deviceListener.propertySubscriptionChanged (*changedToken); } template @@ -3973,11 +3956,8 @@ private: header.resource = propertyName; header.mutualEncoding = *encodingToUse; - const auto it = ongoingGetInquiries.insert (ongoingGetInquiries.end(), ErasedScopeGuard{}); - *it = device->sendPropertyGetInquiry (target, header, [this, it, target, propertyName] (const auto& response) + device->sendPropertyGetInquiry (target, header, [this, target, propertyName] (const auto& response) { - ongoingGetInquiries.erase (it); - if (response.getError().has_value()) return; @@ -4113,7 +4093,7 @@ private: header.command = ci::PropertySubscriptionCommand::notify; header.subscribeId = subId; header.resource = propertyName; - host->sendSubscriptionUpdate (receiver, header, {}, {}).release(); + host->sendSubscriptionUpdate (receiver, header, {}, {}); } } } @@ -4348,9 +4328,13 @@ private: { const auto resource = [&] { - for (const auto& [subId, res] : demo.device->getOngoingSubscriptionsForMuid (muid)) - if (subId == subscription.header.subscribeId) - return res; + const auto ongoing = demo.device->getOngoingSubscriptions(); + + for (const auto& o : ongoing) + { + if (subscription.header.subscribeId == demo.device->getSubscribeIdForKey (o)) + return demo.device->getResourceForKey (o).value_or (String{}); + } return String{}; }(); @@ -4425,10 +4409,7 @@ private: } case ci::PropertySubscriptionCommand::end: - { - matchingDevice->subscribeIdForResource.erase (resource); break; - } case ci::PropertySubscriptionCommand::start: jassertfalse; @@ -4438,6 +4419,35 @@ private: devicesState = std::move (copiedDevices); } + void propertySubscriptionChanged (ci::SubscriptionKey key, const std::optional&) override + { + propertySubscriptionChanged (key); + } + + void propertySubscriptionChanged (ci::SubscriptionKey key) + { + auto updated = *demo.appState; + + auto& knownDevices = updated.transient.devices.items; + const auto deviceIter = std::find_if (knownDevices.begin(), + knownDevices.end(), + [target = key.getMuid()] (const auto& d) { return d.muid == target; }); + + if (deviceIter == knownDevices.end()) + { + // The device has gone away? + jassertfalse; + return; + } + + if (const auto resource = demo.device->getResourceForKey (key)) + deviceIter->subscriptions.emplace (key, ci::Subscription { demo.device->getSubscribeIdForKey (key).value_or (String{}), *resource }); + else + deviceIter->subscriptions.erase (key); + + demo.appState = std::move (updated); + } + private: void updateProfilesForMuid (ci::MUID muid) { @@ -4843,7 +4853,6 @@ private: std::unique_ptr input; std::unique_ptr output; std::optional device; - std::list ongoingGetInquiries; FileChooser fileChooser { "Pick State JSON File", {}, "*.json", true, false, this }; @@ -4875,7 +4884,6 @@ private: .withPropertyDelegate (&propertyDelegate) .withProfileDelegate (&profileDelegate); - ongoingGetInquiries.clear(); device.emplace (options); device->addListener (deviceListener); diff --git a/modules/juce_core/maths/juce_MathsFunctions.h b/modules/juce_core/maths/juce_MathsFunctions.h index 8448066fca..ed4af88406 100644 --- a/modules/juce_core/maths/juce_MathsFunctions.h +++ b/modules/juce_core/maths/juce_MathsFunctions.h @@ -799,4 +799,13 @@ namespace TypeHelpers [[deprecated ("Use std::abs() instead.")]] inline int64 abs64 (int64 n) noexcept { return std::abs (n); } #endif +/** Converts an enum to its underlying integral type. + Similar to std::to_underlying, which is only available in C++23 and above. +*/ +template +constexpr auto toUnderlyingType (T t) -> std::enable_if_t, std::underlying_type_t> +{ + return static_cast> (t); +} + } // namespace juce diff --git a/modules/juce_midi_ci/ci/juce_CIDevice.cpp b/modules/juce_midi_ci/ci/juce_CIDevice.cpp index 0bd7307781..4f88338a0a 100644 --- a/modules/juce_midi_ci/ci/juce_CIDevice.cpp +++ b/modules/juce_midi_ci/ci/juce_CIDevice.cpp @@ -26,7 +26,7 @@ namespace juce::midi_ci { -class Device::Impl +class Device::Impl : private SubscriptionManagerDelegate { template static auto getProfileHostImpl (This& t) { return t.profileHost.has_value() ? &*t.profileHost : nullptr; } @@ -48,7 +48,7 @@ public: outgoing.reserve (options.getMaxSysExSize()); } - ~Impl() + ~Impl() override { if (concreteBufferOutput.hasSentMuid()) { @@ -183,162 +183,146 @@ public: Message::PropertyExchangeCapabilities { std::byte { propertyDelegate.getNumSimultaneousRequestsSupported() }, {}, {} }); } - ErasedScopeGuard sendPropertyGetInquiry (MUID m, - const PropertyRequestHeader& propertyHeader, - std::function callback) + std::optional sendPropertyGetInquiry (MUID m, + const PropertyRequestHeader& header, + std::function onResult) { const auto iter = discovered.find (m); if (iter == discovered.end() || ! Features { iter->second.discovery.capabilities }.isPropertyExchangeSupported()) return {}; - auto primed = iter->second.initiatorPropertyCaches.primeCache (propertyDelegate.getNumSimultaneousRequestsSupported(), - std::move (callback), - detail::PropertyHostUtils::getTerminator (concreteBufferOutput, options.getFunctionBlock(), m)); + const auto primed = iter->second.initiatorPropertyCaches.primeCache (propertyDelegate.getNumSimultaneousRequestsSupported(), + std::move (onResult)); - if (! primed.isValid()) + if (! primed.has_value()) return {}; + const auto id = iter->second.initiatorPropertyCaches.getRequestIdForToken (*primed); + jassert (id.has_value()); + detail::MessageTypeUtils::send (concreteBufferOutput, options.getFunctionBlock().firstGroup, m, ChannelInGroup::wholeBlock, - Message::PropertyGetData { { primed.id, Encodings::jsonTo7BitText (propertyHeader.toVarCondensed()) } }); + Message::PropertyGetData { { id->asByte(), Encodings::jsonTo7BitText (header.toVarCondensed()) } }); - return std::move (primed.token); + return RequestKey { m, *primed }; } - void sendPropertySetInquiry (MUID m, - const PropertyRequestHeader& propertyHeader, - Span propertyBody, - std::function callback) + std::optional sendPropertySetInquiry (MUID m, + const PropertyRequestHeader& header, + Span body, + std::function onResult) { + const auto encoded = Encodings::tryEncode (body, header.mutualEncoding); + + if (! encoded.has_value()) + return {}; + const auto iter = discovered.find (m); if (iter == discovered.end() || ! Features { iter->second.discovery.capabilities }.isPropertyExchangeSupported()) - return; + return {}; - const auto encoded = Encodings::tryEncode (propertyBody, propertyHeader.mutualEncoding); + const auto primed = iter->second.initiatorPropertyCaches.primeCache (propertyDelegate.getNumSimultaneousRequestsSupported(), + std::move (onResult)); - if (! encoded.has_value()) - { - NullCheckedInvocation::invoke (callback, PropertyExchangeResult { PropertyExchangeResult::Error::invalidPayload }); - return; - } + if (! primed.has_value()) + return {}; - auto primed = iter->second.initiatorPropertyCaches.primeCache (propertyDelegate.getNumSimultaneousRequestsSupported(), - std::move (callback), - detail::PropertyHostUtils::getTerminator (concreteBufferOutput, options.getFunctionBlock(), m)); - - if (! primed.isValid()) - return; + const auto id = iter->second.initiatorPropertyCaches.getRequestIdForToken (*primed); + jassert (id.has_value()); detail::PropertyHostUtils::send (concreteBufferOutput, options.getFunctionBlock().firstGroup, detail::MessageMeta::Meta::subID2, m, - primed.id, - Encodings::jsonTo7BitText (propertyHeader.toVarCondensed()), + id->asByte(), + Encodings::jsonTo7BitText (header.toVarCondensed()), *encoded, cacheProvider.getMaxSysexSizeForMuid (m)); + + return RequestKey { m, *primed }; } - void sendPropertySubscriptionStart (MUID m, - const PropertySubscriptionHeader& header, - std::function cb) + void abortPropertyRequest (RequestKey k) override { - const auto resource = header.resource; - auto wrappedCallback = [this, m, resource, callback = std::move (cb)] (const PropertyExchangeResult& result) + const auto iter = discovered.find (k.getMuid()); + + if (iter == discovered.end()) + return; + + const auto id = iter->second.initiatorPropertyCaches.getRequestIdForToken (k.getKey()); + + if (! id.has_value() || ! iter->second.initiatorPropertyCaches.terminate (k.getKey())) + return; + + const Message::Header notifyHeader { - if (! result.getError().has_value()) - { - const auto foundMuid = discovered.find (m); - - if (foundMuid != discovered.end()) - { - const auto parsed = result.getHeaderAsSubscriptionHeader(); - - // The responder should have given us a subscription ID so that we can reference the original subscription - // whenever we get updates in the future, or if we want to end the subscription. - jassert (parsed.subscribeId.isNotEmpty()); - const auto emplaceResult = foundMuid->second.subscriptions.insert ({ parsed.subscribeId, resource }); - - // If this fails, the device gave us a subscribeId that it was already using for another subscription. - jassertquiet (emplaceResult.second); - } - } - - NullCheckedInvocation::invoke (callback, result); + ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + muid, + k.getMuid(), }; - inquirePropertySubscribe (m, header, std::move (wrappedCallback)); + const auto jsonHeader = Encodings::jsonTo7BitText (JSONUtils::makeObjectWithKeyFirst ({ { "status", 144 } }, "status")); + detail::MessageTypeUtils::send (concreteBufferOutput, + options.getFunctionBlock().firstGroup, + notifyHeader, + Message::PropertyNotify { { id->asByte(), jsonHeader, 1, 1, {} } }); } - void sendPropertySubscriptionEnd (MUID m, - const String& subscribeId, - std::function cb) + std::optional getIdForRequestKey (RequestKey key) const { - const auto iter = discovered.find (m); - - if (iter == discovered.end() || ! Features { iter->second.discovery.capabilities }.isPropertyExchangeSupported()) - { - // Trying to send a subscription message to a device that doesn't exist (maybe it got removed), or - // that doesn't support property exchange. - jassertfalse; - return; - } - - if (iter->second.subscriptions.count ({ subscribeId, {} }) == 0) - { - // Trying to end a subscription that doesn't exist - perhaps it already ended. - jassertfalse; - return; - } - - auto wrappedCallback = [this, m, subscribeId, callback = std::move (cb)] (const PropertyExchangeResult& result) - { - if (! result.getError().has_value()) - { - const auto foundMuid = discovered.find (m); - - if (foundMuid != discovered.end()) - foundMuid->second.subscriptions.erase ({ subscribeId, {} }); - } - - NullCheckedInvocation::invoke (callback, result); - }; - - PropertySubscriptionHeader header; - header.subscribeId = subscribeId; - header.command = PropertySubscriptionCommand::end; - inquirePropertySubscribe (m, header, std::move (wrappedCallback)); - } - - std::vector getOngoingSubscriptionsForMuid (MUID m) const - { - const auto iter = discovered.find (m); + const auto iter = discovered.find (key.getMuid()); if (iter == discovered.end()) return {}; - std::vector result; - result.reserve (iter->second.subscriptions.size()); + return iter->second.initiatorPropertyCaches.getRequestIdForToken (key.getKey()); + } - for (const auto& [subscribeId, resource] : iter->second.subscriptions) - result.push_back ({ subscribeId, resource }); + std::vector getOngoingRequests() const + { + std::vector result; + + for (auto& i : discovered) + for (const auto& token : i.second.initiatorPropertyCaches.getOngoingTransactions()) + result.emplace_back (i.first, token); return result; } - int countOngoingPropertyTransactions() const + SubscriptionKey beginSubscription (MUID m, const PropertySubscriptionHeader& header) { - return std::accumulate (discovered.begin(), - discovered.end(), - 0, - [] (auto acc, const auto& pair) - { - return acc + pair.second.initiatorPropertyCaches.countOngoingTransactions(); - }); + return subscriptionManager.beginSubscription (m, header); + } + + void endSubscription (SubscriptionKey key) + { + subscriptionManager.endSubscription (key); + } + + std::vector getOngoingSubscriptions() const + { + return subscriptionManager.getOngoingSubscriptions(); + } + + std::optional getSubscribeIdForKey (SubscriptionKey key) const + { + return subscriptionManager.getSubscribeIdForKey (key); + } + + std::optional getResourceForKey (SubscriptionKey key) const + { + return subscriptionManager.getResourceForKey (key); + } + + bool sendPendingMessages() + { + return subscriptionManager.sendPendingMessages(); } void processMessage (ump::BytesOnGroup msg) @@ -587,6 +571,7 @@ private: if (iter != device->discovered.end()) { + device->subscriptionManager.endSubscriptionsFromResponder (targetMuid); device->discovered.erase (iter); device->listeners.call ([&] (auto& l) { l.deviceRemoved (targetMuid); }); } @@ -784,9 +769,7 @@ private: return false; }; - const auto transaction = device->ongoingTransactions.emplace (device->ongoingTransactions.end()); - - const auto onResourceListReceived = [this, iter, source, hasResource, transaction] (const PropertyExchangeResult& result) + const auto onResourceListReceived = [this, iter, source, hasResource] (const PropertyExchangeResult& result) { const auto validateResponse = [] (const PropertyExchangeResult& r) { @@ -796,9 +779,8 @@ private: && parsed.status == 200; }; - const auto allDone = [this, source, transaction] + const auto allDone = [this, source] { - device->ongoingTransactions.erase (transaction); device->listeners.call ([source] (auto& l) { l.propertyExchangeCapabilitiesReceived (source); }); }; @@ -821,13 +803,13 @@ private: return; }; - const auto getChannelList = [this, bodyAsObj, source, allDone, hasResource, onChannelListReceived, transaction] + const auto getChannelList = [this, bodyAsObj, source, allDone, hasResource, onChannelListReceived] { if (hasResource (bodyAsObj, "ChannelList")) { PropertyRequestHeader header; header.resource = "ChannelList"; - *transaction = device->sendPropertyGetInquiry (source, header, onChannelListReceived); + device->sendPropertyGetInquiry (source, header, onChannelListReceived); return; } @@ -839,15 +821,15 @@ private: { PropertyRequestHeader header; header.resource = "DeviceInfo"; - *transaction = device->sendPropertyGetInquiry (source, - header, - [iter, getChannelList, validateResponse] (const PropertyExchangeResult& r) - { - if (validateResponse (r)) - iter->second.deviceInfo = Encodings::jsonFrom7BitText (r.getBody()); + device->sendPropertyGetInquiry (source, + header, + [iter, getChannelList, validateResponse] (const PropertyExchangeResult& r) + { + if (validateResponse (r)) + iter->second.deviceInfo = Encodings::jsonFrom7BitText (r.getBody()); - getChannelList(); - }); + getChannelList(); + }); return; } @@ -856,7 +838,7 @@ private: PropertyRequestHeader header; header.resource = "ResourceList"; - *transaction = device->sendPropertyGetInquiry (source, header, onResourceListReceived); + device->sendPropertyGetInquiry (source, header, onResourceListReceived); return true; } @@ -869,7 +851,12 @@ private: if (iter == device->discovered.end()) return false; - iter->second.initiatorPropertyCaches.addChunk (response.requestID, response); + const auto request = RequestID::create (response.requestID); + + if (! request.has_value()) + return false; + + iter->second.initiatorPropertyCaches.addChunk (*request, response); return true; } @@ -916,12 +903,7 @@ private: data.body = result.getBody(); if (data.header.command == PropertySubscriptionCommand::end) - { - const auto foundMuid = device->discovered.find (source); - - if (foundMuid != device->discovered.end()) - foundMuid->second.subscriptions.erase ({ data.header.subscribeId, {} }); - } + device->subscriptionManager.endSubscriptionFromResponder (source, subscribeId); if (data.header.command != PropertySubscriptionCommand::start) device->listeners.call ([source, &data] (auto& l) { l.propertySubscriptionDataReceived (source, data); }); @@ -936,13 +918,18 @@ private: Message::PropertySubscribeResponse { { request, headerBytes, 1, 1, {} } }); }; + const auto requestID = RequestID::create (subscription.requestID); + + if (! requestID.has_value()) + return false; + // Subscription events may be sent at any time by the responder, so there may not be // an existing transaction ID for new subscription messages. iter->second.responderPropertyCaches.primeCache (device->propertyDelegate.getNumSimultaneousRequestsSupported(), callback, - subscription.requestID); + *requestID); - iter->second.responderPropertyCaches.addChunk (subscription.requestID, subscription); + iter->second.responderPropertyCaches.addChunk (*requestID, subscription); return true; } @@ -961,13 +948,17 @@ private: if (iter == device->discovered.end()) return false; - iter->second.initiatorPropertyCaches.notify (notify.requestID, notify.header); - iter->second.responderPropertyCaches.notify (notify.requestID, notify.header); + const auto requestID = RequestID::create (notify.requestID); + + if (! requestID.has_value()) + return false; + + iter->second.initiatorPropertyCaches.notify (*requestID, notify.header); + iter->second.responderPropertyCaches.notify (*requestID, notify.header); return true; } - Impl* device = nullptr; ResponderOutput* output = nullptr; bool* handled = nullptr; @@ -986,7 +977,6 @@ private: InitiatorPropertyExchangeCache initiatorPropertyCaches; ResponderPropertyExchangeCache responderPropertyCaches; var resourceList, deviceInfo, channelList; - std::set subscriptions; ///< subscribeIds of subscriptions that we initiated }; class ConcreteBufferOutput : public BufferOutput @@ -1131,6 +1121,41 @@ private: Impl& device; }; + std::optional sendPropertySubscribe (MUID m, + const PropertySubscriptionHeader& header, + std::function onResult) override + { + const auto iter = discovered.find (m); + + if (iter == discovered.end()) + return {}; + + const auto primed = iter->second.initiatorPropertyCaches.primeCache (propertyDelegate.getNumSimultaneousRequestsSupported(), + std::move (onResult)); + + if (! primed.has_value()) + return {}; + + const auto id = iter->second.initiatorPropertyCaches.getRequestIdForToken (*primed); + jassert (id.has_value()); + + detail::PropertyHostUtils::send (concreteBufferOutput, + options.getFunctionBlock().firstGroup, + detail::MessageMeta::Meta::subID2, + m, + id->asByte(), + Encodings::jsonTo7BitText (header.toVarCondensed()), + {}, + cacheProvider.getMaxSysexSizeForMuid (m)); + + return RequestKey (m, *primed); + } + + void propertySubscriptionChanged (SubscriptionKey key, const std::optional& subscribeId) override + { + listeners.call ([&] (auto& l) { l.propertySubscriptionChanged (key, subscribeId); }); + } + static MUID getReallyRandomMuid() { Random random; @@ -1170,44 +1195,11 @@ private: return supportsFlag (m, &Features::isPropertyExchangeSupported); } - void inquirePropertySubscribe (MUID m, - const PropertySubscriptionHeader& header, - std::function cb) - { - const auto iter = discovered.find (m); - - if (iter == discovered.end() || ! Features { iter->second.discovery.capabilities }.isPropertyExchangeSupported()) - { - // Trying to send a subscription message to a device that doesn't exist (maybe it got removed), or - // that doesn't support property exchange. - jassertfalse; - return; - } - - auto primed = iter->second.initiatorPropertyCaches.primeCache (propertyDelegate.getNumSimultaneousRequestsSupported(), - std::move (cb), - detail::PropertyHostUtils::getTerminator (concreteBufferOutput, options.getFunctionBlock(), m)); - - if (! primed.isValid()) - return; - - // TODO(reuk) this isn't ideal, make subscription/request handling more robust - primed.token.release(); - - detail::PropertyHostUtils::send (concreteBufferOutput, - options.getFunctionBlock().firstGroup, - detail::MessageMeta::Meta::subID2, - m, - primed.id, - Encodings::jsonTo7BitText (header.toVarCondensed()), - {}, - cacheProvider.getMaxSysexSizeForMuid (m)); - } - DeviceOptions options; MUID muid; std::vector outgoing; std::map discovered; + SubscriptionManager subscriptionManager { *this }; ListenerList listeners; ConcreteBufferOutput concreteBufferOutput { *this }; CacheProviderImpl cacheProvider { *this }; @@ -1215,7 +1207,6 @@ private: PropertyDelegateImpl propertyDelegate { *this }; std::optional profileHost; std::optional propertyHost; - std::list ongoingTransactions; }; //============================================================================== @@ -1244,33 +1235,32 @@ void Device::sendPropertyCapabilitiesInquiry (MUID destination) { pimpl->sendPropertyCapabilitiesInquiry (destination); } -ErasedScopeGuard Device::sendPropertyGetInquiry (MUID destination, - const PropertyRequestHeader& header, - std::function onResult) + +std::optional Device::sendPropertyGetInquiry (MUID m, + const PropertyRequestHeader& header, + std::function onResult) { - return pimpl->sendPropertyGetInquiry (destination, header, std::move (onResult)); + return pimpl->sendPropertyGetInquiry (m, header, std::move (onResult)); } -void Device::sendPropertySetInquiry (MUID destination, - const PropertyRequestHeader& header, - Span body, - std::function onResult) + +std::optional Device::sendPropertySetInquiry (MUID m, + const PropertyRequestHeader& header, + Span body, + std::function onResult) { - pimpl->sendPropertySetInquiry (destination, header, body, std::move (onResult)); + return pimpl->sendPropertySetInquiry (m, header, body, std::move (onResult)); } -void Device::sendPropertySubscriptionStart (MUID destination, - const PropertySubscriptionHeader& header, - std::function onResult) -{ - pimpl->sendPropertySubscriptionStart (destination, header, std::move (onResult)); -} -void Device::sendPropertySubscriptionEnd (MUID destination, - const String& subscribeId, - std::function onResult) -{ - pimpl->sendPropertySubscriptionEnd (destination, subscribeId, std::move (onResult)); -} -std::vector Device::getOngoingSubscriptionsForMuid (MUID m) const { return pimpl->getOngoingSubscriptionsForMuid (m); } -int Device::countOngoingPropertyTransactions() const { return pimpl->countOngoingPropertyTransactions(); } +void Device::abortPropertyRequest (RequestKey key) { pimpl->abortPropertyRequest (key); } +std::optional Device::getIdForRequestKey (RequestKey key) const { return pimpl->getIdForRequestKey (key); } +std::vector Device::getOngoingRequests() const { return pimpl->getOngoingRequests(); } + +SubscriptionKey Device::beginSubscription (MUID m, const PropertySubscriptionHeader& header) { return pimpl->beginSubscription (m, header); } +void Device::endSubscription (SubscriptionKey key) { pimpl->endSubscription (key); } +std::vector Device::getOngoingSubscriptions() const { return pimpl->getOngoingSubscriptions(); } +std::optional Device::getSubscribeIdForKey (SubscriptionKey key) const { return pimpl->getSubscribeIdForKey (key); } +std::optional Device::getResourceForKey (SubscriptionKey key) const { return pimpl->getResourceForKey (key); } +bool Device::sendPendingMessages() { return pimpl->sendPendingMessages(); } + void Device::addListener (Listener& l) { pimpl->addListener (l); } void Device::removeListener (Listener& l) { pimpl->removeListener (l); } MUID Device::getMuid() const { return pimpl->getMuid(); } @@ -2312,7 +2302,19 @@ public: output.messages.clear(); - beginTest ("If a request is terminated via notify, the device responds with an error status code."); + const auto makeStatusHeader = [] (int status) + { + auto ptr = std::make_unique(); + ptr->setProperty ("status", status); + return Encodings::jsonTo7BitText (ptr.release()); + }; + + const auto successHeader = makeStatusHeader (200); + const auto retryHeader = makeStatusHeader (343); + const auto cancelHeader = makeStatusHeader (144); + + // Common rules for PE section 10: There is no reply message associated with any Notify message. + beginTest ("If a request is terminated via notify, the device does not respond"); { auto obj = std::make_unique(); obj->setProperty ("resource", "X-CustomProp"); @@ -2326,30 +2328,544 @@ public: device.getMuid() }, Message::PropertySetData { { requestID, header, 2, 1, {} } }) }); - auto notifyHeader = std::make_unique(); - notifyHeader->setProperty ("status", 144); + expect (device.getPropertyHost()->countOngoingTransactions() == 1); + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, detail::MessageMeta::Meta::subID2, detail::MessageMeta::implementationVersion, inquiryMUID, device.getMuid() }, - Message::PropertyNotify { { requestID, Encodings::jsonTo7BitText (notifyHeader.release()), 1, 1, {} } }) }); + Message::PropertyNotify { { requestID, cancelHeader, 1, 1, {} } }) }); - expect (output.messages.size() == 1); - const auto parsed = Parser::parse (output.messages.back().bytes); + expect (device.getPropertyHost()->countOngoingTransactions() == 0); - expect (parsed.has_value()); - expect (parsed->header == Message::Header { ChannelInGroup::wholeBlock, - detail::MessageMeta::Meta::subID2, + expect (output.messages.empty()); + } + + beginTest ("Sending too many property requests simultaneously fails"); + { + PropertyRequestHeader header; + header.resource = "X-CustomProp"; + const auto a = device.sendPropertyGetInquiry (inquiryMUID, header, [] (const PropertyExchangeResult&) {}); + + expect (a.has_value()); + expect (device.getOngoingRequests() == std::vector { *a }); + + // Our device only supports 1 simultaneous request, so this should fail to send + const auto b = device.sendPropertyGetInquiry (inquiryMUID, header, [] (const PropertyExchangeResult&) {}); + expect (! b.has_value()); + expect (device.getOngoingRequests() == std::vector { *a }); + + // Reply to the first request + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertyGetDataResponse { { device.getIdForRequestKey (*a)->asByte(), successHeader, 1, 1, {} } }) }); + + // Now that a response to the first request has been received, there should be no + // requests in progress. + expect (device.getOngoingRequests().empty()); + } + + output.messages.clear(); + + beginTest ("Aborting a property request sends a property notify"); + { + PropertyRequestHeader header; + header.resource = "X-CustomProp"; + + bool callbackCalled = false; + const auto a = device.sendPropertyGetInquiry (inquiryMUID, header, [&] (const PropertyExchangeResult&) + { + callbackCalled = true; + }); + + expect (a.has_value()); + expect (device.getOngoingRequests() == std::vector { *a }); + expect (! callbackCalled); + + const auto requestID = device.getIdForRequestKey (*a); + device.abortPropertyRequest (*a); + + expect (device.getOngoingRequests().empty()); + expect (! callbackCalled); + + expect (output.messages.size() == 2); + + const auto inquiry = Parser::parse (output.messages.front().bytes); + expect (inquiry.has_value()); + expect (inquiry->header == Message::Header { ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + device.getMuid(), + inquiryMUID }); + + const auto notify = Parser::parse (output.messages.back().bytes); + expect (notify.has_value()); + expect (notify->header == Message::Header { ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, detail::MessageMeta::implementationVersion, device.getMuid(), inquiryMUID }); - auto* body = std::get_if (&parsed->body); + auto* body = std::get_if (¬ify->body); expect (body != nullptr); - expect (body->requestID == requestID); - auto replyHeader = Encodings::jsonFrom7BitText (body->header); - expect (replyHeader.getProperty ("status", "") == var (400)); + expect (body->requestID == requestID->asByte()); + expect (body->thisChunkNum == 1); + expect (body->totalNumChunks == 1); + + const auto replyHeader = Encodings::jsonFrom7BitText (body->header); + expect (replyHeader.getProperty ("status", "") == var (144)); + } + + output.messages.clear(); + + beginTest ("Aborting a completed property request does nothing"); + { + PropertyRequestHeader header; + header.resource = "X-CustomProp"; + + const auto a = device.sendPropertyGetInquiry (inquiryMUID, header, [&] (const PropertyExchangeResult&) {}); + + expect (a.has_value()); + expect (device.getOngoingRequests() == std::vector { *a }); + + // Reply to the get data request + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertyGetDataResponse { { device.getIdForRequestKey (*a)->asByte(), successHeader, 1, 1, {} } }) }); + + // After replying, there should be no ongoing requests + expect (device.getOngoingRequests().empty()); + expect (output.messages.size() == 1); + + // This request has already finished + device.abortPropertyRequest (*a); + + expect (device.getOngoingRequests().empty()); + expect (output.messages.size() == 1); + } + + output.messages.clear(); + + beginTest ("Beginning a subscription and ending it before the remote device replies causes a property notify to be sent"); + { + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::start; + header.resource = "X-CustomProp"; + + const auto a = device.beginSubscription (inquiryMUID, header); + + expect (device.getOngoingSubscriptions() == std::vector { a }); + // Sending a subscription request uses a request slot + expect (device.getOngoingRequests().size() == 1); + + // subscription id is empty until the responder confirms the subscription + expect (! device.getSubscribeIdForKey (a).has_value()); + expect (device.getResourceForKey (a) == header.resource); + + expect (output.messages.size() == 1); + + { + const auto parsed = Parser::parse (output.messages.back().bytes); + expect (parsed.has_value()); + expect (parsed->header == Message::Header { ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + device.getMuid(), + inquiryMUID }); + auto* body = std::get_if (&parsed->body); + expect (body != nullptr); + const auto bodyHeader = Encodings::jsonFrom7BitText (body->header); + expect (bodyHeader.getProperty ("command", "") == var ("start")); + } + + output.messages.clear(); + + const auto requestID = device.getIdForRequestKey (device.getOngoingRequests().back()); + + device.endSubscription (a); + + expect (output.messages.size() == 1); + + { + const auto parsed = Parser::parse (output.messages.back().bytes); + expect (parsed.has_value()); + expect (parsed->header == Message::Header { ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + device.getMuid(), + inquiryMUID }); + auto* body = std::get_if (&parsed->body); + expect (body != nullptr); + const auto bodyHeader = Encodings::jsonFrom7BitText (body->header); + expect (bodyHeader.getProperty ("status", "") == var (144)); + } + + expect (device.getOngoingSubscriptions().empty()); + // The start request is no longer in progress because it was terminated by the notify + expect (device.getOngoingRequests().empty()); + + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertySubscribeResponse { { requestID->asByte(), successHeader, 1, 1, {} } }) }); + + expect (device.getOngoingRequests().empty()); + + output.messages.clear(); + + // There shouldn't be any queued messages. + device.sendPendingMessages(); + + expect (output.messages.empty()); + expect (device.getOngoingRequests().empty()); + } + + output.messages.clear(); + + beginTest ("Starting a new subscription while the device is waiting for a previous subscription to be confirmed queues further requests"); + { + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::start; + header.resource = "X-CustomProp"; + + const auto a = device.beginSubscription (inquiryMUID, header); + const auto b = device.beginSubscription (inquiryMUID, header); + const auto c = device.beginSubscription (inquiryMUID, header); + + expect (device.getOngoingSubscriptions() == std::vector { a, b, c }); + expect (device.getOngoingRequests().size() == 1); + + // subscription id is empty until the responder confirms the subscription + expect (device.getResourceForKey (a) == header.resource); + expect (device.getResourceForKey (b) == header.resource); + expect (device.getResourceForKey (c) == header.resource); + + expect (output.messages.size() == 1); + + // The device has sent a subscription start for a, but not for c, + // so it should send a notify to end subscription a, but shouldn't emit any + // messages related to subscription c. + device.endSubscription (a); + device.endSubscription (c); + + expect (device.getOngoingSubscriptions() == std::vector { b }); + + expect (output.messages.size() == 2); + expect (device.getOngoingRequests().empty()); + + // There should still be requests related to subscription b pending + device.sendPendingMessages(); + + expect (output.messages.size() == 3); + expect (device.getOngoingRequests().size() == 1); + + // Now, we should send a terminate request for subscription b + device.endSubscription (b); + + expect (device.getOngoingSubscriptions().empty()); + + expect (output.messages.size() == 4); + expect (device.getOngoingRequests().empty()); + } + + output.messages.clear(); + + beginTest ("If the device receives a retry or notify in response to a subscription start request, the subscription is retried or terminated as necessary"); + { + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::start; + header.resource = "X-CustomProp"; + + const auto a = device.beginSubscription (inquiryMUID, header); + + expect (device.getOngoingSubscriptions() == std::vector { a }); + expect (device.getOngoingRequests().size() == 1); + expect (output.messages.size() == 1); + + const auto request0 = device.getOngoingRequests().back(); + + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertySubscribeResponse { { device.getIdForRequestKey (request0)->asByte(), retryHeader, 1, 1, {} } }) }); + + // The subscription is still active from the perspective of the device, but the + // first request is over and should be retried + expect (device.getOngoingSubscriptions() == std::vector { a }); + expect (device.getOngoingRequests().empty()); + expect (output.messages.size() == 1); + + device.sendPendingMessages(); + + expect (device.getOngoingSubscriptions() == std::vector { a }); + expect (device.getOngoingRequests().size() == 1); + expect (output.messages.size() == 2); + + const auto request1 = device.getOngoingRequests().back(); + + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertyNotify { { device.getIdForRequestKey (request1)->asByte(), cancelHeader, 1, 1, {} } }) }); + + expect (device.getOngoingSubscriptions().empty()); + expect (device.getOngoingRequests().empty()); + expect (output.messages.size() == 2); + } + + beginTest ("If the device receives a retry or notify in response to a subscription end request, the subscription is retried as necessary"); + { + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::start; + header.resource = "X-CustomProp"; + + const auto a = device.beginSubscription (inquiryMUID, header); + + expect (device.getOngoingSubscriptions() == std::vector { a }); + expect (device.getResourceForKey (a) == header.resource); + + const auto subscriptionResponseHeader = Encodings::jsonTo7BitText ([] + { + auto ptr = std::make_unique(); + ptr->setProperty ("status", 200); + ptr->setProperty ("subscribeId", "newId"); + return ptr.release(); + }()); + + // Accept the subscription + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertySubscribeResponse { { device.getIdForRequestKey (device.getOngoingRequests().back())->asByte(), subscriptionResponseHeader, 1, 1, {} } }) }); + + // The subscription is still active from the perspective of the device, but the + // request is over and should be retried + expect (device.getOngoingSubscriptions() == std::vector { a }); + // Now that the subscription was accepted, the subscription id should be non-empty + expect (device.getResourceForKey (a) == header.resource); + expect (device.getSubscribeIdForKey (a) == "newId"); + expect (device.getOngoingRequests().empty()); + + device.endSubscription (a); + + expect (device.getOngoingSubscriptions().empty()); + expect (device.getOngoingRequests().size() == 1); + + // The responder is busy, can't process the subscription end + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertySubscribeResponse { { device.getIdForRequestKey (device.getOngoingRequests().back())->asByte(), retryHeader, 1, 1, {} } }) }); + + expect (device.getOngoingSubscriptions().empty()); + expect (device.getOngoingRequests().empty()); + + device.sendPendingMessages(); + + expect (device.getOngoingSubscriptions().empty()); + expect (device.getOngoingRequests().size() == 1); + + // The responder told us to immediately terminate our request to end the subscription! + // It's unclear how this should behave, so we'll just ignore the failure and assume + // the subscription is really over. + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertyNotify { { device.getIdForRequestKey (device.getOngoingRequests().back())->asByte(), cancelHeader, 1, 1, {} } }) }); + + expect (device.getOngoingSubscriptions().empty()); + expect (device.getOngoingRequests().empty()); + + output.messages.clear(); + + device.sendPendingMessages(); + + expect (device.getOngoingSubscriptions().empty()); + expect (device.getOngoingRequests().empty()); + expect (output.messages.empty()); + } + + output.messages.clear(); + + const auto startResponseHeader = [&] + { + auto ptr = std::make_unique(); + ptr->setProperty ("status", 200); + ptr->setProperty ("subscribeId", "newId"); + return Encodings::jsonTo7BitText (ptr.release()); + }(); + + beginTest ("The responder can terminate a subscription"); + { + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::start; + header.resource = "X-CustomProp"; + + const auto a = device.beginSubscription (inquiryMUID, header); + + expect (device.getOngoingRequests().size() == 1); + expect (device.getOngoingSubscriptions().size() == 1); + expect (device.getResourceForKey (a) == "X-CustomProp"); + + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertySubscribeResponse { { device.getIdForRequestKey (device.getOngoingRequests().back())->asByte(), + startResponseHeader, + 1, + 1, + {} } }) }); + + expect (device.getOngoingRequests().empty()); + expect (device.getOngoingSubscriptions().size() == 1); + expect (output.messages.size() == 1); + + output.messages.clear(); + + expect (device.getResourceForKey (a) == "X-CustomProp"); + expect (device.getSubscribeIdForKey (a) == "newId"); + + const auto endRequestHeader = [&] + { + auto ptr = std::make_unique(); + ptr->setProperty ("command", "end"); + ptr->setProperty ("subscribeId", "newId"); + return Encodings::jsonTo7BitText (ptr.release()); + }(); + + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertySubscribe { { std::byte { 0x42 }, endRequestHeader, 1, 1, {} } }) }); + + expect (device.getOngoingRequests().empty()); + expect (device.getOngoingSubscriptions().empty()); + expect (output.messages.size() == 1); + + { + const auto parsed = Parser::parse (output.messages.back().bytes); + expect (parsed.has_value()); + expect (parsed->header == Message::Header { ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + device.getMuid(), + inquiryMUID }); + auto* body = std::get_if (&parsed->body); + expect (body != nullptr); + const auto bodyHeader = Encodings::jsonFrom7BitText (body->header); + expect (bodyHeader.getProperty ("status", "") == var (200)); + } + } + + beginTest ("Invalidating a MUID clears subscriptions to that MUID"); + { + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::start; + header.resource = "X-CustomProp"; + + const auto a = device.beginSubscription (inquiryMUID, header); + + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertySubscribeResponse { { device.getIdForRequestKey (device.getOngoingRequests().back())->asByte(), + startResponseHeader, + 1, + 1, + {} } }) }); + + expect (device.getOngoingSubscriptions() == std::vector { a }); + + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + MUID::getBroadcast() }, + Message::InvalidateMUID { inquiryMUID }) }); + + expect (device.getOngoingSubscriptions().empty()); + } + + beginTest ("Disconnecting and then connecting with the same MUID doesn't reuse SubscribeKeys"); + { + expect (device.getDiscoveredMuids().empty()); + + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + MUID::getBroadcast() }, + Message::Discovery { {}, DeviceFeatures{}.withPropertyExchangeSupported (true).getSupportedCapabilities(), 512, {} }) }); + + expect (device.getDiscoveredMuids().size() == 1); + + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::start; + header.resource = "X-CustomProp"; + + const auto subscription = device.beginSubscription (inquiryMUID, header); + + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + device.getMuid() }, + Message::PropertySubscribeResponse { { device.getIdForRequestKey (device.getOngoingRequests().back())->asByte(), + startResponseHeader, + 1, + 1, + {} } }) }); + + expect (device.getSubscribeIdForKey (subscription) == "newId"); + expect (device.getResourceForKey (subscription) == "X-CustomProp"); + + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + MUID::getBroadcast() }, + Message::InvalidateMUID { inquiryMUID }) }); + + expect (device.getDiscoveredMuids().empty()); + + device.processMessage ({ 0, getMessageBytes ({ ChannelInGroup::wholeBlock, + detail::MessageMeta::Meta::subID2, + detail::MessageMeta::implementationVersion, + inquiryMUID, + MUID::getBroadcast() }, + Message::Discovery { {}, DeviceFeatures{}.withPropertyExchangeSupported (true).getSupportedCapabilities(), 512, {} }) }); + + expect (device.getDiscoveredMuids().size() == 1); + + const auto newSubscription = device.beginSubscription (inquiryMUID, header); + + expect (subscription != newSubscription); + expect (device.getOngoingSubscriptions() == std::vector { newSubscription }); } } } diff --git a/modules/juce_midi_ci/ci/juce_CIDevice.h b/modules/juce_midi_ci/ci/juce_CIDevice.h index aa5b9cce2b..c91c519ccc 100644 --- a/modules/juce_midi_ci/ci/juce_CIDevice.h +++ b/modules/juce_midi_ci/ci/juce_CIDevice.h @@ -144,86 +144,74 @@ public: */ void sendPropertyCapabilitiesInquiry (MUID destination); - /** Sends an inquiry to get a property value from another device, invoking a callback once - the full transaction has completed. + /** Initiates an inquiry to fetch a property from a particular device. - @param destination the device whose property will be set - @param header information about the property data that will be sent - @param onResult this will be called once the result of the transaction is known. - If the transaction cannot start for some reason (e.g. the request is - malformed, or there are too many simultaneous requests) then the - function will be called immediately. Otherwise, the function will be - called once the destination device has confirmed receipt of the - inquiry. - @return a token bounding the lifetime of the request. - If you need to terminate the transaction before it has completed, - you can call reset() on this token, or cause its destructor to run. + @param m the MUID of the device to query + @param header specifies the resource to query, along with format/encoding options + @param onResult called when the transaction completes; not called if the transaction fails to start + @returns a key uniquely identifying this request, if the transaction begins successfully, or nullopt otherwise */ - ErasedScopeGuard sendPropertyGetInquiry (MUID destination, - const PropertyRequestHeader& header, - std::function onResult); + std::optional sendPropertyGetInquiry (MUID m, + const PropertyRequestHeader& header, + std::function onResult); - /** Sends an inquiry to set a property value on another device, invoking a callback once - the full transaction has completed. + /** Initiates an inquiry to set a property on a particular device. - @param destination the device whose property will be set - @param header information about the property data that will be sent - @param body the property data payload to send. - If the header specifies 'ascii' encoding, then you are responsible - for ensuring that no byte of the payload data has its most - significant bit set. Sending the message will fail if this is not - the case. Otherwise, if another encoding is specified then the - payload data may contain any byte values. You should not attempt to - encode the data yourself; the payload will be automatically encoded - before being sent. - @param onResult this will be called once the result of the transaction is known. - If the transaction cannot start for some reason (e.g. the - destination does not support property exchange, the request is - malformed, or there are too many simultaneous requests) then the - function will be called immediately. Otherwise, the function will be - called once the destination device has confirmed receipt of the - inquiry. + @param m the MUID of the device to query + @param header specifies the resource to query, along with format/encoding options + @param body the unencoded body content of the message + @param onResult called when the transaction completes; not called if the transaction fails to start + @returns a key uniquely identifying this request, if the transaction begins successfully, or nullopt otherwise */ - void sendPropertySetInquiry (MUID destination, - const PropertyRequestHeader& header, - Span body, - std::function onResult); + std::optional sendPropertySetInquiry (MUID m, + const PropertyRequestHeader& header, + Span body, + std::function onResult); - /** Sends an inquiry to start a subscription to a property on a device. - The provided callback will be called to indicate whether starting the subscription - succeeded or failed. - When the remote device indicates that its property value has changed, - DeviceListener::propertySubscriptionReceived will be called with information about the - update. + /** Cancels a request started with sendPropertyGetInquiry() or sendPropertySetInquiry(). + + This sends a property notify message indicating that the responder no longer needs to + process the initial request. */ - void sendPropertySubscriptionStart (MUID, - const PropertySubscriptionHeader& header, - std::function); + void abortPropertyRequest (RequestKey); - /** Sends an inquiry to end a subscription to a property on a device. - The provided callback will be called to indicate whether the subscriber acknowledged - receipt of the message. - Note that the remote device may also choose to terminate the subscription of its own - accord - in this case, the end request will be sent to - DeviceListener::propertySubscriptionReceived. + /** Returns the request id corresponding to a particular request. + + If the request could not be found (it never started, or already finished), then this + returns nullopt. */ - void sendPropertySubscriptionEnd (MUID, - const String& subscribeId, - std::function); + std::optional getIdForRequestKey (RequestKey) const; - /** Returns all of the subscriptions that we have requested from another device. + /** Returns all the ongoing requests. */ + std::vector getOngoingRequests() const; - Does *not* include subscriptions that other devices have requested from us. + /** Attempts to begin a subscription with the provided attributes. + + Once the subscription is no longer required, cancel it by passing the SubscriptionKey to endSubscription(). */ - std::vector getOngoingSubscriptionsForMuid (MUID m) const; + SubscriptionKey beginSubscription (MUID m, const PropertySubscriptionHeader& header); - /** Returns the number of transactions initiated by us that are yet to receive complete replies. + /** Ends a previously-started subscription. */ + void endSubscription (SubscriptionKey); - Does *not* include the count of unfinished requests addressed to us by other devices. + /** Returns all the subscriptions that have been initiated by this device. */ + std::vector getOngoingSubscriptions() const; - @see PropertyHost::countOngoingTransactions() + /** If the provided subscription has started successfully, this returns the subscribeId assigned + to the subscription by the remote device. */ - int countOngoingPropertyTransactions() const; + std::optional getSubscribeIdForKey (SubscriptionKey key) const; + + /** If the provided subscription has not been cancelled, this returns the name of the + subscribed resource. + */ + std::optional getResourceForKey (SubscriptionKey key) const; + + /** Sends any cached messages that need retrying. + + @returns true if there are no more messages to send, or false otherwise + */ + bool sendPendingMessages(); //============================================================================== /** Adds a listener that will be notified when particular events occur. diff --git a/modules/juce_midi_ci/ci/juce_CIDeviceListener.h b/modules/juce_midi_ci/ci/juce_CIDeviceListener.h index 6314ecd4a7..37e48f326c 100644 --- a/modules/juce_midi_ci/ci/juce_CIDeviceListener.h +++ b/modules/juce_midi_ci/ci/juce_CIDeviceListener.h @@ -141,6 +141,14 @@ struct DeviceListener */ virtual void propertySubscriptionDataReceived ([[maybe_unused]] MUID x, [[maybe_unused]] const PropertySubscriptionData& data) {} + + /** Called when a remote device updates a subscription by accepting or terminating it. + + If the subscription was accepted, the subscribeId will be non-null. Otherwise, a null + subscribeId indicates that the subscription was terminated. + */ + virtual void propertySubscriptionChanged ([[maybe_unused]] SubscriptionKey subscription, + [[maybe_unused]] const std::optional& subscribeId) {} }; } // namespace juce::midi_ci diff --git a/modules/juce_midi_ci/ci/juce_CIPropertyExchangeCache.cpp b/modules/juce_midi_ci/ci/juce_CIPropertyExchangeCache.cpp index 66a17d7a03..518b405730 100644 --- a/modules/juce_midi_ci/ci/juce_CIPropertyExchangeCache.cpp +++ b/modules/juce_midi_ci/ci/juce_CIPropertyExchangeCache.cpp @@ -29,8 +29,7 @@ namespace juce::midi_ci class PropertyExchangeCache { public: - explicit PropertyExchangeCache (std::function term) - : onTerminate (std::move (term)) {} + PropertyExchangeCache() = default; struct OwningResult { @@ -66,16 +65,20 @@ public: const auto headerJson = JSON::parse (String (headerStorage.data(), headerStorage.size())); - onTerminate = nullptr; + terminate(); const auto encodingString = headerJson.getProperty ("mutualEncoding", "ASCII").toString(); if (chunk.thisChunkNum != chunk.totalNumChunks) return std::optional { std::in_place, PropertyExchangeResult::Error::partial }; + const int status = headerJson.getProperty ("status", 200); + + if (status == 343) + return std::optional { std::in_place, PropertyExchangeResult::Error::tooManyTransactions }; + return std::optional { std::in_place, headerJson, Encodings::decode (bodyStorage, EncodingUtils::toEncoding (encodingString.toRawUTF8()).value_or (Encoding::ascii)) }; - } std::optional notify (Span header) @@ -90,21 +93,20 @@ public: if (! status.isInt() || (int) status == 100) return {}; - onTerminate = nullptr; + terminate(); return std::optional { std::in_place, PropertyExchangeResult::Error::notify }; } - void terminate() + bool terminate() { - if (auto t = std::exchange (onTerminate, nullptr)) - t(); + return std::exchange (ongoing, false); } private: std::vector headerStorage; std::vector bodyStorage; - std::function onTerminate; uint16_t lastChunk = 0; + bool ongoing = true; }; //============================================================================== @@ -113,52 +115,104 @@ class PropertyExchangeCacheArray public: PropertyExchangeCacheArray() = default; - ErasedScopeGuard primeCacheForRequestId (std::byte id, - std::function onDone, - std::function onTerminate) + Token64 primeCacheForRequestId (uint8_t id, std::function onDone) { - auto& entry = caches[(uint8_t) id]; - entry = std::make_shared (std::move (onDone), std::move (onTerminate)); - auto weak = std::weak_ptr (entry); + jassert (id < caches.size()); - return ErasedScopeGuard { [&entry, weak] + ++lastKey; + + auto& entry = caches[id]; + + if (entry.has_value()) { - // If this fails, then the transaction finished before the ErasedScopeGuard was destroyed. - if (auto locked = weak.lock()) - { - entry->cache.terminate(); - entry = nullptr; - } - } }; + // Trying to start a new message with the same id as another in-progress message + jassertfalse; + ids.erase (entry->key); + } + + const auto& item = entry.emplace (id, std::move (onDone), Token64 { lastKey }); + ids.emplace (item.key, id); + return item.key; } - void addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk) + bool terminate (Token64 key) + { + const auto iter = ids.find (key); + + // If the key isn't found, then the transaction must have completed already + if (iter == ids.end()) + return false; + + // We're about to terminate this transaction, so we don't need to retain this record + auto index = iter->second; + ids.erase (iter); + + auto& entry = caches[index]; + + // If the entry is null, something's gone wrong. The ids map should only contain elements for + // non-null cache entries. + if (! entry.has_value()) + { + jassertfalse; + return false; + } + + const auto result = entry->cache.terminate(); + entry.reset(); + return result; + } + + void addChunk (RequestID b, const Message::DynamicSizePropertyExchange& chunk) { updateCache (b, [&] (PropertyExchangeCache& c) { return c.addChunk (chunk); }); } - void notify (std::byte b, Span header) + void notify (RequestID b, Span header) { updateCache (b, [&] (PropertyExchangeCache& c) { return c.notify (header); }); } - bool hasTransaction (std::byte id) const + std::optional getKeyForId (RequestID id) const { - return caches[(uint8_t) id] != nullptr; + if (auto& c = caches[id.asInt()]) + return c->key; + + return {}; } - uint8_t countOngoingTransactions() const + bool hasTransaction (RequestID id) const { - return (uint8_t) std::count_if (caches.begin(), caches.end(), [] (auto& c) { return c != nullptr; }); + return getKeyForId (id).has_value(); } - /** MSB of result is set on failure. */ - std::byte findUnusedId (uint8_t maxSimultaneousTransactions) const + std::optional getIdForKey (Token64 key) const + { + const auto iter = ids.find (key); + return iter != ids.end() ? RequestID::create (iter->second) : std::nullopt; + } + + auto countOngoingTransactions() const + { + jassert (ids.size() == (size_t) std::count_if (caches.begin(), caches.end(), [] (auto& c) { return c.has_value(); })); + + return (int) ids.size(); + } + + auto getOngoingTransactions() const + { + jassert (ids.size() == (size_t) std::count_if (caches.begin(), caches.end(), [] (auto& c) { return c.has_value(); })); + + std::vector result (ids.size()); + std::transform (ids.begin(), ids.end(), result.begin(), [] (const auto& p) { return Token64 { p.first }; }); + return result; + } + + std::optional findUnusedId (uint8_t maxSimultaneousTransactions) const { if (countOngoingTransactions() >= maxSimultaneousTransactions) - return std::byte { 0xff }; + return {}; - return (std::byte) std::distance (caches.begin(), std::find (caches.begin(), caches.end(), nullptr)); + return RequestID::create ((uint8_t) std::distance (caches.begin(), std::find (caches.begin(), caches.end(), std::nullopt))); } // Instances must stay at the same location to ensure that references captured in the @@ -172,57 +226,66 @@ private: class Transaction { public: - Transaction (std::function onSuccess, - std::function onTerminate) - : cache (std::move (onTerminate)), onFinish (std::move (onSuccess)) {} + Transaction (uint8_t i, std::function onSuccess, Token64 k) + : onFinish (std::move (onSuccess)), key (k), id (i) {} PropertyExchangeCache cache; std::function onFinish; + Token64 key{}; + uint8_t id = 0; }; template - void updateCache (std::byte b, WithCache&& withCache) + void updateCache (RequestID b, WithCache&& withCache) { - if (auto& entry = caches[(uint8_t) b]) + if (auto& entry = caches[b.asInt()]) { if (const auto result = withCache (entry->cache)) { - const auto tmp = std::move (entry->onFinish); - entry = nullptr; - NullCheckedInvocation::invoke (tmp, result->result); + const auto tmp = std::move (*entry); + ids.erase (tmp.key); + entry.reset(); + NullCheckedInvocation::invoke (tmp.onFinish, result->result); } } } - std::array, numCaches> caches; + std::array, numCaches> caches; + std::map ids; + uint64_t lastKey = 0; }; //============================================================================== class InitiatorPropertyExchangeCache::Impl { public: - TokenAndId primeCache (uint8_t maxSimultaneousRequests, - std::function onDone, - std::function onTerminate) + std::optional primeCache (uint8_t maxSimultaneousRequests, + std::function onDone) { const auto id = array.findUnusedId (maxSimultaneousRequests); - if ((id & std::byte { 0x80 }) != std::byte{}) - { - NullCheckedInvocation::invoke (onDone, PropertyExchangeResult { PropertyExchangeResult::Error::tooManyTransactions }); - return {}; - } - - auto token = array.primeCacheForRequestId (id, - std::move (onDone), - [id, term = std::move (onTerminate)] { NullCheckedInvocation::invoke (term, id); }); - return { std::move (token), id }; + return id.has_value() ? std::optional (array.primeCacheForRequestId (id->asInt(), std::move (onDone))) + : std::nullopt; } - void addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk) { array.addChunk (b, chunk); } - void notify (std::byte b, Span header) { array.notify (b, header); } - int countOngoingTransactions() const { return array.countOngoingTransactions(); } - bool isAwaitingResponse() const { return countOngoingTransactions() != 0; } + bool terminate (Token64 token) + { + return array.terminate (token); + } + + std::optional getTokenForRequestId (RequestID id) const + { + return array.getKeyForId (id); + } + + std::optional getRequestIdForToken (Token64 token) const + { + return array.getIdForKey (token); + } + + void addChunk (RequestID b, const Message::DynamicSizePropertyExchange& chunk) { array.addChunk (b, chunk); } + void notify (RequestID b, Span header) { array.notify (b, header); } + auto getOngoingTransactions() const { return array.getOngoingTransactions(); } private: PropertyExchangeCacheArray array; @@ -234,17 +297,18 @@ InitiatorPropertyExchangeCache::InitiatorPropertyExchangeCache (InitiatorPropert InitiatorPropertyExchangeCache& InitiatorPropertyExchangeCache::operator= (InitiatorPropertyExchangeCache&&) noexcept = default; InitiatorPropertyExchangeCache::~InitiatorPropertyExchangeCache() = default; -InitiatorPropertyExchangeCache::TokenAndId InitiatorPropertyExchangeCache::primeCache (uint8_t maxSimultaneousTransactions, - std::function onDone, - std::function onTerminate) +std::optional InitiatorPropertyExchangeCache::primeCache (uint8_t maxSimultaneousTransactions, + std::function onDone) { - return pimpl->primeCache (maxSimultaneousTransactions, std::move (onDone), std::move (onTerminate)); + return pimpl->primeCache (maxSimultaneousTransactions, std::move (onDone)); } -void InitiatorPropertyExchangeCache::addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk) { pimpl->addChunk (b, chunk); } -void InitiatorPropertyExchangeCache::notify (std::byte b, Span header) { pimpl->notify (b, header); } -int InitiatorPropertyExchangeCache::countOngoingTransactions() const { return pimpl->countOngoingTransactions(); } -bool InitiatorPropertyExchangeCache::isAwaitingResponse() const { return pimpl->isAwaitingResponse(); } +bool InitiatorPropertyExchangeCache::terminate (Token64 token) { return pimpl->terminate (token); } +std::optional InitiatorPropertyExchangeCache::getTokenForRequestId (RequestID id) const { return pimpl->getTokenForRequestId (id); } +std::optional InitiatorPropertyExchangeCache::getRequestIdForToken (Token64 token) const { return pimpl->getRequestIdForToken (token); } +void InitiatorPropertyExchangeCache::addChunk (RequestID b, const Message::DynamicSizePropertyExchange& chunk) { pimpl->addChunk (b, chunk); } +void InitiatorPropertyExchangeCache::notify (RequestID b, Span header) { pimpl->notify (b, header); } +std::vector InitiatorPropertyExchangeCache::getOngoingTransactions() const { return pimpl->getOngoingTransactions(); } //============================================================================== class ResponderPropertyExchangeCache::Impl @@ -252,7 +316,7 @@ class ResponderPropertyExchangeCache::Impl public: void primeCache (uint8_t maxSimultaneousTransactions, std::function onDone, - std::byte id) + RequestID id) { if (array.hasTransaction (id)) return; @@ -260,11 +324,11 @@ public: if (array.countOngoingTransactions() >= maxSimultaneousTransactions) NullCheckedInvocation::invoke (onDone, PropertyExchangeResult { PropertyExchangeResult::Error::tooManyTransactions }); else - array.primeCacheForRequestId (id, std::move (onDone), nullptr).release(); + array.primeCacheForRequestId (id.asInt(), std::move (onDone)); } - void addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk) { array.addChunk (b, chunk); } - void notify (std::byte b, Span header) { array.notify (b, header); } + void addChunk (RequestID b, const Message::DynamicSizePropertyExchange& chunk) { array.addChunk (b, chunk); } + void notify (RequestID b, Span header) { array.notify (b, header); } int countOngoingTransactions() const { return array.countOngoingTransactions(); } private: @@ -279,13 +343,13 @@ ResponderPropertyExchangeCache::~ResponderPropertyExchangeCache() = default; void ResponderPropertyExchangeCache::primeCache (uint8_t maxSimultaneousTransactions, std::function onDone, - std::byte id) + RequestID id) { return pimpl->primeCache (maxSimultaneousTransactions, std::move (onDone), id); } -void ResponderPropertyExchangeCache::addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk) { pimpl->addChunk (b, chunk); } -void ResponderPropertyExchangeCache::notify (std::byte b, Span header) { pimpl->notify (b, header); } +void ResponderPropertyExchangeCache::addChunk (RequestID b, const Message::DynamicSizePropertyExchange& chunk) { pimpl->addChunk (b, chunk); } +void ResponderPropertyExchangeCache::notify (RequestID b, Span header) { pimpl->notify (b, header); } int ResponderPropertyExchangeCache::countOngoingTransactions() const { return pimpl->countOngoingTransactions(); } } // namespace juce::midi_ci diff --git a/modules/juce_midi_ci/ci/juce_CIPropertyExchangeCache.h b/modules/juce_midi_ci/ci/juce_CIPropertyExchangeCache.h index 2c077d2fba..8dbc7077e5 100644 --- a/modules/juce_midi_ci/ci/juce_CIPropertyExchangeCache.h +++ b/modules/juce_midi_ci/ci/juce_CIPropertyExchangeCache.h @@ -26,6 +26,81 @@ namespace juce::midi_ci { +/** + A strongly-typed identifier for a 7-bit request ID with a nullable state. + + @tags{Audio} +*/ +class RequestID +{ +public: + /** Constructs a RequestID if the provided value is valid, i.e. its most significant bit is + not set. Otherwise, returns nullopt. + */ + static std::optional create (uint8_t v) + { + if (v < 128) + return RequestID { v }; + + return {}; + } + + /** Constructs a RequestID if the provided value is valid, i.e. its most significant bit is + not set. Otherwise, returns nullopt. + */ + static std::optional create (std::byte value) + { + return create (static_cast (value)); + } + + /** Returns the byte corresponding to this ID. */ + std::byte asByte() const + { + return std::byte { value }; + } + + /** Returns the int value of this ID. */ + uint8_t asInt() const + { + return value; + } + + /** Equality operator. */ + bool operator== (RequestID other) const + { + return value == other.value; + } + + /** Inequality operator. */ + bool operator!= (RequestID other) const + { + return ! operator== (other); + } + +private: + /* Constructs a non-null request ID. + + The argument must not have its most significant bit set. + */ + explicit RequestID (uint8_t index) + : value (index) + { + // IDs must only use the lowest 7 bits + jassert (value < 128); + } + + uint8_t value{}; +}; + +/** A strongly-typed 64-bit identifier. */ +enum class Token64 : uint64_t {}; + +/** Compares Token64 instances. */ +constexpr bool operator< (Token64 a, Token64 b) +{ + return toUnderlyingType (a) < toUnderlyingType (b); +} + /** Accumulates message chunks that have been sent by another device in response to a transaction initiated by a local device. @@ -43,44 +118,38 @@ public: JUCE_DECLARE_NON_COPYABLE (InitiatorPropertyExchangeCache) - /** Holds a token that can be used to stop waiting for a reply, along with - an identifier byte that uniquely identifies an ongoing transaction. - - @tags{Audio} - */ - struct TokenAndId - { - TokenAndId() = default; - TokenAndId (ErasedScopeGuard tokenIn, std::byte idIn) - : token (std::move (tokenIn)), id (idIn) {} - - bool isValid() const { return (id & std::byte { 0x80 }) == std::byte{}; } - - ErasedScopeGuard token{}; - std::byte id { 0x80 }; - }; - /** Picks an unused request ID, and prepares the cache for that ID to accumulate message chunks. Incoming chunks added with addChunk are generated by another device acting as a responder. */ - TokenAndId primeCache (uint8_t maxSimultaneousRequests, - std::function onDone, - std::function onTerminate); + std::optional primeCache (uint8_t maxSimultaneousRequests, + std::function onDone); + + /** Terminates/cancels an ongoing transaction. + + Returns true if the termination had an effect (i.e. the transaction was still ongoing), or + false otherwise (the transaction already ended or never started). + */ + bool terminate (Token64); + + /** If there's a transaction ongoing with the given request id, returns the token uniquely + identifying that transaction, otherwise returns nullopt. + */ + std::optional getTokenForRequestId (RequestID) const; + + /** If the token refers to an ongoing transaction, returns the request id of that transaction. + Otherwise, returns an invalid request id. + */ + std::optional getRequestIdForToken (Token64) const; /** Adds a message chunk for the provided transaction id. */ - void addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk); + void addChunk (RequestID, const Message::DynamicSizePropertyExchange& chunk); /** Updates the transaction state based on the contents of the provided notification. */ - void notify (std::byte b, Span header); + void notify (RequestID, Span header); - /** Returns the number of transactions that have been started but not finished. */ - int countOngoingTransactions() const; - - /** Returns true if there are any transactions in progress that - haven't yet received replies. - */ - bool isAwaitingResponse() const; + /** Returns all ongoing transactions. */ + std::vector getOngoingTransactions() const; private: class Impl; @@ -104,19 +173,19 @@ public: JUCE_DECLARE_NON_COPYABLE (ResponderPropertyExchangeCache) - /** Prepares the cache for the given requestId to accumulate message chunks. + /** Prepares the cache for the given requestID to accumulate message chunks. Incoming chunks added with addChunk are generated by another device acting as an initiator. */ void primeCache (uint8_t maxSimultaneousTransactions, std::function onDone, - std::byte id); + RequestID id); /** Adds a message chunk for the provided transaction id. */ - void addChunk (std::byte b, const Message::DynamicSizePropertyExchange& chunk); + void addChunk (RequestID, const Message::DynamicSizePropertyExchange& chunk); /** Updates the transaction state based on the contents of the provided notification. */ - void notify (std::byte b, Span header); + void notify (RequestID, Span header); /** Returns the number of transactions that have been started but not finished. */ int countOngoingTransactions() const; diff --git a/modules/juce_midi_ci/ci/juce_CIPropertyExchangeResult.h b/modules/juce_midi_ci/ci/juce_CIPropertyExchangeResult.h index 29d19dc463..cecd992a82 100644 --- a/modules/juce_midi_ci/ci/juce_CIPropertyExchangeResult.h +++ b/modules/juce_midi_ci/ci/juce_CIPropertyExchangeResult.h @@ -49,10 +49,6 @@ public: tooManyTransactions, ///< Unable to send the request because doing so would ///< exceed the number of simultaneous inquiries that were declared. ///< @see PropertyDelegate::getNumSimultaneousRequestsSupported(). - - invalidPayload, ///< The payload couldn't be encoded for transmission. If you're - ///< using the ASCII encoding, maybe some bytes have their most - ///< significant bit set. }; /** Creates a result denoting an error state. */ diff --git a/modules/juce_midi_ci/ci/juce_CIPropertyHost.cpp b/modules/juce_midi_ci/ci/juce_CIPropertyHost.cpp index ce518f443f..7e9c5f8576 100644 --- a/modules/juce_midi_ci/ci/juce_CIPropertyHost.cpp +++ b/modules/juce_midi_ci/ci/juce_CIPropertyHost.cpp @@ -36,7 +36,6 @@ public: void visit (const Message::PropertyGetData& body) const override { visitImpl (body); } void visit (const Message::PropertySetData& body) const override { visitImpl (body); } void visit (const Message::PropertySubscribe& body) const override { visitImpl (body); } - void visit (const Message::PropertyNotify& body) const override { visitImpl (body); } using MessageVisitor::visit; private: @@ -88,43 +87,56 @@ private: const auto source = output->getIncomingHeader().source; const auto dest = output->getIncomingHeader().destination; const auto group = output->getIncomingGroup(); - const auto request = data.requestID; - caches->primeCache (host->delegate.getNumSimultaneousRequestsSupported(), [this, source, dest, group, request] (const PropertyExchangeResult& result) + const auto request = RequestID::create (data.requestID); + + if (! request.has_value()) + return false; + + caches->primeCache (host->delegate.getNumSimultaneousRequestsSupported(), [hostPtr = host, source, dest, group, request] (const PropertyExchangeResult& result) { const auto send = [&] (const PropertyReplyHeader& header) { - detail::MessageTypeUtils::send (host->output, + detail::MessageTypeUtils::send (hostPtr->output, group, Message::Header { ChannelInGroup::wholeBlock, detail::MessageMeta::Meta::subID2, detail::MessageMeta::implementationVersion, dest, source }, - Message::PropertySetDataResponse { { request, Encodings::jsonTo7BitText (header.toVarCondensed()) } }); + Message::PropertySetDataResponse { { request->asByte(), Encodings::jsonTo7BitText (header.toVarCondensed()) } }); }; - if (result.getError() == PropertyExchangeResult::Error::tooManyTransactions) + const auto sendStatus = [&] (int status, StringRef message) { PropertyReplyHeader header; - header.status = 343; - header.message = TRANS ("The device has initiated too many simultaneous requests"); + header.status = status; + header.message = message; send (header); + }; + + if (const auto error = result.getError()) + { + switch (*error) + { + case PropertyExchangeResult::Error::tooManyTransactions: + sendStatus (343, TRANS ("The device has initiated too many simultaneous requests")); + break; + + case PropertyExchangeResult::Error::partial: + sendStatus (400, TRANS ("Request was incomplete")); + break; + + case PropertyExchangeResult::Error::notify: + break; + } + return; } - if (result.getError().has_value()) - { - PropertyReplyHeader header; - header.status = 400; - header.message = TRANS ("Request was incomplete"); - send (header); - return; - } + send (hostPtr->delegate.propertySetDataRequested (source, { result.getHeaderAsRequestHeader(), result.getBody() })); + }, *request); - send (host->delegate.propertySetDataRequested (source, { result.getHeaderAsRequestHeader(), result.getBody() })); - }, request); - - caches->addChunk (data.requestID, data); + caches->addChunk (*request, data); return true; } @@ -206,19 +218,6 @@ private: return false; } - bool messageReceived (const Message::PropertyNotify& n) const - { - const auto m = output->getIncomingHeader().source; - - if (auto* it = host->cacheProvider.getCacheForMuidAsResponder (m)) - it->notify (n.requestID, n.header); - - if (auto* it = host->cacheProvider.getCacheForMuidAsInitiator (m)) - it->notify (n.requestID, n.header); - - return true; - } - PropertyHost* host = nullptr; ResponderOutput* output = nullptr; bool* handled = nullptr; @@ -263,10 +262,10 @@ bool PropertyHost::tryRespond (ResponderOutput& responderOutput, const Message:: return result; } -ErasedScopeGuard PropertyHost::sendSubscriptionUpdate (MUID device, - const PropertySubscriptionHeader& header, - Span body, - std::function cb) +std::optional PropertyHost::sendSubscriptionUpdate (MUID device, + const PropertySubscriptionHeader& header, + Span body, + std::function cb) { const auto deviceIter = registry.find (device); @@ -309,7 +308,6 @@ ErasedScopeGuard PropertyHost::sendSubscriptionUpdate (MUID device, if (caches == nullptr) return {}; - const auto terminator = detail::PropertyHostUtils::getTerminator (output, functionBlock, device); auto wrappedCallback = [&]() -> std::function { if (header.command != PropertySubscriptionCommand::end) @@ -331,27 +329,32 @@ ErasedScopeGuard PropertyHost::sendSubscriptionUpdate (MUID device, if (! encoded.has_value()) { - NullCheckedInvocation::invoke (wrappedCallback, PropertyExchangeResult { PropertyExchangeResult::Error::invalidPayload }); + // The data could not be encoded successfully + jassertfalse; return {}; } - auto primed = caches->primeCache (delegate.getNumSimultaneousRequestsSupported(), - std::move (wrappedCallback), - std::move (terminator)); + const auto primed = caches->primeCache (delegate.getNumSimultaneousRequestsSupported(), + std::move (wrappedCallback)); - if (! primed.isValid()) + if (! primed.has_value()) + return {}; + + const auto id = caches->getRequestIdForToken (*primed); + + if (! id.has_value()) return {}; detail::PropertyHostUtils::send (output, functionBlock.firstGroup, detail::MessageMeta::Meta::subID2, device, - primed.id, + id->asByte(), Encodings::jsonTo7BitText (header.toVarCondensed()), *encoded, cacheProvider.getMaxSysexSizeForMuid (device)); - return std::move (primed.token); + return RequestKey { device, *primed }; } void PropertyHost::terminateSubscription (MUID device, const String& subscribeId) @@ -380,7 +383,7 @@ void PropertyHost::terminateSubscription (MUID device, const String& subscribeId header.subscribeId = subscribeId; header.resource = subIter->second; - sendSubscriptionUpdate (device, header, {}, nullptr).release(); + sendSubscriptionUpdate (device, header, {}, nullptr); } PropertyHost::SubscriptionToken PropertyHost::uidFromSubscribeId (String id) diff --git a/modules/juce_midi_ci/ci/juce_CIPropertyHost.h b/modules/juce_midi_ci/ci/juce_CIPropertyHost.h index f515afa231..0c410e46c1 100644 --- a/modules/juce_midi_ci/ci/juce_CIPropertyHost.h +++ b/modules/juce_midi_ci/ci/juce_CIPropertyHost.h @@ -26,6 +26,39 @@ namespace juce::midi_ci { +/** + A key used to uniquely identify ongoing transactions initiated by a ci::Device. + + @tags{Audio} +*/ +class RequestKey +{ + auto tie() const { return std::tuple (m, v); } + +public: + /** Constructor. */ + RequestKey (MUID muid, Token64 key) : m (muid), v (key) {} + + /** Returns the muid of the device to which we are subscribed. */ + MUID getMuid() const { return m; } + + /** Returns an identifier unique to this subscription. */ + Token64 getKey() const { return v; } + + /** Equality operator. */ + bool operator== (const RequestKey& other) const { return tie() == other.tie(); } + + /** Inequality operator. */ + bool operator!= (const RequestKey& other) const { return tie() != other.tie(); } + + /** Less-than operator. */ + bool operator< (const RequestKey& other) const { return tie() < other.tie(); } + +private: + MUID m; + Token64 v{}; +}; + /** Acting as a ResponderListener, instances of this class can formulate appropriate replies to property transactions initiated by remote devices. @@ -67,13 +100,12 @@ public: The provided callback will be called once the remote device has confirmed receipt of the subscription update. If the state of your application changes such that you no longer need to respond/wait for confirmation, - you can allow the returned Guard to fall out of scope, or reset it - manually. + you can pass the request key to Device::abortPropertyRequest(). */ - ErasedScopeGuard sendSubscriptionUpdate (MUID device, - const PropertySubscriptionHeader& header, - Span body, - std::function callback); + std::optional sendSubscriptionUpdate (MUID device, + const PropertySubscriptionHeader& header, + Span body, + std::function callback); /** Terminates a subscription that was started by a remote device. diff --git a/modules/juce_midi_ci/ci/juce_CISubscriptionManager.cpp b/modules/juce_midi_ci/ci/juce_CISubscriptionManager.cpp new file mode 100644 index 0000000000..0d7c0957b1 --- /dev/null +++ b/modules/juce_midi_ci/ci/juce_CISubscriptionManager.cpp @@ -0,0 +1,812 @@ +/* + ============================================================================== + + This file is part of the JUCE library. + Copyright (c) 2022 - Raw Material Software Limited + + JUCE is an open source library subject to commercial or open-source + licensing. + + By using JUCE, you agree to the terms of both the JUCE 7 End-User License + Agreement and JUCE Privacy Policy. + + End User License Agreement: www.juce.com/juce-7-licence + Privacy Policy: www.juce.com/juce-privacy-policy + + Or: You may also use this code under the terms of the GPL v3 (see + www.gnu.org/licenses). + + JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER + EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE + DISCLAIMED. + + ============================================================================== +*/ + +namespace juce::midi_ci +{ + +struct RequestRetryQueueEntry +{ + PropertySubscriptionHeader msg; + Token64 key{}; ///< A unique identifier for this message + bool inFlight = false; ///< True if the message has been sent and we're waiting for a reply, false otherwise +}; + +/* + A queue to store pending property exchange messages. + + A property exchange message may fail to send because the initiator doesn't have enough vacant + property exchange IDs. + Similarly, if the responder doesn't have enough vacant IDs, then it may tell us to retry the + request. + + We store messages that we're planning to send, and mark them as in-flight once we've attempted + to send them. + We always try to send the first not-in-flight message in the queue. + If the responder informs us that the message was actioned, or there was an unrecoverable error, + then we can remove the message from the queue. We can also remove the message if the user + decides that the message is no longer important. + Otherwise, if the message wasn't sent successfully, we leave the message at its current + position in the queue, and mark it as not-in-flight again. +*/ +class RequestRetryQueue +{ + using Entry = RequestRetryQueueEntry; + +private: + auto getIter (Token64 k) + { + const auto iter = std::lower_bound (entries.begin(), + entries.end(), + (uint64_t) k, + [] (const Entry& e, uint64_t v) { return (uint64_t) e.key < v; }); + return iter != entries.end() && iter->key == k ? iter : entries.end(); + } + +public: + /* Add a new message at the end of the queue, and return the entry for that message. */ + Entry* add (PropertySubscriptionHeader msg) + { + const auto key = ++lastKey; + + entries.push_back (Entry { std::move (msg), Token64 { key }, false }); + return &entries.back(); + } + + /* Erase the entry for a given key. */ + std::optional erase (Token64 k) + { + const auto iter = getIter (k); + + if (iter == entries.end()) + return {}; + + auto result = std::move (*iter); + entries.erase (iter); + return result; + } + + /* Find the next entry that should be sent, and return it after marking it as in-flight. */ + const Entry* markNextInFlight() + { + const auto iter = std::find_if (entries.begin(), entries.end(), [] (const Entry& e) { return ! e.inFlight; }); + + if (iter == entries.end()) + return nullptr; + + iter->inFlight = true; + return &*iter; + } + + void markNotInFlight (Token64 k) + { + const auto iter = getIter (k); + + if (iter != entries.end()) + iter->inFlight = false; + } + +private: + std::vector entries; + uint64_t lastKey = 0; +}; + +/** + Info about a particular subscription. + + You can think of this as a subscription agreement as identified by a subscribeId, but this + also holds state that is necessary to negotiate the subscribeId. +*/ +struct SubscriptionState +{ + // If we're waiting to send this subscription request, this is monostate + // If the request has been sent, but we haven't received a reply, this is the id of the request + // If the subscription started successfully, this is the subscribeId for the subscription + std::variant state; + String resource; +}; + +/** + Info about all the subscriptions requested of a particular device/MUID. + This keeps track of the order in which subscription requests are made, so that requests can + be re-tried in order if the initial sending of a request fails. +*/ +class DeviceSubscriptionStates +{ +public: + Token64 postToQueue (const PropertySubscriptionHeader& header) + { + return queue.add (header)->key; + } + + Token64 beginSubscription (const PropertySubscriptionHeader& header) + { + jassert (header.command == PropertySubscriptionCommand::start); + + auto headerCopy = header; + headerCopy.command = PropertySubscriptionCommand::start; + + const auto key = postToQueue (headerCopy); + stateForSubscription[key].resource = headerCopy.resource; + + return key; + } + + std::optional endSubscription (Token64 key) + { + queue.erase (key); + + const auto iter = stateForSubscription.find (key); + + if (iter == stateForSubscription.end()) + return {}; + + auto subInfo = iter->second; + stateForSubscription.erase (iter); + + return { std::move (subInfo) }; + } + + std::vector endSubscription (String subscribeId) + { + std::vector ended; + + for (auto it = stateForSubscription.begin(); it != stateForSubscription.end();) + { + if (const auto* id = std::get_if (&it->second.state)) + { + if (*id == subscribeId) + { + ended.push_back (it->first); + queue.erase (it->first); + it = stateForSubscription.erase (it); + continue; + } + } + + ++it; + } + + return ended; + } + + void endAll() + { + for (auto& item : stateForSubscription) + queue.erase (item.first); + + stateForSubscription.clear(); + } + + void resetKey (Token64 key) + { + const auto iter = stateForSubscription.find (key); + + if (iter != stateForSubscription.end()) + iter->second.state = std::monostate{}; + + queue.markNotInFlight (key); + } + + void setRequestIdForKey (Token64 key, Token64 request) + { + const auto iter = stateForSubscription.find (key); + + if (iter != stateForSubscription.end()) + iter->second.state = request; + } + + void setSubscribeIdForKey (Token64 key, String subscribeId) + { + const auto iter = stateForSubscription.find (key); + + if (iter != stateForSubscription.end()) + iter->second.state = subscribeId; + + queue.erase (key); + } + + auto* markNextInFlight() + { + return queue.markNextInFlight(); + } + + std::optional getInfoForSubscriptionKey (Token64 key) const + { + const auto iter = stateForSubscription.find (key); + + if (iter != stateForSubscription.end()) + return iter->second; + + return {}; + } + + auto begin() const { return stateForSubscription.begin(); } + auto end() const { return stateForSubscription.end(); } + +private: + RequestRetryQueue queue; + std::map stateForSubscription; +}; + +class SubscriptionManager::Impl : public std::enable_shared_from_this, + private DeviceListener +{ +public: + explicit Impl (SubscriptionManagerDelegate& d) + : delegate (d) {} + + SubscriptionKey beginSubscription (MUID m, const PropertySubscriptionHeader& header) + { + const auto key = infoForMuid[m].beginSubscription (header); + sendPendingMessages(); + return SubscriptionKey { m, key }; + } + + void endSubscription (SubscriptionKey key) + { + const auto iter = infoForMuid.find (key.getMuid()); + + if (iter == infoForMuid.end()) + return; + + const auto ended = iter->second.endSubscription (key.getKey()); + + if (! ended.has_value()) + return; + + if (auto* request = std::get_if (&ended->state)) + { + delegate.abortPropertyRequest ({ key.getMuid(), *request }); + } + else if (auto* subscribeId = std::get_if (&ended->state)) + { + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::end; + header.subscribeId = *subscribeId; + iter->second.postToQueue (header); + sendPendingMessages(); + } + } + + void endSubscriptionFromResponder (MUID m, String sub) + { + const auto iter = infoForMuid.find (m); + + if (iter != infoForMuid.end()) + for (const auto& ended : iter->second.endSubscription (sub)) + delegate.propertySubscriptionChanged ({ m, ended }, std::nullopt); + } + + void endSubscriptionsFromResponder (MUID m) + { + const auto iter = infoForMuid.find (m); + + if (iter == infoForMuid.end()) + return; + + std::vector tokens; + std::transform (iter->second.begin(), + iter->second.end(), + std::back_inserter (tokens), + [] (const auto& p) { return p.first; }); + + iter->second.endAll(); + + for (const auto& ended : tokens) + delegate.propertySubscriptionChanged ({ m, ended }, std::nullopt); + } + + std::vector getOngoingSubscriptions() const + { + std::vector result; + + for (const auto& pair : infoForMuid) + for (const auto& info : pair.second) + result.emplace_back (pair.first, info.first); + + return result; + } + + std::optional getInfoForSubscriptionKey (SubscriptionKey key) const + { + const auto iter = infoForMuid.find (key.getMuid()); + + if (iter != infoForMuid.end()) + return iter->second.getInfoForSubscriptionKey (key.getKey()); + + return {}; + } + + bool sendPendingMessages() + { + // Note: not using any_of here because we don't want the early-exit behaviour + bool result = true; + + for (auto& pair : infoForMuid) + result &= sendPendingMessages (pair.first, pair.second); + + return result; + } + +private: + void handleReply (SubscriptionKey subscriptionKey, PropertySubscriptionCommand command, const PropertyExchangeResult& r) + { + const auto iter = infoForMuid.find (subscriptionKey.getMuid()); + + if (iter == infoForMuid.end()) + return; + + auto& second = iter->second; + + if (const auto error = r.getError()) + { + // If the responder requested a retry, keep the message in the queue so that + // it can be re-sent + if (*error == PropertyExchangeResult::Error::tooManyTransactions) + { + second.resetKey (subscriptionKey.getKey()); + return; + } + + // We tried to begin or end a subscription, but the responder said no! + // If the responder declined to start a subscription, we can just + // mark the subscription as ended. + // If the responder declined to end a subscription, that's a bit trickier. + // Hopefully this won't happen in practice, because all the options to resolve are pretty bad: + // - One option is to ignore the failure. The remote device can carry on sending us updates. + // This might be a bit dangerous if we repeatedly subscribe and then fail to unsubscribe, as this + // would result in lots of redundant subscription messages that could clog the connection. + // - Another option is to store the subscription-end request and to attempt to send it again later. + // This also has the potential to clog up the connection, depending on how frequently we attempt + // to re-send failed messages. Given that unsubscribing has already failed once, there's no + // guarantee that any future attempts will succeed, so we might end up in a loop, sending the + // same message over and over. + // On balance, I think the former option is best for now. If this ends up being an issue in + // practice, perhaps we could add a mechanism to do exponential back-off, but that would + // add complexity that isn't necessarily required. + jassert (*error != PropertyExchangeResult::Error::notify); + + // If we failed to begin a subscription, then the subscription never started, + // and we should remove it from the set of ongoing subscriptions. + second.endSubscription (subscriptionKey.getKey()); + + // We only need to alert the delegate if the subscription failed to start. + // If the subscription fails to end, we'll treat the subscription as ended anyway. + if (command == PropertySubscriptionCommand::start) + delegate.propertySubscriptionChanged (subscriptionKey, std::nullopt); + + return; + } + + if (command == PropertySubscriptionCommand::start) + { + second.setSubscribeIdForKey (subscriptionKey.getKey(), r.getHeaderAsSubscriptionHeader().subscribeId); + delegate.propertySubscriptionChanged (subscriptionKey, r.getHeaderAsSubscriptionHeader().subscribeId); + } + } + + bool sendPendingMessages (MUID m, DeviceSubscriptionStates& info) + { + while (auto* entry = info.markNextInFlight()) + { + const auto requestKind = entry->msg.command; + const SubscriptionKey subscriptionKey { m, entry->key }; + + auto cb = [weak = weak_from_this(), requestKind, subscriptionKey] (const PropertyExchangeResult& r) + { + if (const auto locked = weak.lock()) + locked->handleReply (subscriptionKey, requestKind, r); + }; + + if (const auto request = delegate.sendPropertySubscribe (m, entry->msg, std::move (cb))) + { + if (entry->msg.command == PropertySubscriptionCommand::start) + info.setRequestIdForKey (entry->key, request->getKey()); + } + else + { + // Couldn't find a valid ID to use, so we must have exhausted all message slots. + // There's no point trying to send the rest of the messages that are queued for this + // MUID, so give up. It's probably a good idea to try again in a bit. + info.resetKey (entry->key); + return false; + } + } + + return true; + } + + SubscriptionManagerDelegate& delegate; + std::map infoForMuid; +}; + +//============================================================================== +SubscriptionManager::SubscriptionManager (SubscriptionManagerDelegate& delegate) + : pimpl (std::make_shared (delegate)) {} + +SubscriptionKey SubscriptionManager::beginSubscription (MUID m, const PropertySubscriptionHeader& header) +{ + return pimpl->beginSubscription (m, header); +} + +void SubscriptionManager::endSubscription (SubscriptionKey key) +{ + pimpl->endSubscription (key); +} + +void SubscriptionManager::endSubscriptionFromResponder (MUID m, String sub) +{ + pimpl->endSubscriptionFromResponder (m, sub); +} + +void SubscriptionManager::endSubscriptionsFromResponder (MUID m) +{ + pimpl->endSubscriptionsFromResponder (m); +} + +std::vector SubscriptionManager::getOngoingSubscriptions() const +{ + return pimpl->getOngoingSubscriptions(); +} + +std::optional SubscriptionManager::getSubscribeIdForKey (SubscriptionKey key) const +{ + if (const auto info = pimpl->getInfoForSubscriptionKey (key)) + if (const auto* subscribeId = std::get_if (&info->state)) + return *subscribeId; + + return {}; +} + +std::optional SubscriptionManager::getResourceForKey (SubscriptionKey key) const +{ + if (const auto info = pimpl->getInfoForSubscriptionKey (key)) + return info->resource; + + return {}; +} + +bool SubscriptionManager::sendPendingMessages() +{ + return pimpl->sendPendingMessages(); +} + +//============================================================================== +//============================================================================== +#if JUCE_UNIT_TESTS + +class SubscriptionTests : public UnitTest +{ +public: + SubscriptionTests() : UnitTest ("Subscription", UnitTestCategories::midi) {} + + void runTest() override + { + auto random = getRandom(); + + class Delegate : public SubscriptionManagerDelegate + { + public: + std::optional sendPropertySubscribe (MUID m, + const PropertySubscriptionHeader&, + std::function cb) override + { + ++sendCount; + + if (! sendShouldSucceed) + return {}; + + const RequestKey key { m, Token64 { ++lastKey } }; + callbacks[key] = std::move (cb); + + return key; + } + + void abortPropertyRequest (RequestKey k) override + { + ++abortCount; + callbacks.erase (k); + } + + void propertySubscriptionChanged (SubscriptionKey, const std::optional&) override + { + subChanged = true; + } + + void setSendShouldSucceed (bool b) { sendShouldSucceed = b; } + + void sendResult (RequestKey key, const PropertyExchangeResult& r) + { + const auto iter = callbacks.find (key); + + if (iter != callbacks.end()) + NullCheckedInvocation::invoke (iter->second, r); + + callbacks.erase (key); + } + + std::vector getOngoingRequests() const + { + std::vector result; + result.reserve (callbacks.size()); + std::transform (callbacks.begin(), callbacks.end(), std::back_inserter (result), [] (const auto& p) { return p.first; }); + return result; + } + + uint64_t getAndClearSendCount() { return std::exchange (sendCount, 0); } + uint64_t getAndClearAbortCount() { return std::exchange (abortCount, 0); } + bool getAndClearSubChanged() { return std::exchange (subChanged, false); } + + private: + std::map> callbacks; + uint64_t sendCount = 0, abortCount = 0; + uint64_t lastKey = 0; + bool sendShouldSucceed = true, subChanged = false; + }; + + Delegate delegate; + SubscriptionManager manager { delegate }; + + const auto inquiryMUID = MUID::makeRandom (random); + + beginTest ("Beginning a subscription and ending it before the remote device replies aborts the request"); + { + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::start; + header.resource = "X-CustomProp"; + + const auto a = manager.beginSubscription (inquiryMUID, header); + + expect (manager.getOngoingSubscriptions() == std::vector { a }); + expect (delegate.getAndClearSendCount() == 1); + + // Sending a subscription request uses a request slot + expect (delegate.getOngoingRequests().size() == 1); + const auto request = delegate.getOngoingRequests().back(); + + // subscription id is empty until the responder confirms the subscription + expect (manager.getResourceForKey (a) == header.resource); + + manager.endSubscription (a); + + expect (delegate.getOngoingRequests().empty()); + expect (delegate.getAndClearAbortCount() == 1); + expect (manager.getOngoingSubscriptions().empty()); + + const auto successHeader = [] + { + auto ptr = std::make_unique(); + ptr->setProperty ("status", 200); + ptr->setProperty ("subscribeId", "anId"); + return var { ptr.release() }; + }(); + + delegate.sendResult (request, PropertyExchangeResult { successHeader, {} }); + + // Already ended, the confirmation shouldn't do anything + expect (! delegate.getAndClearSubChanged()); + expect (delegate.getOngoingRequests().empty()); + expect (delegate.getAndClearSendCount() == 0); + expect (delegate.getAndClearAbortCount() == 0); + + // There shouldn't be any queued messages. + expect (manager.sendPendingMessages()); + expect (! delegate.getAndClearSubChanged()); + expect (delegate.getOngoingRequests().empty()); + expect (delegate.getAndClearSendCount() == 0); + expect (delegate.getAndClearAbortCount() == 0); + } + + beginTest ("Starting a new subscription while the device is waiting for a previous subscription to be confirmed queues further requests"); + { + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::start; + header.resource = "X-CustomProp"; + + delegate.setSendShouldSucceed (false); + const auto a = manager.beginSubscription (inquiryMUID, header); + expect (delegate.getAndClearSendCount() == 1); + + expect (! manager.sendPendingMessages()); + expect (delegate.getAndClearSendCount() == 1); + + delegate.setSendShouldSucceed (true); + expect (manager.sendPendingMessages()); + expect (delegate.getAndClearSendCount() == 1); + + delegate.setSendShouldSucceed (false); + const auto b = manager.beginSubscription (inquiryMUID, header); + const auto c = manager.beginSubscription (inquiryMUID, header); + + expect (manager.getOngoingSubscriptions() == std::vector { a, b, c }); + expect (delegate.getOngoingRequests().size() == 1); + + // subscription id is empty until the responder confirms the subscription + expect (manager.getResourceForKey (a) == header.resource); + expect (manager.getResourceForKey (b) == header.resource); + expect (manager.getResourceForKey (c) == header.resource); + + expect (delegate.getAndClearSendCount() == 2); + expect (delegate.getAndClearAbortCount() == 0); + + delegate.setSendShouldSucceed (true); + + // The device has sent a subscription start for a, but not for c, + // so it should send a notify to end subscription a, but shouldn't emit any + // messages related to subscription c. + manager.endSubscription (a); + manager.endSubscription (c); + + expect (manager.getOngoingSubscriptions() == std::vector { b }); + + expect (delegate.getOngoingRequests().empty()); + expect (delegate.getAndClearSendCount() == 0); + expect (delegate.getAndClearAbortCount() == 1); + + // There should still be requests related to subscription b pending + expect (manager.sendPendingMessages()); + expect (delegate.getOngoingRequests().size() == 1); + expect (delegate.getAndClearSendCount() == 1); + expect (delegate.getAndClearAbortCount() == 0); + + // Now, we should send a terminate request for subscription b + manager.endSubscription (b); + + expect (delegate.getOngoingRequests().empty()); + expect (delegate.getAndClearSendCount() == 0); + expect (delegate.getAndClearAbortCount() == 1); + + // The manager never received any replies, so it shouldn't have notified listeners about + // changed subscriptions + expect (! delegate.getAndClearSubChanged()); + } + + beginTest ("If the device receives a retry or notify in response to a subscription start request, the subscription is retried or terminated as necessary"); + { + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::start; + header.resource = "X-CustomProp"; + + const auto a = manager.beginSubscription (inquiryMUID, header); + + expect (manager.getOngoingSubscriptions() == std::vector { a }); + expect (delegate.getOngoingRequests().size() == 1); + expect (delegate.getAndClearSendCount() == 1); + expect (delegate.getAndClearAbortCount() == 0); + + delegate.sendResult (delegate.getOngoingRequests().back(), PropertyExchangeResult { PropertyExchangeResult::Error::tooManyTransactions }); + + // The subscription is still active from the perspective of the manager, but the + // first request is over and should be retried + expect (manager.getOngoingSubscriptions() == std::vector { a }); + expect (! delegate.getAndClearSubChanged()); + expect (delegate.getOngoingRequests().empty()); + expect (delegate.getAndClearSendCount() == 0); + expect (delegate.getAndClearAbortCount() == 0); + + expect (manager.sendPendingMessages()); + + expect (manager.getOngoingSubscriptions() == std::vector { a }); + expect (delegate.getOngoingRequests().size() == 1); + expect (delegate.getAndClearSendCount() == 1); + expect (delegate.getAndClearAbortCount() == 0); + + delegate.sendResult (delegate.getOngoingRequests().back(), PropertyExchangeResult { PropertyExchangeResult::Error::notify }); + + // The request was terminated by the responder, so the delegate should get a sub-changed message + expect (delegate.getAndClearSubChanged()); + expect (manager.getOngoingSubscriptions().empty()); + expect (delegate.getOngoingRequests().empty()); + expect (delegate.getAndClearSendCount() == 0); + expect (delegate.getAndClearAbortCount() == 0); + + expect (manager.sendPendingMessages()); + expect (! delegate.getAndClearSubChanged()); + expect (manager.getOngoingSubscriptions().empty()); + expect (delegate.getOngoingRequests().empty()); + expect (delegate.getAndClearSendCount() == 0); + expect (delegate.getAndClearAbortCount() == 0); + } + + beginTest ("If the device receives a retry or notify in response to a subscription end request, the subscription is retried as necessary"); + { + PropertySubscriptionHeader header; + header.command = PropertySubscriptionCommand::start; + header.resource = "X-CustomProp"; + + const auto a = manager.beginSubscription (inquiryMUID, header); + + expect (manager.getOngoingSubscriptions() == std::vector { a }); + expect (manager.getResourceForKey (a) == header.resource); + expect (! manager.getSubscribeIdForKey (a).has_value()); + + const auto subscriptionResponseHeader = [] + { + auto ptr = std::make_unique(); + ptr->setProperty ("status", 200); + ptr->setProperty ("subscribeId", "newId"); + return ptr.release(); + }(); + + // Accept the subscription + delegate.sendResult (delegate.getOngoingRequests().back(), PropertyExchangeResult { subscriptionResponseHeader, {} }); + + // The subscription is still active from the perspective of the device, but the + // request is over and should be retried + expect (manager.getOngoingSubscriptions() == std::vector { a }); + expect (delegate.getAndClearSubChanged()); + // Now that the subscription was accepted, the subscription id should be non-empty + expect (manager.getResourceForKey (a) == header.resource); + expect (manager.getSubscribeIdForKey (a) == "newId"); + expect (delegate.getOngoingRequests().empty()); + expect (delegate.getAndClearSendCount() == 1); + expect (delegate.getAndClearAbortCount() == 0); + + manager.endSubscription (a); + + expect (manager.getOngoingSubscriptions().empty()); + expect (! delegate.getAndClearSubChanged()); + expect (delegate.getOngoingRequests().size() == 1); + expect (delegate.getAndClearSendCount() == 1); + expect (delegate.getAndClearAbortCount() == 0); + + // The responder is busy, can't process the subscription end + delegate.sendResult (delegate.getOngoingRequests().back(), PropertyExchangeResult { PropertyExchangeResult::Error::tooManyTransactions }); + + expect (manager.getOngoingSubscriptions().empty()); + expect (delegate.getOngoingRequests().empty()); + expect (delegate.getAndClearSendCount() == 0); + expect (delegate.getAndClearAbortCount() == 0); + + expect (manager.sendPendingMessages()); + expect (manager.getOngoingSubscriptions().empty()); + expect (delegate.getOngoingRequests().size() == 1); + expect (delegate.getAndClearSendCount() == 1); + expect (delegate.getAndClearAbortCount() == 0); + + // The responder told us to immediately terminate our request to end the subscription! + // It's unclear how this should behave, so we'll just ignore the failure and assume + // the subscription is really over. + delegate.sendResult (delegate.getOngoingRequests().back(), PropertyExchangeResult { PropertyExchangeResult::Error::notify }); + + expect (manager.getOngoingSubscriptions().empty()); + expect (delegate.getOngoingRequests().empty()); + expect (delegate.getAndClearSendCount() == 0); + expect (delegate.getAndClearAbortCount() == 0); + + expect (manager.sendPendingMessages()); + expect (delegate.getAndClearSendCount() == 0); + expect (delegate.getAndClearAbortCount() == 0); + + expect (! delegate.getAndClearSubChanged()); + } + } +}; + +static SubscriptionTests subscriptionTests; + +#endif + +} // namespace juce::midi_ci diff --git a/modules/juce_midi_ci/ci/juce_CISubscriptionManager.h b/modules/juce_midi_ci/ci/juce_CISubscriptionManager.h new file mode 100644 index 0000000000..bd0d11e0f5 --- /dev/null +++ b/modules/juce_midi_ci/ci/juce_CISubscriptionManager.h @@ -0,0 +1,167 @@ +/* + ============================================================================== + + This file is part of the JUCE library. + Copyright (c) 2022 - Raw Material Software Limited + + JUCE is an open source library subject to commercial or open-source + licensing. + + By using JUCE, you agree to the terms of both the JUCE 7 End-User License + Agreement and JUCE Privacy Policy. + + End User License Agreement: www.juce.com/juce-7-licence + Privacy Policy: www.juce.com/juce-privacy-policy + + Or: You may also use this code under the terms of the GPL v3 (see + www.gnu.org/licenses). + + JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER + EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE + DISCLAIMED. + + ============================================================================== +*/ + +namespace juce::midi_ci +{ + +/** + A key used to uniquely identify ongoing property subscriptions initiated by a ci::Device. + + @tags{Audio} +*/ +class SubscriptionKey +{ + auto tie() const { return std::tuple (m, v); } + +public: + /** Constructor */ + SubscriptionKey() = default; + + /** Constructor */ + SubscriptionKey (MUID muid, Token64 key) : m (muid), v (key) {} + + /** Returns the muid of the device to which we are subscribed. */ + MUID getMuid() const { return m; } + + /** Returns an identifier unique to this subscription. */ + Token64 getKey() const { return v; } + + /** Equality operator. */ + bool operator== (const SubscriptionKey& other) const { return tie() == other.tie(); } + + /** Inequality operator. */ + bool operator!= (const SubscriptionKey& other) const { return tie() != other.tie(); } + + /** Less-than operator. */ + bool operator< (const SubscriptionKey& other) const { return tie() < other.tie(); } + +private: + MUID m = MUID::getBroadcast(); + Token64 v{}; +}; + +/** + Functions used by a SubscriptionManager to negotiate subscriptions. + + @tags{Audio} +*/ +struct SubscriptionManagerDelegate +{ + virtual ~SubscriptionManagerDelegate() = default; + + /** Called when the manager wants to send an update. */ + virtual std::optional sendPropertySubscribe (MUID m, + const PropertySubscriptionHeader& header, + std::function onResult) = 0; + + /** Called by the manager to cancel a previous request. */ + virtual void abortPropertyRequest (RequestKey) = 0; + + /** Called by the manager when the remote device provides a subscribeId, or when it + terminates a subscription. + */ + virtual void propertySubscriptionChanged (SubscriptionKey, const std::optional&) = 0; +}; + +/** + Manages subscriptions to properties on remote devices. + + Occasionally, sending a subscription-begin request may fail, in which case the request will be + cached. Cached requests will be sent during a future call to sendPendingMessages(). + + To use this: + - pass a SubscriptionManagerDelegate (such as a ci::Device) to the constructor + - call sendPendingMessages() periodically, e.g. in a timer callback + + @tags{Audio} +*/ +class SubscriptionManager +{ +public: + /** Constructor. + + The delegate functions will be called when necessary to start and cancel property requests. + */ + explicit SubscriptionManager (SubscriptionManagerDelegate& delegate); + + /** Attempts to begin a subscription using the provided details. + + @returns a token that uniquely identifies this subscription. This token can be passed to + endSubscription to terminate an ongoing subscription. + */ + SubscriptionKey beginSubscription (MUID m, const PropertySubscriptionHeader& header); + + /** Ends an ongoing subscription by us. + + If the subscription begin request hasn't been sent yet, then this will just cancel the cached request. + + If a subscription begin request has been sent, but no response has been received, this will + send a notification cancelling the initial request via SubscriptionManagerDelegate::abortPropertyRequest(). + + If the subscription has started successfully, then this will send a subscription end request + via SubscriptionManagerDelegate::sendPropertySubscribe(). + */ + void endSubscription (SubscriptionKey); + + /** Ends an ongoing subscription as requested from the remote device. + + Unlike the other overload of endSubscription, this won't notify the delegate. It will only + update the internal record of active subscriptions. + + Calls Delegate::propertySubscriptionChanged(). + */ + void endSubscriptionFromResponder (MUID, String); + + /** Ends all ongoing subscriptions as requested from a remote device. + + Calls Delegate::propertySubscriptionChanged(). + */ + void endSubscriptionsFromResponder (MUID); + + /** Returns all of the subscriptions that have been initiated by this manager. */ + std::vector getOngoingSubscriptions() const; + + /** If the provided subscription has started successfully, this returns the subscribeId assigned + to the subscription by the remote device. + */ + std::optional getSubscribeIdForKey (SubscriptionKey key) const; + + /** If the provided subscription has not been cancelled, this returns the name of the + subscribed resource. + */ + std::optional getResourceForKey (SubscriptionKey key) const; + + /** Sends any cached messages that need retrying. + + @returns true if there are no more messages to send, or false otherwise + */ + bool sendPendingMessages(); + +private: + class Impl; + std::shared_ptr pimpl; +}; + +} // namespace juce::midi_ci diff --git a/modules/juce_midi_ci/detail/juce_CIPropertyHostUtils.h b/modules/juce_midi_ci/detail/juce_CIPropertyHostUtils.h index 0a4ddf388e..f04dc2453b 100644 --- a/modules/juce_midi_ci/detail/juce_CIPropertyHostUtils.h +++ b/modules/juce_midi_ci/detail/juce_CIPropertyHostUtils.h @@ -51,28 +51,5 @@ struct PropertyHostUtils std::for_each (chunker.begin(), chunker.end(), [&] (auto) { output.send (group); }); } - - static auto getTerminator (BufferOutput& output, FunctionBlock fb, MUID them) - { - const auto us = output.getMuid(); - return [&output, fb, us, them] (std::byte id) - { - const Message::Header notifyHeader - { - ChannelInGroup::wholeBlock, - detail::MessageMeta::Meta::subID2, - detail::MessageMeta::implementationVersion, - us, - them, - }; - - const auto jsonHeader = Encodings::jsonTo7BitText (JSONUtils::makeObjectWithKeyFirst ({ { "status", 144 } }, "status")); - detail::MessageTypeUtils::send (output, - fb.firstGroup, - notifyHeader, - Message::PropertyNotify { { id, jsonHeader, 1, 1, {} } }); - }; - } - }; } // namespace juce::midi_ci::detail diff --git a/modules/juce_midi_ci/juce_midi_ci.cpp b/modules/juce_midi_ci/juce_midi_ci.cpp index db9a407740..1fa9d5cc95 100644 --- a/modules/juce_midi_ci/juce_midi_ci.cpp +++ b/modules/juce_midi_ci/juce_midi_ci.cpp @@ -53,3 +53,4 @@ #include #include #include +#include diff --git a/modules/juce_midi_ci/juce_midi_ci.h b/modules/juce_midi_ci/juce_midi_ci.h index 8a002ea702..5fe3cf98e1 100644 --- a/modules/juce_midi_ci/juce_midi_ci.h +++ b/modules/juce_midi_ci/juce_midi_ci.h @@ -76,6 +76,7 @@ #include #include #include +#include #include #include