26#ifndef MYSQL_HARNESS_NET_TS_IMPL_LINUX_EPOLL_IO_SERVICE_H_
27#define MYSQL_HARNESS_NET_TS_IMPL_LINUX_EPOLL_IO_SERVICE_H_
37#include <system_error>
38#include <unordered_map>
40#if defined(USE_EVENTFD)
41#include <sys/eventfd.h>
85#if defined(USE_EVENTFD)
86 notify_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
103 auto non_block_wakeup_0_res =
105 if (!non_block_wakeup_0_res)
return non_block_wakeup_0_res;
106 auto non_block_wakeup_1_res =
108 if (!non_block_wakeup_1_res)
return non_block_wakeup_1_res;
127 }
while (res == -1 && errno == EINTR);
129 std::array<uint8_t, 256>
buf;
136 }
while (res > 0 || (res == -1 && errno == EINTR));
162 }
while ((res == -1) && (errno == EINTR));
168 }
while ((res == -1) && (errno == EINTR));
203 uint32_t new_events{};
206 new_events = EPOLLIN;
209 new_events = EPOLLOUT;
212 new_events = EPOLLERR | EPOLLHUP;
218 new_events |= EPOLLET;
221 new_events |= EPOLLONESHOT;
226 std::lock_guard<std::mutex> lk(b.mtx_);
227 const auto it = b.interest_.find(fd);
229 auto old_events = (it == b.interest_.end()) ? 0 : it->second;
230 auto merged_events = new_events | old_events;
239 if (!ctl_res)
return ctl_res;
243 if (!ctl_res)
return ctl_res;
247 if (it != b.interest_.end()) {
248 it->second = merged_events;
250 b.interest_.emplace(fd, merged_events);
261 std::lock_guard<std::mutex> lk(b.mtx_);
264 const auto it = b.interest_.find(fd);
265 if (it != b.interest_.end()) {
269 if (!epoll_ctl_res)
return epoll_ctl_res;
272 b.interest_.erase(it);
287 std::lock_guard<std::mutex> lk(b.mtx_);
289 const auto it = b.interest_.find(fd);
290 if (it == b.interest_.end()) {
301 const auto updated_fd_events =
interest.second & ~fd_events;
306 ev.events = updated_fd_events & ~kAlwaysEnabledEvents;
311 }
else if ((updated_fd_events &
kAllEvents) == 0) {
317 interest.second = updated_fd_events;
329 std::lock_guard<std::mutex> lk(b.mtx_);
331 const auto it = b.interest_.find(fd);
332 if (it == b.interest_.end()) {
340 if (!(
interest.second & EPOLLONESHOT)) {
371 if (fd_events != 0 &&
372 (fd_events & fd_interest) == 0) {
373 std::cerr <<
"after_event_fired(" << fd <<
", "
374 << std::bitset<32>(fd_events) <<
") not in "
375 << std::bitset<32>(fd_interest) << std::endl;
381 const auto updated_fd_events =
interest.second & ~fd_events;
395 ev.events = updated_fd_events & ~kAlwaysEnabledEvents;
400 }
else if ((updated_fd_events &
kAllEvents) == 0) {
407 interest.second = updated_fd_events;
415 std::lock_guard<std::mutex> lk(b.mtx_);
417 const auto it = b.interest_.find(fd);
418 if (it != b.interest_.end()) {
431 std::unordered_map<impl::socket::native_handle_type, uint32_t>
interest_;
436 const size_t ndx = fd %
buckets_.size();
442 const size_t ndx = fd %
buckets_.size();
470 if (ev.data.fd == fd) {
510 if (ev.events & EPOLLOUT) {
513 }
else if (ev.events & EPOLLIN) {
516 }
else if (ev.events & EPOLLERR) {
519 }
else if (ev.events & EPOLLHUP) {
525 if ((
fd_events_[ndx].events & (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP)) ==
534 std::chrono::milliseconds
timeout) {
548 std::copy_n(evs.begin(), *res,
fd_events_.begin());
563 oss <<
"after_event_fired(" << ev.data.fd <<
", "
564 << std::bitset<32>(ev.events) <<
") " << after_res.error() <<
" "
565 << after_res.error().message() << std::endl;
566 std::cerr << oss.str();
584 std::chrono::milliseconds
timeout)
override {
602 if (ev_res.error() == std::errc::no_such_file_or_directory) {
653 std::pair<impl::file::file_handle_type, impl::file::file_handle_type>
Definition: io_service_base.h:87
impl::socket::native_handle_type native_handle_type
Definition: io_service_base.h:89
Definition: linux_epoll_io_service.h:198
stdx::expected< void, std::error_code > erase(int epfd, native_handle_type fd)
Definition: linux_epoll_io_service.h:256
stdx::expected< void, std::error_code > merge(int epfd, native_handle_type fd, impl::socket::wait_type wt, bool oneshot)
Definition: linux_epoll_io_service.h:200
std::optional< int32_t > interest(native_handle_type fd) const
Definition: linux_epoll_io_service.h:412
const locked_bucket & bucket(native_handle_type fd) const
Definition: linux_epoll_io_service.h:441
stdx::expected< void, std::error_code > after_event_fired(int epfd, native_handle_type fd, uint32_t revent)
update registered fd-interest after a oneshot event fired.
Definition: linux_epoll_io_service.h:325
std::array< locked_bucket, 101 > buckets_
Definition: linux_epoll_io_service.h:448
stdx::expected< void, std::error_code > remove_fd_interest(int epfd, native_handle_type fd, uint32_t revent)
Definition: linux_epoll_io_service.h:283
locked_bucket & bucket(native_handle_type fd)
Definition: linux_epoll_io_service.h:435
Definition: linux_epoll_io_service.h:58
stdx::expected< fd_event, std::error_code > update_fd_events(std::chrono::milliseconds timeout)
Definition: linux_epoll_io_service.h:533
~linux_epoll_io_service() override
Definition: linux_epoll_io_service.h:67
stdx::expected< void, std::error_code > add_fd_interest(native_handle_type fd, impl::socket::wait_type wt) override
Definition: linux_epoll_io_service.h:451
static constexpr const int kAlwaysEnabledEvents
Definition: linux_epoll_io_service.h:63
impl::file::file_handle_type notify_fd_
Definition: linux_epoll_io_service.h:656
stdx::expected< void, std::error_code > add_fd_interest_permanent(native_handle_type fd, impl::socket::wait_type wt)
Definition: linux_epoll_io_service.h:456
size_t fd_events_size_
Definition: linux_epoll_io_service.h:650
static constexpr const int kSettableEvents
Definition: linux_epoll_io_service.h:62
void notify() override
notify the poll_one() that something may have changed.
Definition: linux_epoll_io_service.h:145
stdx::expected< void, std::error_code > after_event_fired(int epfd, native_handle_type fd, uint32_t revents)
Definition: linux_epoll_io_service.h:658
std::mutex fd_events_mtx_
Definition: linux_epoll_io_service.h:647
bool is_open() const noexcept
Definition: linux_epoll_io_service.h:69
FdInterest registered_events_
Definition: linux_epoll_io_service.h:628
void on_notify()
Definition: linux_epoll_io_service.h:116
impl::file::file_handle_type epfd_
Definition: linux_epoll_io_service.h:651
stdx::expected< fd_event, std::error_code > pop_event()
Definition: linux_epoll_io_service.h:500
stdx::expected< fd_event, std::error_code > poll_one(std::chrono::milliseconds timeout) override
poll one event from the registered fd-interest.
Definition: linux_epoll_io_service.h:583
stdx::expected< void, std::error_code > close()
Definition: linux_epoll_io_service.h:172
static constexpr const int kAllEvents
Definition: linux_epoll_io_service.h:64
stdx::expected< void, std::error_code > open() noexcept override
open the io-service.
Definition: linux_epoll_io_service.h:76
std::array< epoll_event, 8192 > fd_events_
Definition: linux_epoll_io_service.h:648
stdx::expected< void, std::error_code > remove_fd(native_handle_type fd) override
Definition: linux_epoll_io_service.h:461
size_t fd_events_processed_
Definition: linux_epoll_io_service.h:649
stdx::expected< void, std::error_code > remove_fd_interest(native_handle_type fd, uint32_t revents)
Definition: linux_epoll_io_service.h:486
std::optional< int32_t > interest(native_handle_type fd) const
get current fd-interest.
Definition: linux_epoll_io_service.h:496
std::pair< impl::file::file_handle_type, impl::file::file_handle_type > wakeup_fds_
Definition: linux_epoll_io_service.h:654
Definition: expected.h:286
static char buf[MAX_BUF]
Definition: conf_to_src.cc:73
if(!(yy_init))
Definition: lexyy.cc:1144
Header for compiler-dependent features.
#define MY_COMPILER_GCC_DIAGNOSTIC_IGNORE(X)
Definition: my_compiler.h:250
#define MY_COMPILER_DIAGNOSTIC_PUSH()
save the compiler's diagnostic (enabled warnings, errors, ...) state
Definition: my_compiler.h:285
#define MY_COMPILER_DIAGNOSTIC_POP()
restore the compiler's diagnostic (enabled warnings, errors, ...) state
Definition: my_compiler.h:286
static bool interrupted
Definition: mysqladmin.cc:72
Definition: buf0block_hint.cc:30
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:498
stdx::expected< void, std::error_code > ctl(int epfd, Cmd cmd, int fd, epoll_event *ev)
Definition: linux_epoll.h:71
stdx::expected< int, std::error_code > create()
Definition: linux_epoll.h:60
stdx::expected< size_t, std::error_code > wait(int epfd, epoll_event *fd_events, size_t num_fd_events, std::chrono::milliseconds timeout)
Definition: linux_epoll.h:81
int file_handle_type
Definition: file.h:53
constexpr file_handle_type kInvalidHandle
Definition: file.h:54
stdx::expected< std::pair< file_handle_type, file_handle_type >, std::error_code > pipe(int flags=0)
create pipe.
Definition: file.h:144
stdx::expected< void, std::error_code > close(file_handle_type native_handle)
close file handle.
Definition: file.h:239
wait_type
Definition: socket_constants.h:86
stdx::expected< bool, error_type > native_non_blocking(native_handle_type native_handle)
Definition: socket.h:106
int native_handle_type
Definition: socket_constants.h:51
stdx::expected< size_t, std::error_code > write(SyncWriteStream &stream, const ConstBufferSequence &buffers)
Definition: buffer.h:977
stdx::expected< size_t, std::error_code > read(SyncReadStream &stream, const MutableBufferSequence &buffers)
Definition: buffer.h:835
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
unexpected(E) -> unexpected< E >
std::basic_ostringstream< char, std::char_traits< char >, ut::allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut::allocator.
Definition: ut0new.h:2871
static void swap(String &a, String &b) noexcept
Definition: sql_string.h:663
Definition: io_service_base.h:69
native_handle_type fd
Definition: io_service_base.h:75
Definition: linux_epoll_io_service.h:429
std::mutex mtx_
Definition: linux_epoll_io_service.h:430
std::unordered_map< impl::socket::native_handle_type, uint32_t > interest_
Definition: linux_epoll_io_service.h:431