fix(PollSet): Integrate windows epoll #2091, #3649

This commit is contained in:
Alex Fabijanic
2022-07-06 11:13:50 +02:00
parent 81696487a0
commit 31a49c0af2
36 changed files with 4269 additions and 3690 deletions

View File

@@ -19,8 +19,14 @@
#if defined(POCO_HAVE_FD_EPOLL)
#include <sys/epoll.h>
#include <sys/eventfd.h>
#ifdef POCO_OS_FAMILY_WINDOWS
#include "Poco/Net/ServerSocket.h"
#include "Poco/Net/SocketAddress.h"
#include "wepoll.h"
#else
#include <sys/epoll.h>
#include <sys/eventfd.h>
#endif
#elif defined(POCO_HAVE_FD_POLL)
#ifndef _WIN32
#include <poll.h>
@@ -35,10 +41,25 @@ namespace Net {
#if defined(POCO_HAVE_FD_EPOLL)
//
// Implementation using epoll (Linux) or wepoll (Windows)
//
#ifdef WEPOLL_H_
namespace {
int close(HANDLE h)
{
return epoll_close(h);
}
}
#endif // WEPOLL_H_
//
// Linux implementation using epoll
//
class PollSetImpl
{
public:
@@ -47,9 +68,9 @@ public:
using SocketMode = std::pair<Socket, int>;
using SocketMap = std::map<void*, SocketMode>;
PollSetImpl(): _epollfd(epoll_create(1)),
_events(1024),
_eventfd(eventfd(0, 0))
PollSetImpl(): _events(1024),
_eventfd(eventfd(_port)),
_epollfd(epoll_create(1))
{
int err = addFD(_eventfd, PollSet::POLL_READ, EPOLL_CTL_ADD);
if ((err) || (_epollfd < 0))
@@ -60,8 +81,12 @@ public:
~PollSetImpl()
{
::close(_eventfd.exchange(0));
if (_epollfd >= 0) ::close(_epollfd);
#ifdef WEPOLL_H_
if (_eventfd >= 0) eventfd(_port, _eventfd);
#else
if (_eventfd > 0) close(_eventfd.exchange(0));
#endif
if (_epollfd >= 0) close(_epollfd);
}
void add(const Socket& socket, int mode)
@@ -118,8 +143,13 @@ public:
_epollfd = epoll_create(1);
if (_epollfd < 0) SocketImpl::error();
}
#ifdef WEPOLL_H_
eventfd(_port, _eventfd);
_eventfd = eventfd(_port);
#else
close(_eventfd.exchange(0));
_eventfd = eventfd(0, 0);
#endif
addFD(_eventfd, PollSet::POLL_READ, EPOLL_CTL_ADD);
}
@@ -132,7 +162,8 @@ public:
do
{
Poco::Timestamp start;
rc = epoll_wait(_epollfd, &_events[0], _events.size(), remainingTime.totalMilliseconds());
rc = epoll_wait(_epollfd, &_events[0],
static_cast<int>(_events.size()), static_cast<int>(remainingTime.totalMilliseconds()));
if (rc == 0) return result;
// if we are hitting the events limit, resize it; even without resizing, the subseqent
@@ -174,11 +205,15 @@ public:
void wakeUp()
{
#ifdef WEPOLL_H_
StreamSocket ss(SocketAddress("127.0.0.1", _port));
#else
uint64_t val = 1;
// This is guaranteed to write into a valid fd,
// or 0 (meaning PollSet is being destroyed).
// Errors are ignored.
write(_eventfd, &val, sizeof(val));
#endif
}
int count() const
@@ -207,7 +242,7 @@ private:
int updateImpl(const Socket& socket, int mode)
{
SocketImpl* sockImpl = socket.impl();
int ret = addFD(sockImpl->sockfd(), mode, EPOLL_CTL_MOD, sockImpl);
int ret = addFD(static_cast<int>(sockImpl->sockfd()), mode, EPOLL_CTL_MOD, sockImpl);
if (ret == 0) socketMapUpdate(socket, mode);
return ret;
}
@@ -216,7 +251,7 @@ private:
{
SocketImpl* sockImpl = socket.impl();
int newMode = getNewMode(sockImpl, mode);
int ret = addFD(sockImpl->sockfd(), newMode, EPOLL_CTL_ADD, sockImpl);
int ret = addFD(static_cast<int>(sockImpl->sockfd()), newMode, EPOLL_CTL_ADD, sockImpl);
if (ret == 0) socketMapUpdate(socket, newMode);
return ret;
}
@@ -235,11 +270,38 @@ private:
return epoll_ctl(_epollfd, op, fd, &ev);
}
#ifdef WEPOLL_H_
int eventfd(int& port, int rmFD = 0)
{
if (rmFD == 0)
{
_pSocket = new ServerSocket(SocketAddress("127.0.0.1", 0));
port = _pSocket->address().port();
return static_cast<int>(_pSocket->impl()->sockfd());
}
else
{
delete _pSocket;
_pSocket = 0;
port = 0;
}
return 0;
}
#endif // WEPOLL_H_
mutable Mutex _mutex;
std::atomic<int> _epollfd;
SocketMap _socketMap;
std::vector<struct epoll_event> _events;
int _port = 0;
std::atomic<int> _eventfd;
#ifdef WEPOLL_H_
std::atomic <HANDLE> _epollfd;
ServerSocket* _pSocket;
#else
std::atomic<int> _epollfd;
#endif
};
@@ -359,11 +421,7 @@ public:
do
{
Poco::Timestamp start;
#ifdef _WIN32
rc = WSAPoll(&_pollfds[0], static_cast<ULONG>(_pollfds.size()), static_cast<INT>(remainingTime.totalMilliseconds()));
#else
rc = ::poll(&_pollfds[0], _pollfds.size(), remainingTime.totalMilliseconds());
#endif
if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
{
Poco::Timestamp end;
@@ -393,11 +451,7 @@ public:
std::map<poco_socket_t, Socket>::const_iterator its = _socketMap.find(it->fd);
if (its != _socketMap.end())
{
if ((it->revents & POLLIN)
#ifdef _WIN32
|| (it->revents & POLLHUP)
#endif
)
if (it->revents & POLLIN)
result[its->second] |= PollSet::POLL_READ;
if (it->revents & POLLOUT)
result[its->second] |= PollSet::POLL_WRITE;
@@ -440,302 +494,7 @@ private:
std::map<poco_socket_t, int> _addMap;
std::set<poco_socket_t> _removeSet;
std::vector<pollfd> _pollfds;
Poco::Pipe _pipe;
/// Add _pipe to head of _pollfds used to wake up poll blocking
};
#else
#ifdef POCO_OS_FAMILY_WINDOWS
//
// Windows-specific implementation using select()
// The size of select set is determined at compile
// time (see FD_SETSIZE in SocketDefs.h).
//
// This implementation works around that limit by
// having multiple socket descriptor sets and,
// when needed, calling select() multiple times.
// To avoid multiple sets situtation, the FD_SETSIZE
// can be increased, however then Poco::Net library
// must be recompiled in order for the new setting
// to be in effect.
//
class PollSetImpl
{
public:
PollSetImpl() : _fdRead(1, {0, {0}}),
_fdWrite(1, {0, {0}}),
_fdExcept(1, {0, {0}}),
_pFDRead(std::make_unique<fd_set>()),
_pFDWrite(std::make_unique<fd_set>()),
_pFDExcept(std::make_unique<fd_set>()),
_nfd(0)
{
}
void add(const Socket& socket, int mode)
{
Poco::Net::SocketImpl* pImpl = socket.impl();
poco_check_ptr(pImpl);
Poco::FastMutex::ScopedLock lock(_mutex);
_map[socket] = mode;
setMode(pImpl->sockfd(), mode);
}
void remove(const Socket& socket)
{
Poco::Net::SocketImpl* pImpl = socket.impl();
poco_check_ptr(pImpl);
Poco::FastMutex::ScopedLock lock(_mutex);
remove(pImpl->sockfd());
_map.erase(socket);
}
bool has(const Socket& socket) const
{
Poco::FastMutex::ScopedLock lock(_mutex);
return _map.find(socket) != _map.end();
}
bool empty() const
{
Poco::FastMutex::ScopedLock lock(_mutex);
return _map.empty();
}
void update(const Socket& socket, int mode)
{
Poco::Net::SocketImpl* pImpl = socket.impl();
poco_check_ptr(pImpl);
SOCKET fd = pImpl->sockfd();
Poco::FastMutex::ScopedLock lock(_mutex);
_map[socket] = mode;
setMode(fd, mode);
if (!(mode & PollSet::POLL_READ)) remove(fd, _fdRead);
if (!(mode & PollSet::POLL_WRITE)) remove(fd, _fdWrite);
if (!(mode & PollSet::POLL_ERROR)) remove(fd, _fdExcept);
}
void clear()
{
Poco::FastMutex::ScopedLock lock(_mutex);
_map.clear();
for (auto& fd : _fdRead) std::memset(&fd, 0, sizeof(fd));
for (auto& fd : _fdWrite) std::memset(&fd, 0, sizeof(fd));
for (auto& fd : _fdExcept) std::memset(&fd, 0, sizeof(fd));
_nfd = 0;
}
PollSet::SocketModeMap poll(const Poco::Timespan& timeout)
{
Poco::Timestamp start;
poco_assert_dbg(_fdRead.size() == _fdWrite.size());
poco_assert_dbg(_fdWrite.size() == _fdExcept.size());
PollSet::SocketModeMap result;
if (_nfd == 0) return result;
Poco::Timespan remainingTime(timeout);
struct timeval tv {0, 1000};
Poco::FastMutex::ScopedLock lock(_mutex);
auto readIt = _fdRead.begin();
auto writeIt = _fdWrite.begin();
auto exceptIt = _fdExcept.begin();
do
{
std::memcpy(_pFDRead.get(), &*readIt, sizeof(fd_set));
std::memcpy(_pFDWrite.get(), &*writeIt, sizeof(fd_set));
std::memcpy(_pFDExcept.get(), &*exceptIt, sizeof(fd_set));
int rc;
do
{
rc = ::select((int)_nfd + 1, _pFDRead.get(), _pFDWrite.get(), _pFDExcept.get(), &tv);
} while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
if (rc < 0) SocketImpl::error();
else if (rc > 0)
{
for (auto it = _map.begin(); it != _map.end(); ++it)
{
poco_socket_t fd = it->first.impl()->sockfd();
if (fd != POCO_INVALID_SOCKET)
{
if (FD_ISSET(fd, _pFDRead.get()))
{
result[it->first] |= PollSet::POLL_READ;
}
if (FD_ISSET(fd, _pFDWrite.get()))
{
result[it->first] |= PollSet::POLL_WRITE;
}
if (FD_ISSET(fd, _pFDExcept.get()))
{
result[it->first] |= PollSet::POLL_ERROR;
}
}
}
}
Timespan elapsed = Timestamp() - start;
if (++readIt == _fdRead.end())
{
if ((rc > 0) || (elapsed.totalMilliseconds() > timeout.totalMilliseconds()))
break;
readIt = _fdRead.begin();
writeIt = _fdWrite.begin();
exceptIt = _fdExcept.begin();
}
else
{
++writeIt;
++exceptIt;
}
Poco::UInt64 tOut = (((Poco::UInt64)tv.tv_sec * 1000000) + tv.tv_usec) * 2;
Poco::Timespan left = timeout - elapsed;
if (tOut > left.totalMicroseconds())
tOut = left.totalMicroseconds();
tv.tv_sec = static_cast<long>(tOut / 1000000);
tv.tv_usec = tOut % 1000000;
} while (true);
return result;
}
int count() const
{
Poco::FastMutex::ScopedLock lock(_mutex);
return static_cast<int>(_map.size());
}
void wakeUp()
{
// TODO
}
private:
void setMode(std::vector<fd_set>& fdSet, SOCKET fd)
{
SOCKET* pFD = 0;
for (auto& fdr : fdSet)
{
SOCKET* begin = fdr.fd_array;
SOCKET* end = fdr.fd_array + fdr.fd_count;
pFD = std::find(begin, end, fd);
if (end != pFD)
{
FD_SET(fd, &fdr);
if (fd > _nfd) _nfd = fd;
return;
}
}
// not found, insert at first free location
for (auto& fdr : fdSet)
{
if (fdr.fd_count < FD_SETSIZE)
{
fdr.fd_count++;
fdr.fd_array[fdr.fd_count-1] = fd;
if (fd > _nfd) _nfd = fd;
return;
}
}
// all fd sets are full; insert another one
fdSet.push_back({0, {0}});
fd_set& fds = fdSet.back();
fds.fd_count = 1;
fds.fd_array[0] = fd;
if (fd > _nfd) _nfd = fd;
}
void setMode(SOCKET fd, int mode)
{
if (mode & PollSet::POLL_READ) setMode(_fdRead, fd);
if (mode & PollSet::POLL_WRITE) setMode(_fdWrite, fd);
if (mode & PollSet::POLL_ERROR) setMode(_fdExcept, fd);
}
void remove(SOCKET fd, std::vector<fd_set>& fdSets)
{
bool newNFD = false;
for (auto& fdSet : fdSets)
{
if (fdSet.fd_count)
{
newNFD = (fd == _nfd);
int i = 0;
for (; i < fdSet.fd_count; ++i)
{
if (fdSet.fd_array[i] == fd)
{
if (i == (fdSet.fd_count-1))
{
fdSet.fd_array[i] = 0;
}
else
{
for (; i < fdSet.fd_count-1; ++i)
{
fdSet.fd_array[i] = fdSet.fd_array[i+1];
if (newNFD && fdSet.fd_array[i] > _nfd)
_nfd = fdSet.fd_array[i];
}
}
fdSet.fd_array[fdSet.fd_count-1] = 0;
fdSet.fd_count--;
break;
}
if (newNFD && fdSet.fd_array[i] > _nfd)
_nfd = fdSet.fd_array[i];
}
}
}
if (newNFD)
{
findNFD(_fdRead);
findNFD(_fdWrite);
findNFD(_fdExcept);
}
}
void findNFD(std::vector<fd_set>& fdSets)
{
for (auto& fdSet : fdSets)
{
for (int i = 0; i < fdSet.fd_count; ++i)
{
if (fdSet.fd_array[i] > _nfd)
_nfd = fdSet.fd_array[i];
}
}
}
void remove(SOCKET fd)
{
remove(fd, _fdRead);
remove(fd, _fdWrite);
remove(fd, _fdExcept);
}
mutable Poco::FastMutex _mutex;
PollSet::SocketModeMap _map;
SOCKET _nfd;
std::vector<fd_set> _fdRead;
std::vector<fd_set> _fdWrite;
std::vector<fd_set> _fdExcept;
std::unique_ptr<fd_set> _pFDRead;
std::unique_ptr<fd_set> _pFDWrite;
std::unique_ptr<fd_set> _pFDExcept;
Poco::Pipe _pipe;
};
@@ -890,9 +649,6 @@ private:
};
#endif // POCO_OS_FAMILY_WINDOWS
#endif