26#ifndef MYSQL_HARNESS_NET_TS_IMPL_LINUX_EPOLL_IO_SERVICE_H_ 
   27#define MYSQL_HARNESS_NET_TS_IMPL_LINUX_EPOLL_IO_SERVICE_H_ 
   37#include <system_error> 
   38#include <unordered_map> 
   40#if defined(USE_EVENTFD) 
   41#include <sys/eventfd.h> 
   85#if defined(USE_EVENTFD) 
   86    notify_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 
  103    auto non_block_wakeup_0_res =
 
  105    if (!non_block_wakeup_0_res) 
return non_block_wakeup_0_res;
 
  106    auto non_block_wakeup_1_res =
 
  108    if (!non_block_wakeup_1_res) 
return non_block_wakeup_1_res;
 
  127      } 
while (res == -1 && errno == EINTR);
 
  129      std::array<uint8_t, 256> 
buf;
 
  136      } 
while (res > 0 || (res == -1 && errno == EINTR));
 
  162      } 
while ((res == -1) && (errno == EINTR));
 
  168      } 
while ((res == -1) && (errno == EINTR));
 
  203      uint32_t new_events{};
 
  206          new_events = EPOLLIN;
 
  209          new_events = EPOLLOUT;
 
  212          new_events = EPOLLERR | EPOLLHUP;
 
  218      new_events |= EPOLLET;
 
  221        new_events |= EPOLLONESHOT;
 
  226      std::lock_guard<std::mutex> lk(b.mtx_);
 
  227      const auto it = b.interest_.find(fd);
 
  229      auto old_events = (it == b.interest_.end()) ? 0 : it->second;
 
  230      auto merged_events = new_events | old_events;
 
  239        if (!ctl_res) 
return ctl_res;
 
  243        if (!ctl_res) 
return ctl_res;
 
  247      if (it != b.interest_.end()) {
 
  248        it->second = merged_events;
 
  250        b.interest_.emplace(fd, merged_events);
 
  261      std::lock_guard<std::mutex> lk(b.mtx_);
 
  264      const auto it = b.interest_.find(fd);
 
  265      if (it != b.interest_.end()) {
 
  269          if (!epoll_ctl_res) 
return epoll_ctl_res;
 
  272        b.interest_.erase(it);
 
  287      std::lock_guard<std::mutex> lk(b.mtx_);
 
  289      const auto it = b.interest_.find(fd);
 
  290      if (it == b.interest_.end()) {
 
  301      const auto updated_fd_events = 
interest.second & ~fd_events;
 
  306        ev.events = updated_fd_events & ~kAlwaysEnabledEvents;
 
  311      } 
else if ((updated_fd_events & 
kAllEvents) == 0) {
 
  317      interest.second = updated_fd_events;
 
  329      std::lock_guard<std::mutex> lk(b.mtx_);
 
  331      const auto it = b.interest_.find(fd);
 
  332      if (it == b.interest_.end()) {
 
  340      if (!(
interest.second & EPOLLONESHOT)) {
 
  371      if (fd_events != 0 &&  
 
  372          (fd_events & fd_interest) == 0) {
 
  373        std::cerr << 
"after_event_fired(" << fd << 
", " 
  374                  << std::bitset<32>(fd_events) << 
") not in " 
  375                  << std::bitset<32>(fd_interest) << std::endl;
 
  381      const auto updated_fd_events = 
interest.second & ~fd_events;
 
  395        ev.events = updated_fd_events & ~kAlwaysEnabledEvents;
 
  400      } 
else if ((updated_fd_events & 
kAllEvents) == 0) {
 
  407      interest.second = updated_fd_events;
 
  415      std::lock_guard<std::mutex> lk(b.mtx_);
 
  417      const auto it = b.interest_.find(fd);
 
  418      if (it != b.interest_.end()) {
 
  431      std::unordered_map<impl::socket::native_handle_type, uint32_t> 
interest_;
 
  436      const size_t ndx = fd % 
buckets_.size();
 
  442      const size_t ndx = fd % 
buckets_.size();
 
  470        if (ev.data.fd == fd) {
 
  510    if (ev.events & EPOLLOUT) {
 
  513    } 
else if (ev.events & EPOLLIN) {
 
  516    } 
else if (ev.events & EPOLLERR) {
 
  519    } 
else if (ev.events & EPOLLHUP) {
 
  525    if ((
fd_events_[ndx].events & (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP)) ==
 
  548    std::copy_n(evs.begin(), *res, 
fd_events_.begin());
 
  563        oss << 
"after_event_fired(" << ev.data.fd << 
", " 
  564            << std::bitset<32>(ev.events) << 
") " << after_res.error() << 
" " 
  565            << after_res.error().message() << std::endl;
 
  566        std::cerr << oss.str();
 
  602      if (ev_res.error() == std::errc::no_such_file_or_directory) {
 
  653  std::pair<impl::file::file_handle_type, impl::file::file_handle_type>
 
Definition: io_service_base.h:87
impl::socket::native_handle_type native_handle_type
Definition: io_service_base.h:89
Definition: linux_epoll_io_service.h:198
stdx::expected< void, std::error_code > erase(int epfd, native_handle_type fd)
Definition: linux_epoll_io_service.h:256
stdx::expected< void, std::error_code > merge(int epfd, native_handle_type fd, impl::socket::wait_type wt, bool oneshot)
Definition: linux_epoll_io_service.h:200
std::optional< int32_t > interest(native_handle_type fd) const
Definition: linux_epoll_io_service.h:412
const locked_bucket & bucket(native_handle_type fd) const
Definition: linux_epoll_io_service.h:441
stdx::expected< void, std::error_code > after_event_fired(int epfd, native_handle_type fd, uint32_t revent)
update registered fd-interest after a oneshot event fired.
Definition: linux_epoll_io_service.h:325
std::array< locked_bucket, 101 > buckets_
Definition: linux_epoll_io_service.h:448
stdx::expected< void, std::error_code > remove_fd_interest(int epfd, native_handle_type fd, uint32_t revent)
Definition: linux_epoll_io_service.h:283
locked_bucket & bucket(native_handle_type fd)
Definition: linux_epoll_io_service.h:435
Definition: linux_epoll_io_service.h:58
stdx::expected< fd_event, std::error_code > update_fd_events(std::chrono::milliseconds timeout)
Definition: linux_epoll_io_service.h:533
~linux_epoll_io_service() override
Definition: linux_epoll_io_service.h:67
stdx::expected< void, std::error_code > add_fd_interest(native_handle_type fd, impl::socket::wait_type wt) override
Definition: linux_epoll_io_service.h:451
static constexpr const int kAlwaysEnabledEvents
Definition: linux_epoll_io_service.h:63
impl::file::file_handle_type notify_fd_
Definition: linux_epoll_io_service.h:656
stdx::expected< void, std::error_code > add_fd_interest_permanent(native_handle_type fd, impl::socket::wait_type wt)
Definition: linux_epoll_io_service.h:456
size_t fd_events_size_
Definition: linux_epoll_io_service.h:650
static constexpr const int kSettableEvents
Definition: linux_epoll_io_service.h:62
void notify() override
notify the poll_one() that something may have changed.
Definition: linux_epoll_io_service.h:145
stdx::expected< void, std::error_code > after_event_fired(int epfd, native_handle_type fd, uint32_t revents)
Definition: linux_epoll_io_service.h:658
std::mutex fd_events_mtx_
Definition: linux_epoll_io_service.h:647
bool is_open() const noexcept
Definition: linux_epoll_io_service.h:69
FdInterest registered_events_
Definition: linux_epoll_io_service.h:628
void on_notify()
Definition: linux_epoll_io_service.h:116
impl::file::file_handle_type epfd_
Definition: linux_epoll_io_service.h:651
stdx::expected< fd_event, std::error_code > pop_event()
Definition: linux_epoll_io_service.h:500
stdx::expected< fd_event, std::error_code > poll_one(std::chrono::milliseconds timeout) override
poll one event from the registered fd-interest.
Definition: linux_epoll_io_service.h:583
stdx::expected< void, std::error_code > close()
Definition: linux_epoll_io_service.h:172
static constexpr const int kAllEvents
Definition: linux_epoll_io_service.h:64
stdx::expected< void, std::error_code > open() noexcept override
open the io-service.
Definition: linux_epoll_io_service.h:76
std::array< epoll_event, 8192 > fd_events_
Definition: linux_epoll_io_service.h:648
stdx::expected< void, std::error_code > remove_fd(native_handle_type fd) override
Definition: linux_epoll_io_service.h:461
size_t fd_events_processed_
Definition: linux_epoll_io_service.h:649
stdx::expected< void, std::error_code > remove_fd_interest(native_handle_type fd, uint32_t revents)
Definition: linux_epoll_io_service.h:486
std::optional< int32_t > interest(native_handle_type fd) const
get current fd-interest.
Definition: linux_epoll_io_service.h:496
std::pair< impl::file::file_handle_type, impl::file::file_handle_type > wakeup_fds_
Definition: linux_epoll_io_service.h:654
Definition: expected.h:286
static char buf[MAX_BUF]
Definition: conf_to_src.cc:74
if(!(yy_init))
Definition: lexyy.cc:1144
Header for compiler-dependent features.
#define MY_COMPILER_GCC_DIAGNOSTIC_IGNORE(X)
Definition: my_compiler.h:242
#define MY_COMPILER_DIAGNOSTIC_PUSH()
save the compiler's diagnostic (enabled warnings, errors, ...) state
Definition: my_compiler.h:277
#define MY_COMPILER_DIAGNOSTIC_POP()
restore the compiler's diagnostic (enabled warnings, errors, ...) state
Definition: my_compiler.h:278
static bool interrupted
Definition: mysqladmin.cc:73
Definition: buf0block_hint.cc:30
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:498
std::chrono::milliseconds milliseconds
Definition: authorize_manager.cc:67
stdx::expected< void, std::error_code > ctl(int epfd, Cmd cmd, int fd, epoll_event *ev)
Definition: linux_epoll.h:71
stdx::expected< int, std::error_code > create()
Definition: linux_epoll.h:60
stdx::expected< size_t, std::error_code > wait(int epfd, epoll_event *fd_events, size_t num_fd_events, std::chrono::milliseconds timeout)
Definition: linux_epoll.h:81
int file_handle_type
Definition: file.h:53
constexpr file_handle_type kInvalidHandle
Definition: file.h:54
stdx::expected< std::pair< file_handle_type, file_handle_type >, std::error_code > pipe(int flags=0)
create pipe.
Definition: file.h:144
stdx::expected< void, std::error_code > close(file_handle_type native_handle)
close file handle.
Definition: file.h:239
wait_type
Definition: socket_constants.h:86
stdx::expected< bool, error_type > native_non_blocking(native_handle_type native_handle)
Definition: socket.h:106
int native_handle_type
Definition: socket_constants.h:51
stdx::expected< size_t, std::error_code > write(SyncWriteStream &stream, const ConstBufferSequence &buffers)
Definition: buffer.h:977
stdx::expected< size_t, std::error_code > read(SyncReadStream &stream, const MutableBufferSequence &buffers)
Definition: buffer.h:835
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
unexpected(E) -> unexpected< E >
std::basic_ostringstream< char, std::char_traits< char >, ut::allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut::allocator.
Definition: ut0new.h:2876
static void swap(String &a, String &b) noexcept
Definition: sql_string.h:650
Definition: io_service_base.h:69
native_handle_type fd
Definition: io_service_base.h:75
Definition: linux_epoll_io_service.h:429
std::mutex mtx_
Definition: linux_epoll_io_service.h:430
std::unordered_map< impl::socket::native_handle_type, uint32_t > interest_
Definition: linux_epoll_io_service.h:431