MySQL 8.0.40
Source Code Documentation
kqueue_io_service.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2019, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQL_HARNESS_NET_TS_IMPL_KQUEUE_IO_SERVICE_H_
27#define MYSQL_HARNESS_NET_TS_IMPL_KQUEUE_IO_SERVICE_H_
28
29#include "my_config.h" // HAVE_KQUEUE
30
31#ifdef HAVE_KQUEUE
32#include <array>
33
36
37namespace net {
38class kqueue_io_service : public IoServiceBase {
39 public:
41
42 ~kqueue_io_service() override { close(); }
43
44 bool is_open() const noexcept {
45 return epfd_ != impl::file::kInvalidHandle &&
46 wakeup_fds_.first != impl::file::kInvalidHandle &&
47 wakeup_fds_.second != impl::file::kInvalidHandle;
48 }
49
51 if (is_open()) {
54 }
55
56 auto res = impl::kqueue::create();
57 if (!res) return stdx::make_unexpected(res.error());
58
59 epfd_ = *res;
60
61 auto pipe_res = impl::file::pipe(O_NONBLOCK);
62 if (!pipe_res) return stdx::make_unexpected(pipe_res.error());
63
64 wakeup_fds_ = *pipe_res;
65
66 // set both ends of the pipe non-blocking as
67 //
68 // - read() shouldn't block is pipe is empty
69 // - write() shouldn't block is pipe is full as it only matters there is
70 // something in the pipe to wakeup the poll_one()
71 auto non_block_wakeup_0_res =
72 impl::socket::native_non_blocking(wakeup_fds_.first, true);
73 if (!non_block_wakeup_0_res) return non_block_wakeup_0_res;
74 auto non_block_wakeup_1_res =
75 impl::socket::native_non_blocking(wakeup_fds_.second, true);
76 if (!non_block_wakeup_1_res) return non_block_wakeup_1_res;
77
78 return {};
79 }
80
81 void on_notify() {
82 std::array<uint8_t, 256> buf;
83 ssize_t res;
84 do {
85 res = ::read(wakeup_fds_.first, buf.data(), buf.size());
86 } while (res != -1 || errno == EINTR);
87 }
88
89 void notify() override {
90 ssize_t res;
91 do {
92 res = ::write(wakeup_fds_.second, ".", 1);
93 // retry if interrupted
94 } while ((res == -1) && (errno == EINTR));
95 }
96
98 if (wakeup_fds_.first != impl::file::kInvalidHandle) {
99 impl::file::close(wakeup_fds_.first);
100 wakeup_fds_.first = impl::file::kInvalidHandle;
101 }
102
103 if (wakeup_fds_.second != impl::file::kInvalidHandle) {
104 impl::file::close(wakeup_fds_.second);
105 wakeup_fds_.second = impl::file::kInvalidHandle;
106 }
107
108 if (epfd_ != impl::file::kInvalidHandle) {
109 impl::file::close(epfd_);
111 }
112
113 return {};
114 }
115
118 struct kevent ev;
119
120 short filter{0};
121
122 switch (wt) {
124 filter = EVFILT_READ;
125 break;
127 filter = EVFILT_WRITE;
128 break;
129 default:
131 break;
132 }
133
134 // edge-triggered: EV_CLEAR
135 EV_SET(&ev, fd, filter, EV_ADD | EV_ONESHOT | EV_CLEAR, 0, 0, NULL);
136
137 changelist_.push_back(ev);
138
139 return {};
140 }
141
142 stdx::expected<void, std::error_code> queue_remove_fd_interest(
143 native_handle_type fd, short filter) {
144 struct kevent ev;
145
146 EV_SET(&ev, fd, filter, EV_DELETE, 0, 0, NULL);
147
148 changelist_.push_back(ev);
149
150 return {};
151 }
152
154 const struct kevent & /*ev*/) {
155 // as ONESHOT is used, there is no need to remove-fd-interest again
156 return {};
157 }
158
159 // TODO: should be renamed to "before_close()" as it is a no-op on kqueue,
160 // but a requirement on linux epoll
162 native_handle_type /* fd */) override {
163#if 0
164 struct kevent ev;
165
166 EV_SET(&ev, fd, 0, EV_DELETE, 0, 0, NULL);
167
168 changelist_.push_back(ev);
169#endif
170
171 return {};
172 }
173
174 /**
175 * @returns a fdevent or std::error_code
176 * @retval fd_event on success
177 * @retval std::error_code on failure, std::error_code(success) if no events
178 * where registered.
179 */
181 std::chrono::milliseconds timeout) override {
182 if (!is_open()) {
184 make_error_code(std::errc::invalid_argument));
185 }
186
187 if (fd_events_processed_ == fd_events_size_) {
188 struct timespec ts, *p_ts{};
189
190 if (timeout.count() != -1) {
191 auto secs = std::chrono::duration_cast<std::chrono::seconds>(timeout);
192 timeout -= secs;
193
194 ts = {secs.count(),
195 std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
196 .count()};
197
198 p_ts = &ts;
199 }
200
201 auto res =
202 impl::kqueue::kevent(epfd_, changelist_.data(), changelist_.size(),
203 fd_events_.data(), fd_events_.size(), p_ts);
204 if (!res) return stdx::make_unexpected(res.error());
205
206 changelist_.clear();
207
208 fd_events_processed_ = 0;
209 fd_events_size_ = *res;
210
211 if (fd_events_size_ == 0) {
212 return stdx::make_unexpected(make_error_code(std::errc::timed_out));
213 }
214 }
215
216 const auto ev = fd_events_[fd_events_processed_++];
217
218 // ev.flags may also set EV_EOF
219
220 if (ev.flags & EV_ERROR) {
221 if (ev.data == 0) {
223 } else {
224 // .data is errno
225 //
226 // if EV_RECEIPT is set, ev.data will be 0 in case of OK
227 // eitherwise ...
228 // - ENOENT
229
230 return fd_event{static_cast<native_handle_type>(ev.ident), POLLERR};
231 }
232 }
233
234 if (static_cast<impl::file::file_handle_type>(ev.ident) ==
235 wakeup_fds_.first) {
236 // wakeup fd fired
237 //
238 // - don't remove the interest for it
239 // - report to the caller that we don't have an event yet by saying we got
240 // interrupted
241 on_notify();
242
244 }
245
246 after_event_fired(ev);
247
248 short events{};
249 if (ev.filter == EVFILT_READ) {
250 events = POLLIN;
251 } else if (ev.filter == EVFILT_WRITE) {
252 events = POLLOUT;
253 }
254
255 // ev.ident is a uintptr_t ... as it supports many kinds of event-sources
256 // ... but we only added a 'int' as file-handle
257 return fd_event{static_cast<native_handle_type>(ev.ident), events};
258 }
259
260 private:
261 std::array<struct kevent, 16> fd_events_;
262 size_t fd_events_processed_{0};
263 size_t fd_events_size_{0};
264 int epfd_{-1};
265 std::vector<struct kevent> changelist_;
266
267 std::pair<impl::file::file_handle_type, impl::file::file_handle_type>
269};
270} // namespace net
271#endif
272
273#endif
Definition: expected.h:944
static bool interrupted
Definition: mysqladmin.cc:66
Definition: buf0block_hint.cc:30
bool terminate(THD *thd)
Drop all DD tables in case there is an error while upgrading server.
Definition: upgrade.cc:686
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:496
int file_handle_type
Definition: file.h:53
constexpr file_handle_type kInvalidHandle
Definition: file.h:54
stdx::expected< std::pair< file_handle_type, file_handle_type >, std::error_code > pipe(int flags=0)
create pipe.
Definition: file.h:144
stdx::expected< void, std::error_code > close(file_handle_type native_handle)
close file handle.
Definition: file.h:239
wait_type
Definition: socket_constants.h:86
stdx::expected< bool, error_type > native_non_blocking(native_handle_type native_handle)
Definition: socket.h:106
int native_handle_type
Definition: socket_constants.h:51
Definition: buffer.h:45
std::enable_if_t< is_const_buffer_sequence< ConstBufferSequence >::value, stdx::expected< size_t, std::error_code > > write(SyncWriteStream &stream, const ConstBufferSequence &buffers)
Definition: buffer.h:992
std::enable_if_t< is_mutable_buffer_sequence< MutableBufferSequence >::value, stdx::expected< size_t, std::error_code > > read(SyncReadStream &stream, const MutableBufferSequence &buffers)
Definition: buffer.h:838
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
static mysql_service_status_t create(const char *service_names[], reference_caching_channel *out_channel) noexcept
Definition: component.cc:36
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:79
constexpr auto make_unexpected(E &&e) -> unexpected< std::decay_t< E > >
Definition: expected.h:125
static bool notify(SvcTypes svc_type, Notification_context &ctx)
Auxiliary function to engage the service registry to notify a set of listeners.
Definition: notification.cc:87
#define NULL
Definition: types.h:55