1
0
Fork 0
mirror of https://github.com/juce-framework/JUCE.git synced 2026-01-10 23:44:24 +00:00

Removed some default parameters in NamedPipe methods and fixed time-outs when using posix NamedPipe::read().

This commit is contained in:
jules 2012-07-17 15:49:55 +01:00
parent 601f729bf7
commit 9b8a39e27c
7 changed files with 129 additions and 118 deletions

View file

@ -211,7 +211,7 @@ public:
{
ScopedPointer<DemoInterprocessConnection> newConnection (new DemoInterprocessConnection (*this));
openedOk = newConnection->createPipe (pipeName.getText());
openedOk = newConnection->createPipe (pipeName.getText(), 2000);
if (openedOk)
{

View file

@ -67,6 +67,31 @@ inline int getAddressDifference (Type1* pointer1, Type2* pointer2) noexcept { r
template <class Type>
inline Type* createCopyIfNotNull (Type* pointer) { return pointer != nullptr ? new Type (*pointer) : nullptr; }
//==============================================================================
#if JUCE_MAC || JUCE_IOS || DOXYGEN
/** A handy C++ wrapper that creates and deletes an NSAutoreleasePool object using RAII.
You should use the JUCE_AUTORELEASEPOOL macro to create a local auto-release pool on the stack.
*/
class JUCE_API ScopedAutoReleasePool
{
public:
ScopedAutoReleasePool();
~ScopedAutoReleasePool();
private:
void* pool;
JUCE_DECLARE_NON_COPYABLE (ScopedAutoReleasePool);
};
/** A macro that can be used to easily declare a local ScopedAutoReleasePool object for RAII-based obj-C autoreleasing. */
#define JUCE_AUTORELEASEPOOL const juce::ScopedAutoReleasePool JUCE_JOIN_MACRO (autoReleasePool_, __LINE__);
#else
#define JUCE_AUTORELEASEPOOL
#endif
//==============================================================================
/* In a Windows DLL build, we'll expose some malloc/free functions that live inside the DLL, and use these for
allocating all the objects - that way all juce objects in the DLL and in the host will live in the same heap,

View file

@ -49,15 +49,28 @@ public:
}
}
int read (char* destBuffer, int maxBytesToRead)
static int openPipe (const String& name, int flags, const uint32 timeoutEnd)
{
for (;;)
{
const int p = ::open (name.toUTF8(), flags);
if (p != -1 || hasExpired (timeoutEnd))
return p;
Thread::sleep (2);
}
}
int read (char* destBuffer, int maxBytesToRead, int timeOutMilliseconds)
{
int bytesRead = -1;
blocked = true;
const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds);
if (pipeIn == -1)
{
pipeIn = ::open ((createdPipe ? pipeInName
: pipeOutName).toUTF8(), O_RDWR);
pipeIn = openPipe (createdPipe ? pipeInName : pipeOutName, O_RDWR | O_NONBLOCK, timeoutEnd);
if (pipeIn == -1)
{
@ -73,10 +86,16 @@ public:
const int bytesThisTime = maxBytesToRead - bytesRead;
const int numRead = (int) ::read (pipeIn, destBuffer, bytesThisTime);
if (numRead <= 0 || stopReadOperation)
if (numRead <= 0)
{
bytesRead = -1;
break;
if (errno != EWOULDBLOCK || stopReadOperation || hasExpired (timeoutEnd))
{
bytesRead = -1;
break;
}
Thread::yield();
continue;
}
bytesRead += numRead;
@ -90,21 +109,19 @@ public:
int write (const char* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds)
{
int bytesWritten = -1;
const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds);
if (pipeOut == -1)
{
pipeOut = ::open ((createdPipe ? pipeOutName
: pipeInName).toUTF8(), O_WRONLY);
pipeOut = openPipe (createdPipe ? pipeOutName : pipeInName, O_WRONLY, timeoutEnd);
if (pipeOut == -1)
return -1;
}
bytesWritten = 0;
const uint32 timeOutTime = Time::getMillisecondCounter() + timeOutMilliseconds;
while (bytesWritten < numBytesToWrite
&& (timeOutMilliseconds < 0 || Time::getMillisecondCounter() < timeOutTime))
while (bytesWritten < numBytesToWrite && ! hasExpired (timeoutEnd))
{
const int bytesThisTime = numBytesToWrite - bytesWritten;
const int numWritten = (int) ::write (pipeOut, sourceBuffer, bytesThisTime);
@ -137,6 +154,16 @@ public:
private:
static void signalHandler (int) {}
static uint32 getTimeoutEnd (const int timeOutMilliseconds)
{
return timeOutMilliseconds >= 0 ? Time::getMillisecondCounter() + timeOutMilliseconds : 0;
}
static bool hasExpired (const uint32 timeoutEnd)
{
return timeoutEnd != 0 && Time::getMillisecondCounter() >= timeoutEnd;
}
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Pimpl);
};
@ -193,9 +220,9 @@ bool NamedPipe::openInternal (const String& pipeName, const bool createPipe)
return true;
}
int NamedPipe::read (void* destBuffer, int maxBytesToRead, int /*timeOutMilliseconds*/)
int NamedPipe::read (void* destBuffer, int maxBytesToRead, int timeOutMilliseconds)
{
return pimpl != nullptr ? pimpl->read (static_cast <char*> (destBuffer), maxBytesToRead) : -1;
return pimpl != nullptr ? pimpl->read (static_cast <char*> (destBuffer), maxBytesToRead, timeOutMilliseconds) : -1;
}
int NamedPipe::write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds)

View file

@ -31,7 +31,7 @@
/**
A cross-process pipe that can have data written to and read from it.
Two or more processes can use these for inter-process communication.
Two processes can use NamedPipe objects to exchange blocks of data.
@see InterprocessConnection
*/
@ -45,16 +45,13 @@ public:
/** Destructor. */
~NamedPipe();
//==============================================================================
/** Tries to open a pipe that already exists.
Returns true if it succeeds.
*/
bool openExisting (const String& pipeName);
/** Tries to create a new pipe.
Returns true if it succeeds.
*/
bool createNewPipe (const String& pipeName);
@ -81,18 +78,14 @@ public:
If timeOutMilliseconds is less than zero, it will wait indefinitely, otherwise
this is a maximum timeout for reading from the pipe.
*/
int read (void* destBuffer, int maxBytesToRead, int timeOutMilliseconds = 5000);
int read (void* destBuffer, int maxBytesToRead, int timeOutMilliseconds);
/** Writes some data to the pipe.
If the operation fails, it returns -1, otherwise, it will return the number of
bytes written.
@returns the number of bytes written, or -1 on failure.
*/
int write (const void* sourceBuffer, int numBytesToWrite,
int timeOutMilliseconds = 2000);
int write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds);
/** If any threads are currently blocked on a read operation, this tells them to abort.
*/
/** If any threads are currently blocked on a read operation, this tells them to abort. */
void cancelPendingReads();
private:

View file

@ -43,24 +43,15 @@
See also SystemStats::getJUCEVersion() for a string version.
*/
#define JUCE_VERSION ((JUCE_MAJOR_VERSION << 16) + (JUCE_MINOR_VERSION << 8) + JUCE_BUILDNUMBER)
#define JUCE_VERSION ((JUCE_MAJOR_VERSION << 16) + (JUCE_MINOR_VERSION << 8) + JUCE_BUILDNUMBER)
//==============================================================================
#include "juce_TargetPlatform.h" // (sets up the various JUCE_WINDOWS, JUCE_MAC, etc flags)
//==============================================================================
#ifndef DOXYGEN
// These are old macros that are now deprecated: you should just use the juce namespace directly.
#define JUCE_NAMESPACE juce
#define BEGIN_JUCE_NAMESPACE namespace juce {
#define END_JUCE_NAMESPACE }
#endif
//==============================================================================
#include "juce_PlatformDefs.h"
// Now we'll include any OS headers we need.. (at this point we are outside the Juce namespace).
//==============================================================================
// Now we'll include some common OS headers..
#if JUCE_MSVC
#pragma warning (push)
#pragma warning (disable: 4514 4245 4100)
@ -112,6 +103,12 @@
#include <byteswap.h>
#endif
// undef symbols that are sometimes set by misguided 3rd-party headers..
#undef check
#undef TYPE_BOOL
#undef max
#undef min
//==============================================================================
// DLL building settings on Windows
#if JUCE_MSVC
@ -131,70 +128,40 @@
#endif
#endif
//==============================================================================
#ifndef JUCE_API
/** This macro is added to all juce public class declarations. */
#define JUCE_API
#define JUCE_API /**< This macro is added to all juce public class declarations. */
#endif
/** This macro is added to all juce public function declarations. */
#define JUCE_PUBLIC_FUNCTION JUCE_API JUCE_CALLTYPE
/** This turns on some non-essential bits of code that should prevent old code from compiling
in cases where method signatures have changed, etc.
*/
#if (! defined (JUCE_CATCH_DEPRECATED_CODE_MISUSE)) && JUCE_DEBUG && ! DOXYGEN
/** This turns on some non-essential bits of code that should prevent old code from compiling
in cases where method signatures have changed, etc.
*/
#define JUCE_CATCH_DEPRECATED_CODE_MISUSE 1
#endif
//==============================================================================
// Now include some basics that are needed by most of the Juce classes...
BEGIN_JUCE_NAMESPACE
extern JUCE_API bool JUCE_CALLTYPE juce_isRunningUnderDebugger();
#if JUCE_LOG_ASSERTIONS
extern JUCE_API void logAssertion (const char* filename, int lineNum) noexcept;
#ifndef DOXYGEN
#define JUCE_NAMESPACE juce // This old macro is deprecated: you should just use the juce namespace directly.
#endif
#undef max
#undef min
#include "../memory/juce_Memory.h"
#include "../maths/juce_MathsFunctions.h"
#include "../memory/juce_ByteOrder.h"
#include "../logging/juce_Logger.h"
#include "../memory/juce_LeakedObjectDetector.h"
// unbelievably, some system headers actually use macros to define these symbols:
#undef check
#undef TYPE_BOOL
//==============================================================================
#if JUCE_MAC || JUCE_IOS || DOXYGEN
// Now include some common headers...
namespace juce
{
extern JUCE_API bool JUCE_CALLTYPE juce_isRunningUnderDebugger();
/** A handy C++ wrapper that creates and deletes an NSAutoreleasePool object using RAII.
You should use the JUCE_AUTORELEASEPOOL macro to create a local auto-release pool on the stack.
*/
class JUCE_API ScopedAutoReleasePool
{
public:
ScopedAutoReleasePool();
~ScopedAutoReleasePool();
private:
void* pool;
JUCE_DECLARE_NON_COPYABLE (ScopedAutoReleasePool);
};
/** A macro that can be used to easily declare a local ScopedAutoReleasePool object for RAII-based obj-C autoreleasing. */
#define JUCE_AUTORELEASEPOOL const juce::ScopedAutoReleasePool JUCE_JOIN_MACRO (autoReleasePool_, __LINE__);
#else
#define JUCE_AUTORELEASEPOOL
#endif
END_JUCE_NAMESPACE
#if JUCE_LOG_ASSERTIONS
extern JUCE_API void logAssertion (const char* file, int line) noexcept;
#endif
#include "../memory/juce_Memory.h"
#include "../maths/juce_MathsFunctions.h"
#include "../memory/juce_ByteOrder.h"
#include "../logging/juce_Logger.h"
#include "../memory/juce_LeakedObjectDetector.h"
}
#endif // __JUCE_STANDARDHEADER_JUCEHEADER__

View file

@ -64,8 +64,7 @@ bool InterprocessConnection::connectToSocket (const String& hostName,
}
}
bool InterprocessConnection::connectToPipe (const String& pipeName,
const int pipeReceiveMessageTimeoutMs)
bool InterprocessConnection::connectToPipe (const String& pipeName, const int timeoutMs)
{
disconnect();
@ -74,7 +73,7 @@ bool InterprocessConnection::connectToPipe (const String& pipeName,
if (newPipe->openExisting (pipeName))
{
const ScopedLock sl (pipeAndSocketLock);
pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
pipeReceiveMessageTimeout = timeoutMs;
initialiseWithPipe (newPipe.release());
return true;
}
@ -82,8 +81,7 @@ bool InterprocessConnection::connectToPipe (const String& pipeName,
return false;
}
bool InterprocessConnection::createPipe (const String& pipeName,
const int pipeReceiveMessageTimeoutMs)
bool InterprocessConnection::createPipe (const String& pipeName, const int timeoutMs)
{
disconnect();
@ -92,7 +90,7 @@ bool InterprocessConnection::createPipe (const String& pipeName,
if (newPipe->createNewPipe (pipeName))
{
const ScopedLock sl (pipeAndSocketLock);
pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
pipeReceiveMessageTimeout = timeoutMs;
initialiseWithPipe (newPipe.release());
return true;
}
@ -163,7 +161,7 @@ bool InterprocessConnection::sendMessage (const MemoryBlock& message)
if (socket != nullptr)
bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
else if (pipe != nullptr)
bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize());
bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize(), pipeReceiveMessageTimeout);
return bytesWritten == (int) messageData.getSize();
}
@ -171,7 +169,7 @@ bool InterprocessConnection::sendMessage (const MemoryBlock& message)
//==============================================================================
void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket_)
{
jassert (socket == 0);
jassert (socket == nullptr);
socket = socket_;
connectionMadeInt();
startThread();
@ -179,7 +177,7 @@ void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket
void InterprocessConnection::initialiseWithPipe (NamedPipe* const pipe_)
{
jassert (pipe == 0);
jassert (pipe == nullptr);
pipe = pipe_;
connectionMadeInt();
startThread();
@ -206,6 +204,8 @@ struct ConnectionStateMessage : public MessageManager::MessageBase
WeakReference<InterprocessConnection> owner;
bool connectionMade;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage);
};
void InterprocessConnection::connectionMadeInt()
@ -264,18 +264,16 @@ void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
//==============================================================================
bool InterprocessConnection::readNextMessageInt()
{
const int maximumMessageSize = 1024 * 1024 * 10; // sanity check
uint32 messageHeader[2];
const int bytes = socket != nullptr ? socket->read (messageHeader, sizeof (messageHeader), true)
: pipe ->read (messageHeader, sizeof (messageHeader), pipeReceiveMessageTimeout);
: pipe ->read (messageHeader, sizeof (messageHeader), -1);
if (bytes == sizeof (messageHeader)
&& ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
{
int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
if (bytesInMessage > 0 && bytesInMessage < maximumMessageSize)
if (bytesInMessage > 0)
{
MemoryBlock messageData ((size_t) bytesInMessage, true);
int bytesRead = 0;
@ -286,8 +284,10 @@ bool InterprocessConnection::readNextMessageInt()
return false;
const int numThisTime = jmin (bytesInMessage, 65536);
const int bytesIn = socket != nullptr ? socket->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, true)
: pipe ->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, pipeReceiveMessageTimeout);
void* const data = addBytesToPointer (messageData.getData(), bytesRead);
const int bytesIn = socket != nullptr ? socket->read (data, numThisTime, true)
: pipe ->read (data, numThisTime, -1);
if (bytesIn <= 0)
break;
@ -339,7 +339,7 @@ void InterprocessConnection::run()
}
else
{
Thread::sleep (2);
Thread::sleep (1);
}
}
else if (pipe != nullptr)

View file

@ -96,25 +96,26 @@ public:
an InterprocessConnection object and used createPipe() to create a pipe for this
to connect to.
You can optionally specify a timeout length to be passed to the NamedPipe::read() method.
@param pipeName the name to use for the pipe - this should be unique to your app
@param pipeReceiveMessageTimeoutMs a timeout length to be used when reading or writing
to the pipe, or -1 for an infinite timeout.
@returns true if it connects successfully.
@see createPipe, NamedPipe
*/
bool connectToPipe (const String& pipeName,
int pipeReceiveMessageTimeoutMs = -1);
bool connectToPipe (const String& pipeName, int pipeReceiveMessageTimeoutMs);
/** Tries to create a new pipe for other processes to connect to.
This creates a pipe with the given name, so that other processes can use
connectToPipe() to connect to the other end.
You can optionally specify a timeout length to be passed to the NamedPipe::read() method.
If another process is already using this pipe, this will fail and return false.
@param pipeName the name to use for the pipe - this should be unique to your app
@param pipeReceiveMessageTimeoutMs a timeout length to be used when reading or writing
to the pipe, or -1 for an infinite timeout.
@returns true if the pipe was created, or false if it fails (e.g. if another process is
already using using the pipe).
*/
bool createPipe (const String& pipeName,
int pipeReceiveMessageTimeoutMs = -1);
bool createPipe (const String& pipeName, int pipeReceiveMessageTimeoutMs);
/** Disconnects and closes any currently-open sockets or pipes. */
void disconnect();
@ -122,16 +123,14 @@ public:
/** True if a socket or pipe is currently active. */
bool isConnected() const;
/** Returns the socket that this connection is using (or null if it uses a pipe). */
/** Returns the socket that this connection is using (or nullptr if it uses a pipe). */
StreamingSocket* getSocket() const noexcept { return socket; }
/** Returns the pipe that this connection is using (or null if it uses a socket). */
/** Returns the pipe that this connection is using (or nullptr if it uses a socket). */
NamedPipe* getPipe() const noexcept { return pipe; }
/** Returns the name of the machine at the other end of this connection.
This will return an empty string if the other machine isn't known for
some reason.
This may return an empty string if the name is unknown.
*/
String getConnectedHostName() const;