1
0
Fork 0
mirror of https://github.com/juce-framework/JUCE.git synced 2026-01-27 02:20:05 +00:00

Re-write DatagramSocket class and general clean-up in socket code

This commit is contained in:
hogliux 2015-04-13 12:56:23 +01:00
parent 1ee737c7cd
commit 41b38f59ed
2 changed files with 231 additions and 236 deletions

View file

@ -75,9 +75,59 @@ namespace SocketHelpers
: (setsockopt (handle, IPPROTO_TCP, TCP_NODELAY, (const char*) &one, sizeof (one)) == 0));
}
static void closeSocket (volatile int& handle, CriticalSection& readLock,
const bool isListener, int portNumber, bool& connected) noexcept
{
const SocketHandle h = handle;
handle = -1;
#if JUCE_WINDOWS
ignoreUnused (portNumber, isListener, readLock);
if (h != SOCKET_ERROR || connected)
closesocket (h);
// make sure any read process finishes before we delete the socket
CriticalSection::ScopedLockType lock(readLock);
connected = false;
#else
if (connected)
{
connected = false;
if (isListener)
{
// need to do this to interrupt the accept() function..
StreamingSocket temp;
temp.connect (IPAddress::local().toString(), portNumber, 1000);
}
}
if (h != -1)
{
// unblock any pending read requests
::shutdown (h, SHUT_RDWR);
{
// see man-page of recv on linux about a race condition where the
// shutdown command is lost if the receiving thread does not have
// a chance to process before close is called. On Mac OS X shutdown
// does not unblock a select call, so using a lock here will dead-lock
// both threads.
#if JUCE_LINUX
CriticalSection::ScopedLockType lock (readLock);
::close (h);
#else
::close (h);
CriticalSection::ScopedLockType lock(readLock);
#endif
}
}
#endif
}
static bool bindSocketToPort (const SocketHandle handle, const int port) noexcept
{
if (handle <= 0 || port <= 0)
if (handle <= 0 || port < 0)
return false;
struct sockaddr_in servTmpAddr;
@ -89,32 +139,57 @@ namespace SocketHelpers
return bind (handle, (struct sockaddr*) &servTmpAddr, sizeof (struct sockaddr_in)) >= 0;
}
static int getBoundPort (const SocketHandle handle) noexcept
{
if (handle <= 0)
return -1;
struct sockaddr_in sin;
socklen_t len = sizeof(sin);
if (getsockname (handle, (struct sockaddr *)&sin, &len) == 0)
return ntohs (sin.sin_port);
return -1;
}
static int readSocket (const SocketHandle handle,
void* const destBuffer, const ssize_t maxBytesToRead,
bool volatile& connected,
const bool blockUntilSpecifiedAmountHasArrived,
String* senderIP) noexcept
CriticalSection& readLock,
String* senderIP = nullptr,
int* senderPort = nullptr) noexcept
{
ssize_t bytesRead = 0;
while (bytesRead < maxBytesToRead)
{
ssize_t bytesThisTime;
ssize_t bytesThisTime = -1;
char* const buffer = static_cast<char*> (destBuffer) + bytesRead;
const size_t numToRead = (size_t) (maxBytesToRead - bytesRead);
if (senderIP == nullptr)
{
bytesThisTime = ::recv (handle, buffer, numToRead, 0);
}
else
{
sockaddr_in client;
socklen_t clientLen = sizeof (sockaddr);
// avoid race-condition
CriticalSection::ScopedTryLockType lock (readLock);
bytesThisTime = (int) ::recvfrom (handle, buffer, numToRead, 0, (sockaddr*) &client, &clientLen);
if (lock.isLocked())
{
if (senderIP == nullptr || senderPort == nullptr)
{
bytesThisTime = ::recv (handle, buffer, numToRead, 0);
}
else
{
sockaddr_in client;
socklen_t clientLen = sizeof (sockaddr);
*senderIP = String::fromUTF8 (inet_ntoa (client.sin_addr), 16);
bytesThisTime = (int) ::recvfrom (handle, buffer, numToRead, 0, (sockaddr*) &client, &clientLen);
*senderIP = String::fromUTF8 (inet_ntoa (client.sin_addr), 16);
*senderPort = ntohs (client.sin_port);
}
}
}
if (bytesThisTime <= 0 || ! connected)
@ -134,8 +209,17 @@ namespace SocketHelpers
return (int) bytesRead;
}
static int waitForReadiness (const SocketHandle handle, const bool forReading, const int timeoutMsecs) noexcept
static int waitForReadiness (const volatile int& handle, CriticalSection& readLock,
const bool forReading, const int timeoutMsecs) noexcept
{
// avoid race-condition
CriticalSection::ScopedTryLockType lock (readLock);
if (! lock.isLocked())
return -1;
int h = handle;
struct timeval timeout;
struct timeval* timeoutp;
@ -152,20 +236,20 @@ namespace SocketHelpers
fd_set rset, wset;
FD_ZERO (&rset);
FD_SET (handle, &rset);
FD_SET (h, &rset);
FD_ZERO (&wset);
FD_SET (handle, &wset);
FD_SET (h, &wset);
fd_set* const prset = forReading ? &rset : nullptr;
fd_set* const pwset = forReading ? nullptr : &wset;
#if JUCE_WINDOWS
if (select ((int) handle + 1, prset, pwset, 0, timeoutp) < 0)
if (select ((int) h + 1, prset, pwset, 0, timeoutp) < 0)
return -1;
#else
{
int result;
while ((result = select (handle + 1, prset, pwset, 0, timeoutp)) < 0
while ((result = select (h + 1, prset, pwset, 0, timeoutp)) < 0
&& errno == EINTR)
{
}
@ -175,16 +259,20 @@ namespace SocketHelpers
}
#endif
// we are closing
if (handle < 0)
return -1;
{
int opt;
juce_socklen_t len = sizeof (opt);
if (getsockopt (handle, SOL_SOCKET, SO_ERROR, (char*) &opt, &len) < 0
if (getsockopt (h, SOL_SOCKET, SO_ERROR, (char*) &opt, &len) < 0
|| opt != 0)
return -1;
}
return FD_ISSET (handle, forReading ? &rset : &wset) ? 1 : 0;
return FD_ISSET (h, forReading ? &rset : &wset) ? 1 : 0;
}
static bool setSocketBlockingState (const SocketHandle handle, const bool shouldBlock) noexcept
@ -207,12 +295,7 @@ namespace SocketHelpers
#endif
}
static bool connectSocket (int volatile& handle,
const bool isDatagram,
struct addrinfo** const serverAddress,
const String& hostName,
const int portNumber,
const int timeOutMillisecs) noexcept
static addrinfo* getAddressInfo (const bool isDatagram, const String& hostName, int portNumber)
{
struct addrinfo hints;
zerostruct (hints);
@ -222,52 +305,57 @@ namespace SocketHelpers
hints.ai_flags = AI_NUMERICSERV;
struct addrinfo* info = nullptr;
if (getaddrinfo (hostName.toUTF8(), String (portNumber).toUTF8(), &hints, &info) != 0
|| info == nullptr)
return false;
if (getaddrinfo (hostName.toUTF8(), String (portNumber).toUTF8(), &hints, &info) == 0
&& info != nullptr)
return info;
if (handle < 0)
handle = (int) socket (info->ai_family, info->ai_socktype, 0);
return nullptr;
}
if (handle < 0)
static bool connectSocket (int volatile& handle,
CriticalSection& readLock,
const String& hostName,
const int portNumber,
const int timeOutMillisecs) noexcept
{
if (struct addrinfo* info = getAddressInfo (false, hostName, portNumber))
{
if (handle < 0)
handle = (int) socket (info->ai_family, info->ai_socktype, 0);
if (handle < 0)
{
freeaddrinfo (info);
return false;
}
setSocketBlockingState (handle, false);
const int result = ::connect (handle, info->ai_addr, (socklen_t) info->ai_addrlen);
freeaddrinfo (info);
return false;
}
if (isDatagram)
{
if (*serverAddress != nullptr)
freeaddrinfo (*serverAddress);
if (result < 0)
{
#if JUCE_WINDOWS
if (result == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK)
#else
if (errno == EINPROGRESS)
#endif
{
if (waitForReadiness (handle, readLock, false, timeOutMillisecs) != 1)
{
setSocketBlockingState (handle, true);
return false;
}
}
}
setSocketBlockingState (handle, true);
resetSocketOptions (handle, false, false);
*serverAddress = info;
return true;
}
setSocketBlockingState (handle, false);
const int result = ::connect (handle, info->ai_addr, (socklen_t) info->ai_addrlen);
freeaddrinfo (info);
if (result < 0)
{
#if JUCE_WINDOWS
if (result == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK)
#else
if (errno == EINPROGRESS)
#endif
{
if (waitForReadiness (handle, false, timeOutMillisecs) != 1)
{
setSocketBlockingState (handle, true);
return false;
}
}
}
setSocketBlockingState (handle, true);
resetSocketOptions (handle, false, false);
return true;
return false;
}
static void makeReusable (int handle) noexcept
@ -306,7 +394,8 @@ StreamingSocket::~StreamingSocket()
//==============================================================================
int StreamingSocket::read (void* destBuffer, const int maxBytesToRead, bool shouldBlock)
{
return (connected && ! isListener) ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead, connected, shouldBlock, nullptr)
return (connected && ! isListener) ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead,
connected, shouldBlock, readLock)
: -1;
}
@ -322,7 +411,7 @@ int StreamingSocket::write (const void* sourceBuffer, const int numBytesToWrite)
int StreamingSocket::waitUntilReady (const bool readyForReading,
const int timeoutMsecs) const
{
return connected ? SocketHelpers::waitForReadiness (handle, readyForReading, timeoutMsecs)
return connected ? SocketHelpers::waitForReadiness (handle, readLock, readyForReading, timeoutMsecs)
: -1;
}
@ -332,6 +421,11 @@ bool StreamingSocket::bindToPort (const int port)
return SocketHelpers::bindSocketToPort (handle, port);
}
int StreamingSocket::getBoundPort() const noexcept
{
return SocketHelpers::getBoundPort (handle);
}
bool StreamingSocket::connect (const String& remoteHostName,
const int remotePortNumber,
const int timeOutMillisecs)
@ -349,7 +443,7 @@ bool StreamingSocket::connect (const String& remoteHostName,
portNumber = remotePortNumber;
isListener = false;
connected = SocketHelpers::connectSocket (handle, false, nullptr, remoteHostName,
connected = SocketHelpers::connectSocket (handle, readLock, remoteHostName,
remotePortNumber, timeOutMillisecs);
if (! (connected && SocketHelpers::resetSocketOptions (handle, false, false)))
@ -363,30 +457,7 @@ bool StreamingSocket::connect (const String& remoteHostName,
void StreamingSocket::close()
{
#if JUCE_WINDOWS
if (handle != SOCKET_ERROR || connected)
closesocket (handle);
connected = false;
#else
if (connected)
{
connected = false;
if (isListener)
{
// need to do this to interrupt the accept() function..
StreamingSocket temp;
temp.connect (IPAddress::local().toString(), portNumber, 1000);
}
}
if (handle != -1)
{
::shutdown (handle, SHUT_RDWR);
::close (handle);
}
#endif
SocketHelpers::closeSocket (handle, readLock, isListener, portNumber, connected);
hostName.clear();
portNumber = 0;
@ -463,140 +534,86 @@ bool StreamingSocket::isLocal() const noexcept
//==============================================================================
//==============================================================================
DatagramSocket::DatagramSocket (const int localPortNumber, const bool canBroadcast)
: portNumber (0),
handle (-1),
connected (true),
allowBroadcast (canBroadcast),
serverAddress (nullptr)
DatagramSocket::DatagramSocket (const bool canBroadcast)
: handle (-1),
isBound (false),
lastServerPort (-1),
lastServerAddress (nullptr)
{
SocketHelpers::initSockets();
handle = (int) socket (AF_INET, SOCK_DGRAM, 0);
SocketHelpers::resetSocketOptions (handle, true, canBroadcast);
SocketHelpers::makeReusable (handle);
bindToPort (localPortNumber);
}
DatagramSocket::DatagramSocket (const String& host, const int portNum,
const int h, const int localPortNumber)
: hostName (host),
portNumber (portNum),
handle (h),
connected (true),
allowBroadcast (false),
serverAddress (nullptr)
{
SocketHelpers::initSockets();
SocketHelpers::resetSocketOptions (h, true, allowBroadcast);
bindToPort (localPortNumber);
}
DatagramSocket::~DatagramSocket()
{
close();
if (lastServerAddress != nullptr)
freeaddrinfo (static_cast <struct addrinfo*> (lastServerAddress));
if (serverAddress != nullptr)
freeaddrinfo (static_cast <struct addrinfo*> (serverAddress));
bool connected = false;
SocketHelpers::closeSocket (handle, readLock, false, 0, connected);
}
void DatagramSocket::close()
{
#if JUCE_WINDOWS
closesocket (handle);
connected = false;
#else
connected = false;
::close (handle);
#endif
hostName.clear();
portNumber = 0;
handle = -1;
}
bool DatagramSocket::bindToPort (const int port)
{
return SocketHelpers::bindSocketToPort (handle, port);
}
bool DatagramSocket::connect (const String& remoteHostName,
const int remotePortNumber,
const int timeOutMillisecs)
{
if (connected)
close();
hostName = remoteHostName;
portNumber = remotePortNumber;
connected = SocketHelpers::connectSocket (handle, true, (struct addrinfo**) &serverAddress,
remoteHostName, remotePortNumber,
timeOutMillisecs);
if (! (connected && SocketHelpers::resetSocketOptions (handle, true, allowBroadcast)))
if (SocketHelpers::bindSocketToPort (handle, port))
{
close();
return false;
isBound = true;
return true;
}
return true;
return false;
}
DatagramSocket* DatagramSocket::waitForNextConnection() const
int DatagramSocket::getBoundPort() const noexcept
{
while (waitUntilReady (true, -1) == 1)
{
struct sockaddr_storage address;
juce_socklen_t len = sizeof (address);
char buf[1];
if (recvfrom (handle, buf, 0, 0, (struct sockaddr*) &address, &len) > 0)
return new DatagramSocket (inet_ntoa (((struct sockaddr_in*) &address)->sin_addr),
ntohs (((struct sockaddr_in*) &address)->sin_port),
-1, -1);
}
return nullptr;
return isBound ? SocketHelpers::getBoundPort (handle) : -1;
}
//==============================================================================
int DatagramSocket::waitUntilReady (const bool readyForReading,
const int timeoutMsecs) const
{
return connected ? SocketHelpers::waitForReadiness (handle, readyForReading, timeoutMsecs)
: -1;
return SocketHelpers::waitForReadiness (handle, readLock, readyForReading, timeoutMsecs);
}
int DatagramSocket::read (void* destBuffer, int maxBytesToRead, bool shouldBlock)
{
return connected ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead,
connected, shouldBlock, nullptr)
: -1;
bool connected = true;
return isBound ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead,
connected, shouldBlock, readLock) : -1;
}
int DatagramSocket::read (void* destBuffer, int maxBytesToRead, bool shouldBlock, String& senderIPAddress)
int DatagramSocket::read (void* destBuffer, int maxBytesToRead, bool shouldBlock, String& senderIPAddress, int& senderPort)
{
return connected ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead,
connected, shouldBlock, &senderIPAddress)
: -1;
bool connected = true;
return isBound ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead, connected,
shouldBlock, readLock, &senderIPAddress, &senderPort) : -1;
}
int DatagramSocket::write (const void* sourceBuffer, const int numBytesToWrite)
int DatagramSocket::write (const String& remoteHostname, int remotePortNumber,
const void* sourceBuffer, int numBytesToWrite)
{
// You need to call connect() first to set the server address..
jassert (serverAddress != nullptr && connected);
struct addrinfo*& info = reinterpret_cast <struct addrinfo*&> (lastServerAddress);
return connected ? (int) ::sendto (handle, (const char*) sourceBuffer,
(size_t) numBytesToWrite, 0,
static_cast <const struct addrinfo*> (serverAddress)->ai_addr,
(juce_socklen_t) static_cast <const struct addrinfo*> (serverAddress)->ai_addrlen)
: -1;
}
// getaddrinfo can be quite slow so cache the result of the address lookup
if (info == nullptr || remoteHostname != lastServerHost || remotePortNumber != lastServerPort)
{
if (info != nullptr)
freeaddrinfo (info);
bool DatagramSocket::isLocal() const noexcept
{
return hostName == "127.0.0.1";
if ((info = SocketHelpers::getAddressInfo (true, remoteHostname, remotePortNumber)) == nullptr)
return -1;
lastServerHost = remoteHostname;
lastServerPort = remotePortNumber;
}
return (int) ::sendto (handle, (const char*) sourceBuffer,
(size_t) numBytesToWrite, 0, info->ai_addr, info->ai_addrlen);
}
#if JUCE_MSVC

View file

@ -65,6 +65,13 @@ public:
*/
bool bindToPort (int localPortNumber);
/** Returns the local port number to which this socket is currently bound.
This is useful if you need to know to which port the OS has actually bound your
socket when calling the constructor or bindToPort with zero as the
localPortNumber argument. Returns -1 if the function fails. */
int getBoundPort() const noexcept;
/** Tries to connect the socket to hostname:port.
If timeOutMillisecs is 0, then this method will block until the operating system
@ -164,6 +171,7 @@ private:
String hostName;
int volatile portNumber, handle;
bool connected, isListener;
mutable CriticalSection readLock;
StreamingSocket (const String& hostname, int portNumber, int handle);
@ -185,22 +193,16 @@ class JUCE_API DatagramSocket
public:
//==============================================================================
/**
Creates an (uninitialised) datagram socket.
Creates a datagram socket.
The localPortNumber is the port on which to bind this socket. If this value is 0,
the port number is assigned by the operating system.
To use the socket for sending, call the connect() method. This will not immediately
make a connection, but will save the destination you've provided. After this, you can
call read() or write().
You first need to bind this socket to a port with bindToPort if you intend to read
from this socket.
If enableBroadcasting is true, the socket will be allowed to send broadcast messages
(may require extra privileges on linux)
To wait for other sockets to connect to this one, call waitForNextConnection().
*/
DatagramSocket (int localPortNumber,
bool enableBroadcasting = false);
DatagramSocket (bool enableBroadcasting = false);
/** Destructor. */
~DatagramSocket();
@ -208,37 +210,21 @@ public:
//==============================================================================
/** Binds the socket to the specified local port.
The localPortNumber is the port on which to bind this socket. If this value is 0,
the port number is assigned by the operating system.
@returns true on success; false may indicate that another socket is already bound
on the same port
*/
bool bindToPort (int localPortNumber);
/** Tries to connect the socket to hostname:port.
/** Returns the local port number to which this socket is currently bound.
If timeOutMillisecs is 0, then this method will block until the operating system
rejects the connection (which could take a long time).
This is useful if you need to know to which port the OS has actually bound your
socket when bindToPort was called with zero.
@returns true if it succeeds.
@see isConnected
*/
bool connect (const String& remoteHostname,
int remotePortNumber,
int timeOutMillisecs = 3000);
/** True if the socket is currently connected. */
bool isConnected() const noexcept { return connected; }
/** Closes the connection. */
void close();
/** Returns the name of the currently connected host. */
const String& getHostName() const noexcept { return hostName; }
/** Returns the port number that's currently open. */
int getPort() const noexcept { return portNumber; }
/** True if the socket is connected to this machine rather than over the network. */
bool isLocal() const noexcept;
Returns -1 if the socket didn't bind to any port yet or an error occured. */
int getBoundPort() const noexcept;
/** Returns the OS's socket handle that's currently open. */
int getRawSocketHandle() const noexcept { return handle; }
@ -284,7 +270,7 @@ public:
*/
int read (void* destBuffer, int maxBytesToRead,
bool blockUntilSpecifiedAmountHasArrived,
String& senderIPAddress);
String& senderIPAddress, int& senderPortNumber);
/** Writes bytes to the socket from a buffer.
@ -293,25 +279,17 @@ public:
@returns the number of bytes written, or -1 if there was an error.
*/
int write (const void* sourceBuffer, int numBytesToWrite);
//==============================================================================
/** This waits for incoming data to be sent, and returns a socket that can be used
to read it.
The object that gets returned is owned by the caller, and can't be used for
sending, but can be used to read the data.
*/
DatagramSocket* waitForNextConnection() const;
int write (const String& remoteHostname, int remotePortNumber,
const void* sourceBuffer, int numBytesToWrite);
private:
//==============================================================================
String hostName;
int volatile portNumber, handle;
bool connected, allowBroadcast;
void* serverAddress;
DatagramSocket (const String& hostname, int portNumber, int handle, int localPortNumber);
int handle;
bool isBound;
String lastServerHost;
int lastServerPort;
void* lastServerAddress;
mutable CriticalSection readLock;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (DatagramSocket)
};