From 41b38f59ed5cfcdfc6db1ffb435a86b26626a0b9 Mon Sep 17 00:00:00 2001 From: hogliux Date: Mon, 13 Apr 2015 12:56:23 +0100 Subject: [PATCH] Re-write DatagramSocket class and general clean-up in socket code --- modules/juce_core/network/juce_Socket.cpp | 385 +++++++++++----------- modules/juce_core/network/juce_Socket.h | 82 ++--- 2 files changed, 231 insertions(+), 236 deletions(-) diff --git a/modules/juce_core/network/juce_Socket.cpp b/modules/juce_core/network/juce_Socket.cpp index 08cd0214f2..ae9a750acf 100644 --- a/modules/juce_core/network/juce_Socket.cpp +++ b/modules/juce_core/network/juce_Socket.cpp @@ -75,9 +75,59 @@ namespace SocketHelpers : (setsockopt (handle, IPPROTO_TCP, TCP_NODELAY, (const char*) &one, sizeof (one)) == 0)); } + static void closeSocket (volatile int& handle, CriticalSection& readLock, + const bool isListener, int portNumber, bool& connected) noexcept + { + const SocketHandle h = handle; + handle = -1; + + #if JUCE_WINDOWS + ignoreUnused (portNumber, isListener, readLock); + + if (h != SOCKET_ERROR || connected) + closesocket (h); + + // make sure any read process finishes before we delete the socket + CriticalSection::ScopedLockType lock(readLock); + connected = false; + #else + if (connected) + { + connected = false; + + if (isListener) + { + // need to do this to interrupt the accept() function.. + StreamingSocket temp; + temp.connect (IPAddress::local().toString(), portNumber, 1000); + } + } + + if (h != -1) + { + // unblock any pending read requests + ::shutdown (h, SHUT_RDWR); + { + // see man-page of recv on linux about a race condition where the + // shutdown command is lost if the receiving thread does not have + // a chance to process before close is called. On Mac OS X shutdown + // does not unblock a select call, so using a lock here will dead-lock + // both threads. + #if JUCE_LINUX + CriticalSection::ScopedLockType lock (readLock); + ::close (h); + #else + ::close (h); + CriticalSection::ScopedLockType lock(readLock); + #endif + } + } + #endif + } + static bool bindSocketToPort (const SocketHandle handle, const int port) noexcept { - if (handle <= 0 || port <= 0) + if (handle <= 0 || port < 0) return false; struct sockaddr_in servTmpAddr; @@ -89,32 +139,57 @@ namespace SocketHelpers return bind (handle, (struct sockaddr*) &servTmpAddr, sizeof (struct sockaddr_in)) >= 0; } + static int getBoundPort (const SocketHandle handle) noexcept + { + if (handle <= 0) + return -1; + + struct sockaddr_in sin; + socklen_t len = sizeof(sin); + + if (getsockname (handle, (struct sockaddr *)&sin, &len) == 0) + return ntohs (sin.sin_port); + + return -1; + } + static int readSocket (const SocketHandle handle, void* const destBuffer, const ssize_t maxBytesToRead, bool volatile& connected, const bool blockUntilSpecifiedAmountHasArrived, - String* senderIP) noexcept + CriticalSection& readLock, + String* senderIP = nullptr, + int* senderPort = nullptr) noexcept { ssize_t bytesRead = 0; while (bytesRead < maxBytesToRead) { - ssize_t bytesThisTime; + ssize_t bytesThisTime = -1; char* const buffer = static_cast (destBuffer) + bytesRead; const size_t numToRead = (size_t) (maxBytesToRead - bytesRead); - if (senderIP == nullptr) { - bytesThisTime = ::recv (handle, buffer, numToRead, 0); - } - else - { - sockaddr_in client; - socklen_t clientLen = sizeof (sockaddr); + // avoid race-condition + CriticalSection::ScopedTryLockType lock (readLock); - bytesThisTime = (int) ::recvfrom (handle, buffer, numToRead, 0, (sockaddr*) &client, &clientLen); + if (lock.isLocked()) + { + if (senderIP == nullptr || senderPort == nullptr) + { + bytesThisTime = ::recv (handle, buffer, numToRead, 0); + } + else + { + sockaddr_in client; + socklen_t clientLen = sizeof (sockaddr); - *senderIP = String::fromUTF8 (inet_ntoa (client.sin_addr), 16); + bytesThisTime = (int) ::recvfrom (handle, buffer, numToRead, 0, (sockaddr*) &client, &clientLen); + + *senderIP = String::fromUTF8 (inet_ntoa (client.sin_addr), 16); + *senderPort = ntohs (client.sin_port); + } + } } if (bytesThisTime <= 0 || ! connected) @@ -134,8 +209,17 @@ namespace SocketHelpers return (int) bytesRead; } - static int waitForReadiness (const SocketHandle handle, const bool forReading, const int timeoutMsecs) noexcept + static int waitForReadiness (const volatile int& handle, CriticalSection& readLock, + const bool forReading, const int timeoutMsecs) noexcept { + // avoid race-condition + CriticalSection::ScopedTryLockType lock (readLock); + + if (! lock.isLocked()) + return -1; + + int h = handle; + struct timeval timeout; struct timeval* timeoutp; @@ -152,20 +236,20 @@ namespace SocketHelpers fd_set rset, wset; FD_ZERO (&rset); - FD_SET (handle, &rset); + FD_SET (h, &rset); FD_ZERO (&wset); - FD_SET (handle, &wset); + FD_SET (h, &wset); fd_set* const prset = forReading ? &rset : nullptr; fd_set* const pwset = forReading ? nullptr : &wset; #if JUCE_WINDOWS - if (select ((int) handle + 1, prset, pwset, 0, timeoutp) < 0) + if (select ((int) h + 1, prset, pwset, 0, timeoutp) < 0) return -1; #else { int result; - while ((result = select (handle + 1, prset, pwset, 0, timeoutp)) < 0 + while ((result = select (h + 1, prset, pwset, 0, timeoutp)) < 0 && errno == EINTR) { } @@ -175,16 +259,20 @@ namespace SocketHelpers } #endif + // we are closing + if (handle < 0) + return -1; + { int opt; juce_socklen_t len = sizeof (opt); - if (getsockopt (handle, SOL_SOCKET, SO_ERROR, (char*) &opt, &len) < 0 + if (getsockopt (h, SOL_SOCKET, SO_ERROR, (char*) &opt, &len) < 0 || opt != 0) return -1; } - return FD_ISSET (handle, forReading ? &rset : &wset) ? 1 : 0; + return FD_ISSET (h, forReading ? &rset : &wset) ? 1 : 0; } static bool setSocketBlockingState (const SocketHandle handle, const bool shouldBlock) noexcept @@ -207,12 +295,7 @@ namespace SocketHelpers #endif } - static bool connectSocket (int volatile& handle, - const bool isDatagram, - struct addrinfo** const serverAddress, - const String& hostName, - const int portNumber, - const int timeOutMillisecs) noexcept + static addrinfo* getAddressInfo (const bool isDatagram, const String& hostName, int portNumber) { struct addrinfo hints; zerostruct (hints); @@ -222,52 +305,57 @@ namespace SocketHelpers hints.ai_flags = AI_NUMERICSERV; struct addrinfo* info = nullptr; - if (getaddrinfo (hostName.toUTF8(), String (portNumber).toUTF8(), &hints, &info) != 0 - || info == nullptr) - return false; + if (getaddrinfo (hostName.toUTF8(), String (portNumber).toUTF8(), &hints, &info) == 0 + && info != nullptr) + return info; - if (handle < 0) - handle = (int) socket (info->ai_family, info->ai_socktype, 0); + return nullptr; + } - if (handle < 0) + static bool connectSocket (int volatile& handle, + CriticalSection& readLock, + const String& hostName, + const int portNumber, + const int timeOutMillisecs) noexcept + { + if (struct addrinfo* info = getAddressInfo (false, hostName, portNumber)) { + if (handle < 0) + handle = (int) socket (info->ai_family, info->ai_socktype, 0); + + if (handle < 0) + { + freeaddrinfo (info); + return false; + } + + setSocketBlockingState (handle, false); + const int result = ::connect (handle, info->ai_addr, (socklen_t) info->ai_addrlen); freeaddrinfo (info); - return false; - } - if (isDatagram) - { - if (*serverAddress != nullptr) - freeaddrinfo (*serverAddress); + if (result < 0) + { + #if JUCE_WINDOWS + if (result == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK) + #else + if (errno == EINPROGRESS) + #endif + { + if (waitForReadiness (handle, readLock, false, timeOutMillisecs) != 1) + { + setSocketBlockingState (handle, true); + return false; + } + } + } + + setSocketBlockingState (handle, true); + resetSocketOptions (handle, false, false); - *serverAddress = info; return true; } - setSocketBlockingState (handle, false); - const int result = ::connect (handle, info->ai_addr, (socklen_t) info->ai_addrlen); - freeaddrinfo (info); - - if (result < 0) - { - #if JUCE_WINDOWS - if (result == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK) - #else - if (errno == EINPROGRESS) - #endif - { - if (waitForReadiness (handle, false, timeOutMillisecs) != 1) - { - setSocketBlockingState (handle, true); - return false; - } - } - } - - setSocketBlockingState (handle, true); - resetSocketOptions (handle, false, false); - - return true; + return false; } static void makeReusable (int handle) noexcept @@ -306,7 +394,8 @@ StreamingSocket::~StreamingSocket() //============================================================================== int StreamingSocket::read (void* destBuffer, const int maxBytesToRead, bool shouldBlock) { - return (connected && ! isListener) ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead, connected, shouldBlock, nullptr) + return (connected && ! isListener) ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead, + connected, shouldBlock, readLock) : -1; } @@ -322,7 +411,7 @@ int StreamingSocket::write (const void* sourceBuffer, const int numBytesToWrite) int StreamingSocket::waitUntilReady (const bool readyForReading, const int timeoutMsecs) const { - return connected ? SocketHelpers::waitForReadiness (handle, readyForReading, timeoutMsecs) + return connected ? SocketHelpers::waitForReadiness (handle, readLock, readyForReading, timeoutMsecs) : -1; } @@ -332,6 +421,11 @@ bool StreamingSocket::bindToPort (const int port) return SocketHelpers::bindSocketToPort (handle, port); } +int StreamingSocket::getBoundPort() const noexcept +{ + return SocketHelpers::getBoundPort (handle); +} + bool StreamingSocket::connect (const String& remoteHostName, const int remotePortNumber, const int timeOutMillisecs) @@ -349,7 +443,7 @@ bool StreamingSocket::connect (const String& remoteHostName, portNumber = remotePortNumber; isListener = false; - connected = SocketHelpers::connectSocket (handle, false, nullptr, remoteHostName, + connected = SocketHelpers::connectSocket (handle, readLock, remoteHostName, remotePortNumber, timeOutMillisecs); if (! (connected && SocketHelpers::resetSocketOptions (handle, false, false))) @@ -363,30 +457,7 @@ bool StreamingSocket::connect (const String& remoteHostName, void StreamingSocket::close() { - #if JUCE_WINDOWS - if (handle != SOCKET_ERROR || connected) - closesocket (handle); - - connected = false; - #else - if (connected) - { - connected = false; - - if (isListener) - { - // need to do this to interrupt the accept() function.. - StreamingSocket temp; - temp.connect (IPAddress::local().toString(), portNumber, 1000); - } - } - - if (handle != -1) - { - ::shutdown (handle, SHUT_RDWR); - ::close (handle); - } - #endif + SocketHelpers::closeSocket (handle, readLock, isListener, portNumber, connected); hostName.clear(); portNumber = 0; @@ -463,140 +534,86 @@ bool StreamingSocket::isLocal() const noexcept //============================================================================== //============================================================================== -DatagramSocket::DatagramSocket (const int localPortNumber, const bool canBroadcast) - : portNumber (0), - handle (-1), - connected (true), - allowBroadcast (canBroadcast), - serverAddress (nullptr) +DatagramSocket::DatagramSocket (const bool canBroadcast) + : handle (-1), + isBound (false), + lastServerPort (-1), + lastServerAddress (nullptr) { SocketHelpers::initSockets(); handle = (int) socket (AF_INET, SOCK_DGRAM, 0); + SocketHelpers::resetSocketOptions (handle, true, canBroadcast); SocketHelpers::makeReusable (handle); - bindToPort (localPortNumber); -} - -DatagramSocket::DatagramSocket (const String& host, const int portNum, - const int h, const int localPortNumber) - : hostName (host), - portNumber (portNum), - handle (h), - connected (true), - allowBroadcast (false), - serverAddress (nullptr) -{ - SocketHelpers::initSockets(); - - SocketHelpers::resetSocketOptions (h, true, allowBroadcast); - bindToPort (localPortNumber); } DatagramSocket::~DatagramSocket() { - close(); + if (lastServerAddress != nullptr) + freeaddrinfo (static_cast (lastServerAddress)); - if (serverAddress != nullptr) - freeaddrinfo (static_cast (serverAddress)); + bool connected = false; + SocketHelpers::closeSocket (handle, readLock, false, 0, connected); } -void DatagramSocket::close() -{ - #if JUCE_WINDOWS - closesocket (handle); - connected = false; - #else - connected = false; - ::close (handle); - #endif - - hostName.clear(); - portNumber = 0; - handle = -1; -} bool DatagramSocket::bindToPort (const int port) { - return SocketHelpers::bindSocketToPort (handle, port); -} - -bool DatagramSocket::connect (const String& remoteHostName, - const int remotePortNumber, - const int timeOutMillisecs) -{ - if (connected) - close(); - - hostName = remoteHostName; - portNumber = remotePortNumber; - - connected = SocketHelpers::connectSocket (handle, true, (struct addrinfo**) &serverAddress, - remoteHostName, remotePortNumber, - timeOutMillisecs); - - if (! (connected && SocketHelpers::resetSocketOptions (handle, true, allowBroadcast))) + if (SocketHelpers::bindSocketToPort (handle, port)) { - close(); - return false; + isBound = true; + return true; } - return true; + return false; } -DatagramSocket* DatagramSocket::waitForNextConnection() const +int DatagramSocket::getBoundPort() const noexcept { - while (waitUntilReady (true, -1) == 1) - { - struct sockaddr_storage address; - juce_socklen_t len = sizeof (address); - char buf[1]; - - if (recvfrom (handle, buf, 0, 0, (struct sockaddr*) &address, &len) > 0) - return new DatagramSocket (inet_ntoa (((struct sockaddr_in*) &address)->sin_addr), - ntohs (((struct sockaddr_in*) &address)->sin_port), - -1, -1); - } - - return nullptr; + return isBound ? SocketHelpers::getBoundPort (handle) : -1; } //============================================================================== int DatagramSocket::waitUntilReady (const bool readyForReading, const int timeoutMsecs) const { - return connected ? SocketHelpers::waitForReadiness (handle, readyForReading, timeoutMsecs) - : -1; + return SocketHelpers::waitForReadiness (handle, readLock, readyForReading, timeoutMsecs); } int DatagramSocket::read (void* destBuffer, int maxBytesToRead, bool shouldBlock) { - return connected ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead, - connected, shouldBlock, nullptr) - : -1; + bool connected = true; + return isBound ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead, + connected, shouldBlock, readLock) : -1; } -int DatagramSocket::read (void* destBuffer, int maxBytesToRead, bool shouldBlock, String& senderIPAddress) +int DatagramSocket::read (void* destBuffer, int maxBytesToRead, bool shouldBlock, String& senderIPAddress, int& senderPort) { - return connected ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead, - connected, shouldBlock, &senderIPAddress) - : -1; + bool connected = true; + return isBound ? SocketHelpers::readSocket (handle, destBuffer, maxBytesToRead, connected, + shouldBlock, readLock, &senderIPAddress, &senderPort) : -1; } -int DatagramSocket::write (const void* sourceBuffer, const int numBytesToWrite) +int DatagramSocket::write (const String& remoteHostname, int remotePortNumber, + const void* sourceBuffer, int numBytesToWrite) { - // You need to call connect() first to set the server address.. - jassert (serverAddress != nullptr && connected); + struct addrinfo*& info = reinterpret_cast (lastServerAddress); - return connected ? (int) ::sendto (handle, (const char*) sourceBuffer, - (size_t) numBytesToWrite, 0, - static_cast (serverAddress)->ai_addr, - (juce_socklen_t) static_cast (serverAddress)->ai_addrlen) - : -1; -} + // getaddrinfo can be quite slow so cache the result of the address lookup + if (info == nullptr || remoteHostname != lastServerHost || remotePortNumber != lastServerPort) + { + if (info != nullptr) + freeaddrinfo (info); -bool DatagramSocket::isLocal() const noexcept -{ - return hostName == "127.0.0.1"; + if ((info = SocketHelpers::getAddressInfo (true, remoteHostname, remotePortNumber)) == nullptr) + return -1; + + lastServerHost = remoteHostname; + lastServerPort = remotePortNumber; + } + + return (int) ::sendto (handle, (const char*) sourceBuffer, + (size_t) numBytesToWrite, 0, info->ai_addr, info->ai_addrlen); } #if JUCE_MSVC diff --git a/modules/juce_core/network/juce_Socket.h b/modules/juce_core/network/juce_Socket.h index 97826aa3e1..778da333b0 100644 --- a/modules/juce_core/network/juce_Socket.h +++ b/modules/juce_core/network/juce_Socket.h @@ -65,6 +65,13 @@ public: */ bool bindToPort (int localPortNumber); + /** Returns the local port number to which this socket is currently bound. + + This is useful if you need to know to which port the OS has actually bound your + socket when calling the constructor or bindToPort with zero as the + localPortNumber argument. Returns -1 if the function fails. */ + int getBoundPort() const noexcept; + /** Tries to connect the socket to hostname:port. If timeOutMillisecs is 0, then this method will block until the operating system @@ -164,6 +171,7 @@ private: String hostName; int volatile portNumber, handle; bool connected, isListener; + mutable CriticalSection readLock; StreamingSocket (const String& hostname, int portNumber, int handle); @@ -185,22 +193,16 @@ class JUCE_API DatagramSocket public: //============================================================================== /** - Creates an (uninitialised) datagram socket. + Creates a datagram socket. - The localPortNumber is the port on which to bind this socket. If this value is 0, - the port number is assigned by the operating system. - - To use the socket for sending, call the connect() method. This will not immediately - make a connection, but will save the destination you've provided. After this, you can - call read() or write(). + You first need to bind this socket to a port with bindToPort if you intend to read + from this socket. If enableBroadcasting is true, the socket will be allowed to send broadcast messages (may require extra privileges on linux) - - To wait for other sockets to connect to this one, call waitForNextConnection(). */ - DatagramSocket (int localPortNumber, - bool enableBroadcasting = false); + DatagramSocket (bool enableBroadcasting = false); + /** Destructor. */ ~DatagramSocket(); @@ -208,37 +210,21 @@ public: //============================================================================== /** Binds the socket to the specified local port. + The localPortNumber is the port on which to bind this socket. If this value is 0, + the port number is assigned by the operating system. + @returns true on success; false may indicate that another socket is already bound on the same port */ bool bindToPort (int localPortNumber); - /** Tries to connect the socket to hostname:port. + /** Returns the local port number to which this socket is currently bound. - If timeOutMillisecs is 0, then this method will block until the operating system - rejects the connection (which could take a long time). + This is useful if you need to know to which port the OS has actually bound your + socket when bindToPort was called with zero. - @returns true if it succeeds. - @see isConnected - */ - bool connect (const String& remoteHostname, - int remotePortNumber, - int timeOutMillisecs = 3000); - - /** True if the socket is currently connected. */ - bool isConnected() const noexcept { return connected; } - - /** Closes the connection. */ - void close(); - - /** Returns the name of the currently connected host. */ - const String& getHostName() const noexcept { return hostName; } - - /** Returns the port number that's currently open. */ - int getPort() const noexcept { return portNumber; } - - /** True if the socket is connected to this machine rather than over the network. */ - bool isLocal() const noexcept; + Returns -1 if the socket didn't bind to any port yet or an error occured. */ + int getBoundPort() const noexcept; /** Returns the OS's socket handle that's currently open. */ int getRawSocketHandle() const noexcept { return handle; } @@ -284,7 +270,7 @@ public: */ int read (void* destBuffer, int maxBytesToRead, bool blockUntilSpecifiedAmountHasArrived, - String& senderIPAddress); + String& senderIPAddress, int& senderPortNumber); /** Writes bytes to the socket from a buffer. @@ -293,25 +279,17 @@ public: @returns the number of bytes written, or -1 if there was an error. */ - int write (const void* sourceBuffer, int numBytesToWrite); - - //============================================================================== - /** This waits for incoming data to be sent, and returns a socket that can be used - to read it. - - The object that gets returned is owned by the caller, and can't be used for - sending, but can be used to read the data. - */ - DatagramSocket* waitForNextConnection() const; + int write (const String& remoteHostname, int remotePortNumber, + const void* sourceBuffer, int numBytesToWrite); private: //============================================================================== - String hostName; - int volatile portNumber, handle; - bool connected, allowBroadcast; - void* serverAddress; - - DatagramSocket (const String& hostname, int portNumber, int handle, int localPortNumber); + int handle; + bool isBound; + String lastServerHost; + int lastServerPort; + void* lastServerAddress; + mutable CriticalSection readLock; JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (DatagramSocket) };