diff --git a/modules/juce_core/native/juce_posix_NamedPipe.cpp b/modules/juce_core/native/juce_posix_NamedPipe.cpp index 959a54fcd7..efcad1c278 100644 --- a/modules/juce_core/native/juce_posix_NamedPipe.cpp +++ b/modules/juce_core/native/juce_posix_NamedPipe.cpp @@ -31,7 +31,7 @@ public: pipeOutName (pipePath + "_out"), pipeIn (-1), pipeOut (-1), createdPipe (createPipe), - blocked (false), stopReadOperation (false) + stopReadOperation (false) { signal (SIGPIPE, signalHandler); siginterrupt (SIGPIPE, 1); @@ -49,6 +49,98 @@ public: } } + int read (char* destBuffer, int maxBytesToRead, int timeOutMilliseconds) + { + const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds); + + if (pipeIn == -1) + { + pipeIn = openPipe (createdPipe ? pipeInName : pipeOutName, O_RDWR | O_NONBLOCK, timeoutEnd); + + if (pipeIn == -1) + return -1; + } + + int bytesRead = 0; + + while (bytesRead < maxBytesToRead) + { + const int bytesThisTime = maxBytesToRead - bytesRead; + const int numRead = (int) ::read (pipeIn, destBuffer, (size_t) bytesThisTime); + + if (numRead <= 0) + { + if (errno != EWOULDBLOCK || stopReadOperation || hasExpired (timeoutEnd)) + return -1; + + const int maxWaitingTime = 30; + waitForInput (pipeIn, timeoutEnd == 0 ? maxWaitingTime + : jmin (maxWaitingTime, + (int) (timeoutEnd - Time::getMillisecondCounter()))); + continue; + } + + bytesRead += numRead; + destBuffer += numRead; + } + + return bytesRead; + } + + int write (const char* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds) + { + const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds); + + if (pipeOut == -1) + { + pipeOut = openPipe (createdPipe ? pipeOutName : pipeInName, O_WRONLY, timeoutEnd); + + if (pipeOut == -1) + return -1; + } + + int bytesWritten = 0; + + while (bytesWritten < numBytesToWrite && ! hasExpired (timeoutEnd)) + { + const int bytesThisTime = numBytesToWrite - bytesWritten; + const int numWritten = (int) ::write (pipeOut, sourceBuffer, (size_t) bytesThisTime); + + if (numWritten <= 0) + return -1; + + bytesWritten += numWritten; + sourceBuffer += numWritten; + } + + return bytesWritten; + } + + bool createFifos() const + { + return (mkfifo (pipeInName .toUTF8(), 0666) == 0 || errno == EEXIST) + && (mkfifo (pipeOutName.toUTF8(), 0666) == 0 || errno == EEXIST); + } + + const String pipeInName, pipeOutName; + int pipeIn, pipeOut; + + const bool createdPipe; + bool stopReadOperation; + +private: + static void signalHandler (int) {} + + static uint32 getTimeoutEnd (const int timeOutMilliseconds) + { + return timeOutMilliseconds >= 0 ? Time::getMillisecondCounter() + (uint32) timeOutMilliseconds : 0; + } + + static bool hasExpired (const uint32 timeoutEnd) + { + return timeoutEnd != 0 && Time::getMillisecondCounter() >= timeoutEnd; + } + static int openPipe (const String& name, int flags, const uint32 timeoutEnd) { for (;;) @@ -75,152 +167,25 @@ public: select (handle + 1, &rset, nullptr, 0, &timeout); } - int read (char* destBuffer, int maxBytesToRead, int timeOutMilliseconds) - { - int bytesRead = -1; - blocked = true; - const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds); - - if (pipeIn == -1) - { - pipeIn = openPipe (createdPipe ? pipeInName : pipeOutName, O_RDWR | O_NONBLOCK, timeoutEnd); - - if (pipeIn == -1) - { - blocked = false; - return -1; - } - } - - bytesRead = 0; - - while (bytesRead < maxBytesToRead) - { - const int bytesThisTime = maxBytesToRead - bytesRead; - const int numRead = (int) ::read (pipeIn, destBuffer, (size_t) bytesThisTime); - - if (numRead <= 0) - { - if (errno != EWOULDBLOCK || stopReadOperation || hasExpired (timeoutEnd)) - { - bytesRead = -1; - break; - } - - const int maxWaitingTime = 30; - waitForInput (pipeIn, timeoutEnd == 0 ? maxWaitingTime - : jmin (maxWaitingTime, - (int) (timeoutEnd - Time::getMillisecondCounter()))); - continue; - } - - bytesRead += numRead; - destBuffer += numRead; - } - - blocked = false; - return bytesRead; - } - - int write (const char* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds) - { - int bytesWritten = -1; - const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds); - - if (pipeOut == -1) - { - pipeOut = openPipe (createdPipe ? pipeOutName : pipeInName, O_WRONLY, timeoutEnd); - - if (pipeOut == -1) - return -1; - } - - bytesWritten = 0; - - while (bytesWritten < numBytesToWrite && ! hasExpired (timeoutEnd)) - { - const int bytesThisTime = numBytesToWrite - bytesWritten; - const int numWritten = (int) ::write (pipeOut, sourceBuffer, (size_t) bytesThisTime); - - if (numWritten <= 0) - { - bytesWritten = -1; - break; - } - - bytesWritten += numWritten; - sourceBuffer += numWritten; - } - - return bytesWritten; - } - - bool createFifos() const - { - return (mkfifo (pipeInName .toUTF8(), 0666) == 0 || errno == EEXIST) - && (mkfifo (pipeOutName.toUTF8(), 0666) == 0 || errno == EEXIST); - } - - const String pipeInName, pipeOutName; - int pipeIn, pipeOut; - - const bool createdPipe; - bool volatile blocked, stopReadOperation; - -private: - static void signalHandler (int) {} - - static uint32 getTimeoutEnd (const int timeOutMilliseconds) - { - return timeOutMilliseconds >= 0 ? Time::getMillisecondCounter() + (uint32) timeOutMilliseconds : 0; - } - - static bool hasExpired (const uint32 timeoutEnd) - { - return timeoutEnd != 0 && Time::getMillisecondCounter() >= timeoutEnd; - } - JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Pimpl) }; -NamedPipe::NamedPipe() +void NamedPipe::close() { -} - -NamedPipe::~NamedPipe() -{ - close(); -} - -void NamedPipe::cancelPendingReads() -{ - while (pimpl != nullptr && pimpl->blocked) + if (pimpl != nullptr) { pimpl->stopReadOperation = true; char buffer[1] = { 0 }; - int bytesWritten = (int) ::write (pimpl->pipeIn, buffer, 1); - (void) bytesWritten; + ::write (pimpl->pipeIn, buffer, 1); - int timeout = 2000; - while (pimpl->blocked && --timeout >= 0) - Thread::sleep (2); + ScopedWriteLock sl (lock); + pimpl = nullptr; } - - if (pimpl != nullptr) - pimpl->stopReadOperation = false; -} - -void NamedPipe::close() -{ - cancelPendingReads(); - ScopedPointer deleter (pimpl); // (clears the pimpl member variable before deleting it) } bool NamedPipe::openInternal (const String& pipeName, const bool createPipe) { - close(); - #if JUCE_IOS pimpl = new Pimpl (File::getSpecialLocation (File::tempDirectory) .getChildFile (File::createLegalFileName (pipeName)).getFullPathName(), createPipe); @@ -239,15 +204,12 @@ bool NamedPipe::openInternal (const String& pipeName, const bool createPipe) int NamedPipe::read (void* destBuffer, int maxBytesToRead, int timeOutMilliseconds) { + ScopedReadLock sl (lock); return pimpl != nullptr ? pimpl->read (static_cast (destBuffer), maxBytesToRead, timeOutMilliseconds) : -1; } int NamedPipe::write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds) { + ScopedReadLock sl (lock); return pimpl != nullptr ? pimpl->write (static_cast (sourceBuffer), numBytesToWrite, timeOutMilliseconds) : -1; } - -bool NamedPipe::isOpen() const -{ - return pimpl != nullptr; -} diff --git a/modules/juce_core/native/juce_win32_Files.cpp b/modules/juce_core/native/juce_win32_Files.cpp index 588a8bdd2c..d0f86dc085 100644 --- a/modules/juce_core/native/juce_win32_Files.cpp +++ b/modules/juce_core/native/juce_win32_Files.cpp @@ -731,18 +731,16 @@ void File::revealToUser() const class NamedPipe::Pimpl { public: - Pimpl (const String& file, const bool createPipe) - : pipeH (INVALID_HANDLE_VALUE), + Pimpl (const String& pipeName, const bool createPipe) + : filename ("\\\\.\\pipe\\" + File::createLegalFileName (pipeName)), + pipeH (INVALID_HANDLE_VALUE), cancelEvent (CreateEvent (0, FALSE, FALSE, 0)), - connected (false), - ownsPipe (createPipe) + connected (false), ownsPipe (createPipe), shouldStop (false) { - pipeH = createPipe ? CreateNamedPipe (file.toWideCharPointer(), - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, 0, - PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, 0) - : CreateFile (file.toWideCharPointer(), - GENERIC_READ | GENERIC_WRITE, 0, 0, - OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0); + if (createPipe) + pipeH = CreateNamedPipe (filename.toWideCharPointer(), + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, 0, + PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, 0); } ~Pimpl() @@ -758,32 +756,47 @@ public: bool connect (const int timeOutMs) { if (! ownsPipe) - return true; + { + if (pipeH != INVALID_HANDLE_VALUE) + return true; + + const Time timeOutEnd (Time::getCurrentTime() + RelativeTime::milliseconds (timeOutMs)); + + for (;;) + { + { + const ScopedLock sl (createFileLock); + + if (pipeH == INVALID_HANDLE_VALUE) + pipeH = CreateFile (filename.toWideCharPointer(), + GENERIC_READ | GENERIC_WRITE, 0, 0, + OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0); + } + + if (pipeH != INVALID_HANDLE_VALUE) + return true; + + if (shouldStop || (timeOutMs >= 0 && Time::getCurrentTime() > timeOutEnd)) + return false; + + Thread::sleep (1); + } + } if (! connected) { - OVERLAPPED over = { 0 }; - over.hEvent = CreateEvent (0, TRUE, FALSE, 0); + OverlappedEvent over; - if (ConnectNamedPipe (pipeH, &over) == 0) + if (ConnectNamedPipe (pipeH, &over.over) == 0) { - const DWORD err = GetLastError(); - - if (err == ERROR_IO_PENDING || err == ERROR_PIPE_LISTENING) + switch (GetLastError()) { - HANDLE handles[] = { over.hEvent, cancelEvent }; - - if (WaitForMultipleObjects (2, handles, FALSE, - timeOutMs >= 0 ? timeOutMs : INFINITE) == WAIT_OBJECT_0) - connected = true; - } - else if (err == ERROR_PIPE_CONNECTED) - { - connected = true; + case ERROR_PIPE_CONNECTED: connected = true; break; + case ERROR_IO_PENDING: + case ERROR_PIPE_LISTENING: connected = waitForIO (over, timeOutMs); break; + default: break; } } - - CloseHandle (over.hEvent); } return connected; @@ -791,179 +804,150 @@ public: void disconnectPipe() { - if (connected) + if (ownsPipe && connected) { DisconnectNamedPipe (pipeH); connected = false; } } - HANDLE pipeH, cancelEvent; - bool connected, ownsPipe; -}; - -NamedPipe::NamedPipe() -{ -} - -NamedPipe::~NamedPipe() -{ - close(); -} - -bool NamedPipe::isOpen() const -{ - return pimpl != nullptr; -} - -void NamedPipe::cancelPendingReads() -{ - if (pimpl != nullptr) - SetEvent (pimpl->cancelEvent); -} - -void NamedPipe::close() -{ - cancelPendingReads(); - - const ScopedLock sl (lock); - ScopedPointer deleter (pimpl); // (clears the pimpl member variable before deleting it) -} - -bool NamedPipe::openInternal (const String& pipeName, const bool createPipe) -{ - close(); - - pimpl = new Pimpl ("\\\\.\\pipe\\" + File::createLegalFileName (pipeName), createPipe); - - if (pimpl->pipeH != INVALID_HANDLE_VALUE) - return true; - - pimpl = nullptr; - return false; -} - -int NamedPipe::read (void* destBuffer, const int maxBytesToRead, const int timeOutMilliseconds) -{ - const ScopedLock sl (lock); - int bytesRead = -1; - bool waitAgain = true; - - while (waitAgain && pimpl != nullptr) + int read (void* destBuffer, const int maxBytesToRead, const int timeOutMilliseconds) { - waitAgain = false; - - if (! pimpl->connect (timeOutMilliseconds)) - break; - - if (maxBytesToRead <= 0) - return 0; - - OVERLAPPED over = { 0 }; - over.hEvent = CreateEvent (0, TRUE, FALSE, 0); - - unsigned long numRead; - - if (ReadFile (pimpl->pipeH, destBuffer, (DWORD) maxBytesToRead, &numRead, &over)) - { - bytesRead = (int) numRead; - } - else + while (connect (timeOutMilliseconds)) { + if (maxBytesToRead <= 0) + return 0; + + OverlappedEvent over; + unsigned long numRead; + + if (ReadFile (pipeH, destBuffer, (DWORD) maxBytesToRead, &numRead, &over.over)) + return (int) numRead; + const DWORD lastError = GetLastError(); if (lastError == ERROR_IO_PENDING) { - HANDLE handles[] = { over.hEvent, pimpl->cancelEvent }; - DWORD waitResult = WaitForMultipleObjects (2, handles, FALSE, - timeOutMilliseconds >= 0 ? timeOutMilliseconds - : INFINITE); - if (waitResult != WAIT_OBJECT_0) - { - // if the operation timed out, let's cancel it... - CancelIo (pimpl->pipeH); - WaitForSingleObject (over.hEvent, INFINITE); // makes sure cancel is complete - } + if (! waitForIO (over, timeOutMilliseconds)) + return -1; - if (GetOverlappedResult (pimpl->pipeH, &over, &numRead, FALSE)) - { - bytesRead = (int) numRead; - } - else if ((GetLastError() == ERROR_BROKEN_PIPE || GetLastError() == ERROR_PIPE_NOT_CONNECTED) && pimpl->ownsPipe) - { - pimpl->disconnectPipe(); - waitAgain = true; - } + if (GetOverlappedResult (pipeH, &over.over, &numRead, FALSE)) + return (int) numRead; } - else if (pimpl->ownsPipe) - { - waitAgain = true; - if (lastError == ERROR_BROKEN_PIPE || lastError == ERROR_PIPE_NOT_CONNECTED) - pimpl->disconnectPipe(); - else - Sleep (5); - } + if (ownsPipe && (GetLastError() == ERROR_BROKEN_PIPE || GetLastError() == ERROR_PIPE_NOT_CONNECTED)) + disconnectPipe(); + else + break; } - CloseHandle (over.hEvent); + return -1; } - return bytesRead; -} - -int NamedPipe::write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds) -{ - int bytesWritten = -1; - - if (pimpl != nullptr) + int write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds) { - if (! pimpl->connect (timeOutMilliseconds)) - { - pimpl = nullptr; - } - else + if (connect (timeOutMilliseconds)) { if (numBytesToWrite <= 0) return 0; - OVERLAPPED over = { 0 }; - over.hEvent = CreateEvent (0, TRUE, FALSE, 0); - + OverlappedEvent over; unsigned long numWritten; - if (WriteFile (pimpl->pipeH, sourceBuffer, (DWORD) numBytesToWrite, &numWritten, &over)) + if (WriteFile (pipeH, sourceBuffer, (DWORD) numBytesToWrite, &numWritten, &over.over)) + return (int) numWritten; + + if (GetLastError() == ERROR_IO_PENDING) { - bytesWritten = (int) numWritten; + if (! waitForIO (over, timeOutMilliseconds)) + return -1; + + if (GetOverlappedResult (pipeH, &over.over, &numWritten, FALSE)) + return (int) numWritten; + + if (GetLastError() == ERROR_BROKEN_PIPE && ownsPipe) + disconnectPipe(); } - else if (GetLastError() == ERROR_IO_PENDING) - { - HANDLE handles[] = { over.hEvent, pimpl->cancelEvent }; - DWORD waitResult; - - waitResult = WaitForMultipleObjects (2, handles, FALSE, - timeOutMilliseconds >= 0 ? timeOutMilliseconds - : INFINITE); - - if (waitResult != WAIT_OBJECT_0) - { - CancelIo (pimpl->pipeH); - WaitForSingleObject (over.hEvent, INFINITE); - } - - if (GetOverlappedResult (pimpl->pipeH, &over, &numWritten, FALSE)) - { - bytesWritten = (int) numWritten; - } - else if (GetLastError() == ERROR_BROKEN_PIPE && pimpl->ownsPipe) - { - pimpl->disconnectPipe(); - } - } - - CloseHandle (over.hEvent); } + + return -1; } - return bytesWritten; + const String filename; + HANDLE pipeH, cancelEvent; + bool connected, ownsPipe, shouldStop; + CriticalSection createFileLock; + +private: + struct OverlappedEvent + { + OverlappedEvent() + { + zerostruct (over); + over.hEvent = CreateEvent (0, TRUE, FALSE, 0); + } + + ~OverlappedEvent() + { + CloseHandle (over.hEvent); + } + + OVERLAPPED over; + }; + + bool waitForIO (OverlappedEvent& over, int timeOutMilliseconds) + { + if (shouldStop) + return false; + + HANDLE handles[] = { over.over.hEvent, cancelEvent }; + DWORD waitResult = WaitForMultipleObjects (2, handles, FALSE, + timeOutMilliseconds >= 0 ? timeOutMilliseconds + : INFINITE); + + if (waitResult == WAIT_OBJECT_0) + return true; + + CancelIo (pipeH); + return false; + } + + JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Pimpl); +}; + +void NamedPipe::close() +{ + if (pimpl != nullptr) + { + pimpl->shouldStop = true; + SetEvent (pimpl->cancelEvent); + + ScopedWriteLock sl (lock); + pimpl = nullptr; + } +} + +bool NamedPipe::openInternal (const String& pipeName, const bool createPipe) +{ + pimpl = new Pimpl (pipeName, createPipe); + + if (createPipe && pimpl->pipeH == INVALID_HANDLE_VALUE) + { + pimpl = nullptr; + return false; + } + + return true; +} + +int NamedPipe::read (void* destBuffer, int maxBytesToRead, int timeOutMilliseconds) +{ + ScopedReadLock sl (lock); + return pimpl != nullptr ? pimpl->read (destBuffer, maxBytesToRead, timeOutMilliseconds) : -1; +} + +int NamedPipe::write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds) +{ + ScopedReadLock sl (lock); + return pimpl != nullptr ? pimpl->write (sourceBuffer, numBytesToWrite, timeOutMilliseconds) : -1; } diff --git a/modules/juce_core/network/juce_NamedPipe.cpp b/modules/juce_core/network/juce_NamedPipe.cpp index da13493f95..902a2df26e 100644 --- a/modules/juce_core/network/juce_NamedPipe.cpp +++ b/modules/juce_core/network/juce_NamedPipe.cpp @@ -23,14 +23,34 @@ ============================================================================== */ +NamedPipe::NamedPipe() +{ +} + +NamedPipe::~NamedPipe() +{ + close(); +} + bool NamedPipe::openExisting (const String& pipeName) { + close(); + + ScopedWriteLock sl (lock); currentPipeName = pipeName; return openInternal (pipeName, false); } +bool NamedPipe::isOpen() const +{ + return pimpl != nullptr; +} + bool NamedPipe::createNewPipe (const String& pipeName) { + close(); + + ScopedWriteLock sl (lock); currentPipeName = pipeName; return openInternal (pipeName, true); } diff --git a/modules/juce_core/network/juce_NamedPipe.h b/modules/juce_core/network/juce_NamedPipe.h index 1651e55ae8..ebb850bda5 100644 --- a/modules/juce_core/network/juce_NamedPipe.h +++ b/modules/juce_core/network/juce_NamedPipe.h @@ -26,6 +26,7 @@ #ifndef __JUCE_NAMEDPIPE_JUCEHEADER__ #define __JUCE_NAMEDPIPE_JUCEHEADER__ +#include "../threads/juce_ReadWriteLock.h" //============================================================================== /** @@ -85,15 +86,12 @@ public: */ int write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds); - /** If any threads are currently blocked on a read operation, this tells them to abort. */ - void cancelPendingReads(); - private: //============================================================================== class Pimpl; ScopedPointer pimpl; String currentPipeName; - CriticalSection lock; + ReadWriteLock lock; bool openInternal (const String& pipeName, const bool createPipe); diff --git a/modules/juce_events/interprocess/juce_InterprocessConnection.cpp b/modules/juce_events/interprocess/juce_InterprocessConnection.cpp index 3d34fa3c96..9055599bad 100644 --- a/modules/juce_events/interprocess/juce_InterprocessConnection.cpp +++ b/modules/juce_events/interprocess/juce_InterprocessConnection.cpp @@ -191,8 +191,7 @@ struct ConnectionStateMessage : public MessageManager::MessageBase void messageCallback() { - InterprocessConnection* const ipc = owner; - if (ipc != nullptr) + if (InterprocessConnection* const ipc = owner) { if (connectionMade) ipc->connectionMade(); @@ -300,6 +299,7 @@ bool InterprocessConnection::readNextMessageInt() } else if (bytes < 0) { + if (socket != nullptr) { const ScopedLock sl (pipeAndSocketLock); socket = nullptr;