MySQL 8.4.2
Source Code Documentation
kqueue_io_service.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2019, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQL_HARNESS_NET_TS_IMPL_KQUEUE_IO_SERVICE_H_
27#define MYSQL_HARNESS_NET_TS_IMPL_KQUEUE_IO_SERVICE_H_
28
29#include "my_config.h" // HAVE_KQUEUE
30
31#ifdef HAVE_KQUEUE
32#include <array>
33
36
37namespace net {
38class kqueue_io_service : public IoServiceBase {
39 public:
41
42 ~kqueue_io_service() override { close(); }
43
44 bool is_open() const noexcept {
45 return epfd_ != impl::file::kInvalidHandle &&
46 wakeup_fds_.first != impl::file::kInvalidHandle &&
47 wakeup_fds_.second != impl::file::kInvalidHandle;
48 }
49
51 if (is_open()) {
53 }
54
55 auto res = impl::kqueue::create();
56 if (!res) return stdx::unexpected(res.error());
57
58 epfd_ = *res;
59
60 auto pipe_res = impl::file::pipe(O_NONBLOCK);
61 if (!pipe_res) return stdx::unexpected(pipe_res.error());
62
63 wakeup_fds_ = *pipe_res;
64
65 // set both ends of the pipe non-blocking as
66 //
67 // - read() shouldn't block is pipe is empty
68 // - write() shouldn't block is pipe is full as it only matters there is
69 // something in the pipe to wakeup the poll_one()
70 auto non_block_wakeup_0_res =
71 impl::socket::native_non_blocking(wakeup_fds_.first, true);
72 if (!non_block_wakeup_0_res) return non_block_wakeup_0_res;
73 auto non_block_wakeup_1_res =
74 impl::socket::native_non_blocking(wakeup_fds_.second, true);
75 if (!non_block_wakeup_1_res) return non_block_wakeup_1_res;
76
77 return {};
78 }
79
80 void on_notify() {
81 std::array<uint8_t, 256> buf;
82 ssize_t res;
83 do {
84 res = ::read(wakeup_fds_.first, buf.data(), buf.size());
85 } while (res != -1 || errno == EINTR);
86 }
87
88 void notify() override {
89 ssize_t res;
90 do {
91 res = ::write(wakeup_fds_.second, ".", 1);
92 // retry if interrupted
93 } while ((res == -1) && (errno == EINTR));
94 }
95
97 if (wakeup_fds_.first != impl::file::kInvalidHandle) {
98 impl::file::close(wakeup_fds_.first);
99 wakeup_fds_.first = impl::file::kInvalidHandle;
100 }
101
102 if (wakeup_fds_.second != impl::file::kInvalidHandle) {
103 impl::file::close(wakeup_fds_.second);
104 wakeup_fds_.second = impl::file::kInvalidHandle;
105 }
106
107 if (epfd_ != impl::file::kInvalidHandle) {
108 impl::file::close(epfd_);
110 }
111
112 return {};
113 }
114
117 struct kevent ev;
118
119 short filter{0};
120
121 switch (wt) {
123 filter = EVFILT_READ;
124 break;
126 filter = EVFILT_WRITE;
127 break;
128 default:
129 std::terminate();
130 break;
131 }
132
133 // edge-triggered: EV_CLEAR
134 EV_SET(&ev, fd, filter, EV_ADD | EV_ONESHOT | EV_CLEAR, 0, 0, NULL);
135
136 changelist_.push_back(ev);
137
138 return {};
139 }
140
141 stdx::expected<void, std::error_code> queue_remove_fd_interest(
142 native_handle_type fd, short filter) {
143 struct kevent ev;
144
145 EV_SET(&ev, fd, filter, EV_DELETE, 0, 0, NULL);
146
147 changelist_.push_back(ev);
148
149 return {};
150 }
151
153 const struct kevent & /*ev*/) {
154 // as ONESHOT is used, there is no need to remove-fd-interest again
155 return {};
156 }
157
158 // TODO: should be renamed to "before_close()" as it is a no-op on kqueue,
159 // but a requirement on linux epoll
161 native_handle_type /* fd */) override {
162#if 0
163 struct kevent ev;
164
165 EV_SET(&ev, fd, 0, EV_DELETE, 0, 0, NULL);
166
167 changelist_.push_back(ev);
168#endif
169
170 return {};
171 }
172
173 /**
174 * @returns a fdevent or std::error_code
175 * @retval fd_event on success
176 * @retval std::error_code on failure, std::error_code(success) if no events
177 * where registered.
178 */
180 std::chrono::milliseconds timeout) override {
181 if (!is_open()) {
182 return stdx::unexpected(make_error_code(std::errc::invalid_argument));
183 }
184
185 if (fd_events_processed_ == fd_events_size_) {
186 struct timespec ts, *p_ts{};
187
188 if (timeout.count() != -1) {
189 auto secs = std::chrono::duration_cast<std::chrono::seconds>(timeout);
190 timeout -= secs;
191
192 ts = {secs.count(),
193 std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
194 .count()};
195
196 p_ts = &ts;
197 }
198
199 auto res =
200 impl::kqueue::kevent(epfd_, changelist_.data(), changelist_.size(),
201 fd_events_.data(), fd_events_.size(), p_ts);
202 if (!res) return stdx::unexpected(res.error());
203
204 changelist_.clear();
205
206 fd_events_processed_ = 0;
207 fd_events_size_ = *res;
208
209 if (fd_events_size_ == 0) {
210 return stdx::unexpected(make_error_code(std::errc::timed_out));
211 }
212 }
213
214 const auto ev = fd_events_[fd_events_processed_++];
215
216 // ev.flags may also set EV_EOF
217
218 if (ev.flags & EV_ERROR) {
219 if (ev.data == 0) {
221 } else {
222 // .data is errno
223 //
224 // if EV_RECEIPT is set, ev.data will be 0 in case of OK
225 // eitherwise ...
226 // - ENOENT
227
228 return fd_event{static_cast<native_handle_type>(ev.ident), POLLERR};
229 }
230 }
231
232 if (static_cast<impl::file::file_handle_type>(ev.ident) ==
233 wakeup_fds_.first) {
234 // wakeup fd fired
235 //
236 // - don't remove the interest for it
237 // - report to the caller that we don't have an event yet by saying we got
238 // interrupted
239 on_notify();
240
242 }
243
244 after_event_fired(ev);
245
246 short events{};
247 if (ev.filter == EVFILT_READ) {
248 events = POLLIN;
249 } else if (ev.filter == EVFILT_WRITE) {
250 events = POLLOUT;
251 }
252
253 // ev.ident is a uintptr_t ... as it supports many kinds of event-sources
254 // ... but we only added a 'int' as file-handle
255 return fd_event{static_cast<native_handle_type>(ev.ident), events};
256 }
257
258 private:
259 std::array<struct kevent, 16> fd_events_;
260 size_t fd_events_processed_{0};
261 size_t fd_events_size_{0};
262 int epfd_{-1};
263 std::vector<struct kevent> changelist_;
264
265 std::pair<impl::file::file_handle_type, impl::file::file_handle_type>
267};
268} // namespace net
269#endif
270
271#endif
Definition: expected.h:284
static char buf[MAX_BUF]
Definition: conf_to_src.cc:73
static bool interrupted
Definition: mysqladmin.cc:72
Definition: buf0block_hint.cc:30
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:498
int file_handle_type
Definition: file.h:53
constexpr file_handle_type kInvalidHandle
Definition: file.h:54
stdx::expected< std::pair< file_handle_type, file_handle_type >, std::error_code > pipe(int flags=0)
create pipe.
Definition: file.h:144
stdx::expected< void, std::error_code > close(file_handle_type native_handle)
close file handle.
Definition: file.h:239
wait_type
Definition: socket_constants.h:86
stdx::expected< bool, error_type > native_non_blocking(native_handle_type native_handle)
Definition: socket.h:106
int native_handle_type
Definition: socket_constants.h:51
Definition: buffer.h:45
std::enable_if_t< is_const_buffer_sequence_v< ConstBufferSequence >, stdx::expected< size_t, std::error_code > > write(SyncWriteStream &stream, const ConstBufferSequence &buffers)
Definition: buffer.h:992
std::enable_if_t< is_mutable_buffer_sequence< MutableBufferSequence >::value, stdx::expected< size_t, std::error_code > > read(SyncReadStream &stream, const MutableBufferSequence &buffers)
Definition: buffer.h:837
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
static mysql_service_status_t create(const char *service_names[], reference_caching_channel *out_channel) noexcept
Definition: component.cc:45
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:79
unexpected(E) -> unexpected< E >
static bool notify(SvcTypes svc_type, Notification_context &ctx)
Auxiliary function to engage the service registry to notify a set of listeners.
Definition: notification.cc:87
#define NULL
Definition: types.h:55