1
0
Fork 0
mirror of https://github.com/juce-framework/JUCE.git synced 2026-01-10 23:44:24 +00:00

Re-implemented NamedPipe, to make the win32 version better match the posix one.

This commit is contained in:
jules 2012-12-09 12:52:41 +00:00
parent f5bcfccf6c
commit 41f3b6c485
5 changed files with 280 additions and 316 deletions

View file

@ -31,7 +31,7 @@ public:
pipeOutName (pipePath + "_out"),
pipeIn (-1), pipeOut (-1),
createdPipe (createPipe),
blocked (false), stopReadOperation (false)
stopReadOperation (false)
{
signal (SIGPIPE, signalHandler);
siginterrupt (SIGPIPE, 1);
@ -49,6 +49,98 @@ public:
}
}
int read (char* destBuffer, int maxBytesToRead, int timeOutMilliseconds)
{
const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds);
if (pipeIn == -1)
{
pipeIn = openPipe (createdPipe ? pipeInName : pipeOutName, O_RDWR | O_NONBLOCK, timeoutEnd);
if (pipeIn == -1)
return -1;
}
int bytesRead = 0;
while (bytesRead < maxBytesToRead)
{
const int bytesThisTime = maxBytesToRead - bytesRead;
const int numRead = (int) ::read (pipeIn, destBuffer, (size_t) bytesThisTime);
if (numRead <= 0)
{
if (errno != EWOULDBLOCK || stopReadOperation || hasExpired (timeoutEnd))
return -1;
const int maxWaitingTime = 30;
waitForInput (pipeIn, timeoutEnd == 0 ? maxWaitingTime
: jmin (maxWaitingTime,
(int) (timeoutEnd - Time::getMillisecondCounter())));
continue;
}
bytesRead += numRead;
destBuffer += numRead;
}
return bytesRead;
}
int write (const char* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds)
{
const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds);
if (pipeOut == -1)
{
pipeOut = openPipe (createdPipe ? pipeOutName : pipeInName, O_WRONLY, timeoutEnd);
if (pipeOut == -1)
return -1;
}
int bytesWritten = 0;
while (bytesWritten < numBytesToWrite && ! hasExpired (timeoutEnd))
{
const int bytesThisTime = numBytesToWrite - bytesWritten;
const int numWritten = (int) ::write (pipeOut, sourceBuffer, (size_t) bytesThisTime);
if (numWritten <= 0)
return -1;
bytesWritten += numWritten;
sourceBuffer += numWritten;
}
return bytesWritten;
}
bool createFifos() const
{
return (mkfifo (pipeInName .toUTF8(), 0666) == 0 || errno == EEXIST)
&& (mkfifo (pipeOutName.toUTF8(), 0666) == 0 || errno == EEXIST);
}
const String pipeInName, pipeOutName;
int pipeIn, pipeOut;
const bool createdPipe;
bool stopReadOperation;
private:
static void signalHandler (int) {}
static uint32 getTimeoutEnd (const int timeOutMilliseconds)
{
return timeOutMilliseconds >= 0 ? Time::getMillisecondCounter() + (uint32) timeOutMilliseconds : 0;
}
static bool hasExpired (const uint32 timeoutEnd)
{
return timeoutEnd != 0 && Time::getMillisecondCounter() >= timeoutEnd;
}
static int openPipe (const String& name, int flags, const uint32 timeoutEnd)
{
for (;;)
@ -75,152 +167,25 @@ public:
select (handle + 1, &rset, nullptr, 0, &timeout);
}
int read (char* destBuffer, int maxBytesToRead, int timeOutMilliseconds)
{
int bytesRead = -1;
blocked = true;
const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds);
if (pipeIn == -1)
{
pipeIn = openPipe (createdPipe ? pipeInName : pipeOutName, O_RDWR | O_NONBLOCK, timeoutEnd);
if (pipeIn == -1)
{
blocked = false;
return -1;
}
}
bytesRead = 0;
while (bytesRead < maxBytesToRead)
{
const int bytesThisTime = maxBytesToRead - bytesRead;
const int numRead = (int) ::read (pipeIn, destBuffer, (size_t) bytesThisTime);
if (numRead <= 0)
{
if (errno != EWOULDBLOCK || stopReadOperation || hasExpired (timeoutEnd))
{
bytesRead = -1;
break;
}
const int maxWaitingTime = 30;
waitForInput (pipeIn, timeoutEnd == 0 ? maxWaitingTime
: jmin (maxWaitingTime,
(int) (timeoutEnd - Time::getMillisecondCounter())));
continue;
}
bytesRead += numRead;
destBuffer += numRead;
}
blocked = false;
return bytesRead;
}
int write (const char* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds)
{
int bytesWritten = -1;
const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds);
if (pipeOut == -1)
{
pipeOut = openPipe (createdPipe ? pipeOutName : pipeInName, O_WRONLY, timeoutEnd);
if (pipeOut == -1)
return -1;
}
bytesWritten = 0;
while (bytesWritten < numBytesToWrite && ! hasExpired (timeoutEnd))
{
const int bytesThisTime = numBytesToWrite - bytesWritten;
const int numWritten = (int) ::write (pipeOut, sourceBuffer, (size_t) bytesThisTime);
if (numWritten <= 0)
{
bytesWritten = -1;
break;
}
bytesWritten += numWritten;
sourceBuffer += numWritten;
}
return bytesWritten;
}
bool createFifos() const
{
return (mkfifo (pipeInName .toUTF8(), 0666) == 0 || errno == EEXIST)
&& (mkfifo (pipeOutName.toUTF8(), 0666) == 0 || errno == EEXIST);
}
const String pipeInName, pipeOutName;
int pipeIn, pipeOut;
const bool createdPipe;
bool volatile blocked, stopReadOperation;
private:
static void signalHandler (int) {}
static uint32 getTimeoutEnd (const int timeOutMilliseconds)
{
return timeOutMilliseconds >= 0 ? Time::getMillisecondCounter() + (uint32) timeOutMilliseconds : 0;
}
static bool hasExpired (const uint32 timeoutEnd)
{
return timeoutEnd != 0 && Time::getMillisecondCounter() >= timeoutEnd;
}
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Pimpl)
};
NamedPipe::NamedPipe()
void NamedPipe::close()
{
}
NamedPipe::~NamedPipe()
{
close();
}
void NamedPipe::cancelPendingReads()
{
while (pimpl != nullptr && pimpl->blocked)
if (pimpl != nullptr)
{
pimpl->stopReadOperation = true;
char buffer[1] = { 0 };
int bytesWritten = (int) ::write (pimpl->pipeIn, buffer, 1);
(void) bytesWritten;
::write (pimpl->pipeIn, buffer, 1);
int timeout = 2000;
while (pimpl->blocked && --timeout >= 0)
Thread::sleep (2);
ScopedWriteLock sl (lock);
pimpl = nullptr;
}
if (pimpl != nullptr)
pimpl->stopReadOperation = false;
}
void NamedPipe::close()
{
cancelPendingReads();
ScopedPointer<Pimpl> deleter (pimpl); // (clears the pimpl member variable before deleting it)
}
bool NamedPipe::openInternal (const String& pipeName, const bool createPipe)
{
close();
#if JUCE_IOS
pimpl = new Pimpl (File::getSpecialLocation (File::tempDirectory)
.getChildFile (File::createLegalFileName (pipeName)).getFullPathName(), createPipe);
@ -239,15 +204,12 @@ bool NamedPipe::openInternal (const String& pipeName, const bool createPipe)
int NamedPipe::read (void* destBuffer, int maxBytesToRead, int timeOutMilliseconds)
{
ScopedReadLock sl (lock);
return pimpl != nullptr ? pimpl->read (static_cast <char*> (destBuffer), maxBytesToRead, timeOutMilliseconds) : -1;
}
int NamedPipe::write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds)
{
ScopedReadLock sl (lock);
return pimpl != nullptr ? pimpl->write (static_cast <const char*> (sourceBuffer), numBytesToWrite, timeOutMilliseconds) : -1;
}
bool NamedPipe::isOpen() const
{
return pimpl != nullptr;
}

View file

@ -731,18 +731,16 @@ void File::revealToUser() const
class NamedPipe::Pimpl
{
public:
Pimpl (const String& file, const bool createPipe)
: pipeH (INVALID_HANDLE_VALUE),
Pimpl (const String& pipeName, const bool createPipe)
: filename ("\\\\.\\pipe\\" + File::createLegalFileName (pipeName)),
pipeH (INVALID_HANDLE_VALUE),
cancelEvent (CreateEvent (0, FALSE, FALSE, 0)),
connected (false),
ownsPipe (createPipe)
connected (false), ownsPipe (createPipe), shouldStop (false)
{
pipeH = createPipe ? CreateNamedPipe (file.toWideCharPointer(),
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, 0,
PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, 0)
: CreateFile (file.toWideCharPointer(),
GENERIC_READ | GENERIC_WRITE, 0, 0,
OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);
if (createPipe)
pipeH = CreateNamedPipe (filename.toWideCharPointer(),
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, 0,
PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, 0);
}
~Pimpl()
@ -758,32 +756,47 @@ public:
bool connect (const int timeOutMs)
{
if (! ownsPipe)
return true;
{
if (pipeH != INVALID_HANDLE_VALUE)
return true;
const Time timeOutEnd (Time::getCurrentTime() + RelativeTime::milliseconds (timeOutMs));
for (;;)
{
{
const ScopedLock sl (createFileLock);
if (pipeH == INVALID_HANDLE_VALUE)
pipeH = CreateFile (filename.toWideCharPointer(),
GENERIC_READ | GENERIC_WRITE, 0, 0,
OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);
}
if (pipeH != INVALID_HANDLE_VALUE)
return true;
if (shouldStop || (timeOutMs >= 0 && Time::getCurrentTime() > timeOutEnd))
return false;
Thread::sleep (1);
}
}
if (! connected)
{
OVERLAPPED over = { 0 };
over.hEvent = CreateEvent (0, TRUE, FALSE, 0);
OverlappedEvent over;
if (ConnectNamedPipe (pipeH, &over) == 0)
if (ConnectNamedPipe (pipeH, &over.over) == 0)
{
const DWORD err = GetLastError();
if (err == ERROR_IO_PENDING || err == ERROR_PIPE_LISTENING)
switch (GetLastError())
{
HANDLE handles[] = { over.hEvent, cancelEvent };
if (WaitForMultipleObjects (2, handles, FALSE,
timeOutMs >= 0 ? timeOutMs : INFINITE) == WAIT_OBJECT_0)
connected = true;
}
else if (err == ERROR_PIPE_CONNECTED)
{
connected = true;
case ERROR_PIPE_CONNECTED: connected = true; break;
case ERROR_IO_PENDING:
case ERROR_PIPE_LISTENING: connected = waitForIO (over, timeOutMs); break;
default: break;
}
}
CloseHandle (over.hEvent);
}
return connected;
@ -791,179 +804,150 @@ public:
void disconnectPipe()
{
if (connected)
if (ownsPipe && connected)
{
DisconnectNamedPipe (pipeH);
connected = false;
}
}
HANDLE pipeH, cancelEvent;
bool connected, ownsPipe;
};
NamedPipe::NamedPipe()
{
}
NamedPipe::~NamedPipe()
{
close();
}
bool NamedPipe::isOpen() const
{
return pimpl != nullptr;
}
void NamedPipe::cancelPendingReads()
{
if (pimpl != nullptr)
SetEvent (pimpl->cancelEvent);
}
void NamedPipe::close()
{
cancelPendingReads();
const ScopedLock sl (lock);
ScopedPointer<Pimpl> deleter (pimpl); // (clears the pimpl member variable before deleting it)
}
bool NamedPipe::openInternal (const String& pipeName, const bool createPipe)
{
close();
pimpl = new Pimpl ("\\\\.\\pipe\\" + File::createLegalFileName (pipeName), createPipe);
if (pimpl->pipeH != INVALID_HANDLE_VALUE)
return true;
pimpl = nullptr;
return false;
}
int NamedPipe::read (void* destBuffer, const int maxBytesToRead, const int timeOutMilliseconds)
{
const ScopedLock sl (lock);
int bytesRead = -1;
bool waitAgain = true;
while (waitAgain && pimpl != nullptr)
int read (void* destBuffer, const int maxBytesToRead, const int timeOutMilliseconds)
{
waitAgain = false;
if (! pimpl->connect (timeOutMilliseconds))
break;
if (maxBytesToRead <= 0)
return 0;
OVERLAPPED over = { 0 };
over.hEvent = CreateEvent (0, TRUE, FALSE, 0);
unsigned long numRead;
if (ReadFile (pimpl->pipeH, destBuffer, (DWORD) maxBytesToRead, &numRead, &over))
{
bytesRead = (int) numRead;
}
else
while (connect (timeOutMilliseconds))
{
if (maxBytesToRead <= 0)
return 0;
OverlappedEvent over;
unsigned long numRead;
if (ReadFile (pipeH, destBuffer, (DWORD) maxBytesToRead, &numRead, &over.over))
return (int) numRead;
const DWORD lastError = GetLastError();
if (lastError == ERROR_IO_PENDING)
{
HANDLE handles[] = { over.hEvent, pimpl->cancelEvent };
DWORD waitResult = WaitForMultipleObjects (2, handles, FALSE,
timeOutMilliseconds >= 0 ? timeOutMilliseconds
: INFINITE);
if (waitResult != WAIT_OBJECT_0)
{
// if the operation timed out, let's cancel it...
CancelIo (pimpl->pipeH);
WaitForSingleObject (over.hEvent, INFINITE); // makes sure cancel is complete
}
if (! waitForIO (over, timeOutMilliseconds))
return -1;
if (GetOverlappedResult (pimpl->pipeH, &over, &numRead, FALSE))
{
bytesRead = (int) numRead;
}
else if ((GetLastError() == ERROR_BROKEN_PIPE || GetLastError() == ERROR_PIPE_NOT_CONNECTED) && pimpl->ownsPipe)
{
pimpl->disconnectPipe();
waitAgain = true;
}
if (GetOverlappedResult (pipeH, &over.over, &numRead, FALSE))
return (int) numRead;
}
else if (pimpl->ownsPipe)
{
waitAgain = true;
if (lastError == ERROR_BROKEN_PIPE || lastError == ERROR_PIPE_NOT_CONNECTED)
pimpl->disconnectPipe();
else
Sleep (5);
}
if (ownsPipe && (GetLastError() == ERROR_BROKEN_PIPE || GetLastError() == ERROR_PIPE_NOT_CONNECTED))
disconnectPipe();
else
break;
}
CloseHandle (over.hEvent);
return -1;
}
return bytesRead;
}
int NamedPipe::write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds)
{
int bytesWritten = -1;
if (pimpl != nullptr)
int write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds)
{
if (! pimpl->connect (timeOutMilliseconds))
{
pimpl = nullptr;
}
else
if (connect (timeOutMilliseconds))
{
if (numBytesToWrite <= 0)
return 0;
OVERLAPPED over = { 0 };
over.hEvent = CreateEvent (0, TRUE, FALSE, 0);
OverlappedEvent over;
unsigned long numWritten;
if (WriteFile (pimpl->pipeH, sourceBuffer, (DWORD) numBytesToWrite, &numWritten, &over))
if (WriteFile (pipeH, sourceBuffer, (DWORD) numBytesToWrite, &numWritten, &over.over))
return (int) numWritten;
if (GetLastError() == ERROR_IO_PENDING)
{
bytesWritten = (int) numWritten;
if (! waitForIO (over, timeOutMilliseconds))
return -1;
if (GetOverlappedResult (pipeH, &over.over, &numWritten, FALSE))
return (int) numWritten;
if (GetLastError() == ERROR_BROKEN_PIPE && ownsPipe)
disconnectPipe();
}
else if (GetLastError() == ERROR_IO_PENDING)
{
HANDLE handles[] = { over.hEvent, pimpl->cancelEvent };
DWORD waitResult;
waitResult = WaitForMultipleObjects (2, handles, FALSE,
timeOutMilliseconds >= 0 ? timeOutMilliseconds
: INFINITE);
if (waitResult != WAIT_OBJECT_0)
{
CancelIo (pimpl->pipeH);
WaitForSingleObject (over.hEvent, INFINITE);
}
if (GetOverlappedResult (pimpl->pipeH, &over, &numWritten, FALSE))
{
bytesWritten = (int) numWritten;
}
else if (GetLastError() == ERROR_BROKEN_PIPE && pimpl->ownsPipe)
{
pimpl->disconnectPipe();
}
}
CloseHandle (over.hEvent);
}
return -1;
}
return bytesWritten;
const String filename;
HANDLE pipeH, cancelEvent;
bool connected, ownsPipe, shouldStop;
CriticalSection createFileLock;
private:
struct OverlappedEvent
{
OverlappedEvent()
{
zerostruct (over);
over.hEvent = CreateEvent (0, TRUE, FALSE, 0);
}
~OverlappedEvent()
{
CloseHandle (over.hEvent);
}
OVERLAPPED over;
};
bool waitForIO (OverlappedEvent& over, int timeOutMilliseconds)
{
if (shouldStop)
return false;
HANDLE handles[] = { over.over.hEvent, cancelEvent };
DWORD waitResult = WaitForMultipleObjects (2, handles, FALSE,
timeOutMilliseconds >= 0 ? timeOutMilliseconds
: INFINITE);
if (waitResult == WAIT_OBJECT_0)
return true;
CancelIo (pipeH);
return false;
}
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Pimpl);
};
void NamedPipe::close()
{
if (pimpl != nullptr)
{
pimpl->shouldStop = true;
SetEvent (pimpl->cancelEvent);
ScopedWriteLock sl (lock);
pimpl = nullptr;
}
}
bool NamedPipe::openInternal (const String& pipeName, const bool createPipe)
{
pimpl = new Pimpl (pipeName, createPipe);
if (createPipe && pimpl->pipeH == INVALID_HANDLE_VALUE)
{
pimpl = nullptr;
return false;
}
return true;
}
int NamedPipe::read (void* destBuffer, int maxBytesToRead, int timeOutMilliseconds)
{
ScopedReadLock sl (lock);
return pimpl != nullptr ? pimpl->read (destBuffer, maxBytesToRead, timeOutMilliseconds) : -1;
}
int NamedPipe::write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds)
{
ScopedReadLock sl (lock);
return pimpl != nullptr ? pimpl->write (sourceBuffer, numBytesToWrite, timeOutMilliseconds) : -1;
}

View file

@ -23,14 +23,34 @@
==============================================================================
*/
NamedPipe::NamedPipe()
{
}
NamedPipe::~NamedPipe()
{
close();
}
bool NamedPipe::openExisting (const String& pipeName)
{
close();
ScopedWriteLock sl (lock);
currentPipeName = pipeName;
return openInternal (pipeName, false);
}
bool NamedPipe::isOpen() const
{
return pimpl != nullptr;
}
bool NamedPipe::createNewPipe (const String& pipeName)
{
close();
ScopedWriteLock sl (lock);
currentPipeName = pipeName;
return openInternal (pipeName, true);
}

View file

@ -26,6 +26,7 @@
#ifndef __JUCE_NAMEDPIPE_JUCEHEADER__
#define __JUCE_NAMEDPIPE_JUCEHEADER__
#include "../threads/juce_ReadWriteLock.h"
//==============================================================================
/**
@ -85,15 +86,12 @@ public:
*/
int write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds);
/** If any threads are currently blocked on a read operation, this tells them to abort. */
void cancelPendingReads();
private:
//==============================================================================
class Pimpl;
ScopedPointer<Pimpl> pimpl;
String currentPipeName;
CriticalSection lock;
ReadWriteLock lock;
bool openInternal (const String& pipeName, const bool createPipe);

View file

@ -191,8 +191,7 @@ struct ConnectionStateMessage : public MessageManager::MessageBase
void messageCallback()
{
InterprocessConnection* const ipc = owner;
if (ipc != nullptr)
if (InterprocessConnection* const ipc = owner)
{
if (connectionMade)
ipc->connectionMade();
@ -300,6 +299,7 @@ bool InterprocessConnection::readNextMessageInt()
}
else if (bytes < 0)
{
if (socket != nullptr)
{
const ScopedLock sl (pipeAndSocketLock);
socket = nullptr;