MySQL 8.0.39
Source Code Documentation
poll_io_service.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2019, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQL_HARNESS_NET_TS_IMPL_POLL_IO_SERVICE_H_
27#define MYSQL_HARNESS_NET_TS_IMPL_POLL_IO_SERVICE_H_
28
29#include <array>
30#include <list>
31#include <mutex>
32#include <optional>
33#include <system_error>
34#include <vector>
35
40
41namespace net {
42
43/**
44 * io_service based on the poll() system-call.
45 *
46 *
47 */
48
49// https://daniel.haxx.se/blog/2016/10/11/poll-on-mac-10-12-is-broken/
50// https://daniel.haxx.se/blog/2012/10/10/wsapoll-is-broken/
51// http://www.greenend.org.uk/rjk/tech/poll.html
53 public:
54 ~poll_io_service() override { close(); }
55
56 static constexpr const short kSettableEvents = POLLIN | POLLOUT;
57 static constexpr const short kAlwaysEnabledEvents = POLLHUP | POLLERR;
58 static constexpr const short kAllEvents =
60
61 bool is_open() const noexcept {
64 }
65
67 if (is_open()) {
70 }
71
72#if defined(_WIN32)
73 // on windows we build a AF_INET socketpair()
74 auto pipe_res = impl::socket::socketpair(AF_INET, SOCK_STREAM, 0);
75#else
76 auto pipe_res = impl::socket::socketpair(AF_UNIX, SOCK_STREAM, 0);
77#endif
78 if (!pipe_res) return stdx::make_unexpected(pipe_res.error());
79
80 wakeup_fds_ = *pipe_res;
81
82 // set both ends of the pipe non-blocking as
83 //
84 // - read() shouldn't block if pipe is empty
85 // - write() shouldn't block if pipe is full as it only matters there is
86 // something in the pipe to wakeup the poll_one()
87 auto non_block_wakeup_0_res =
89 if (!non_block_wakeup_0_res) return non_block_wakeup_0_res;
90 auto non_block_wakeup_1_res =
92 if (!non_block_wakeup_1_res) return non_block_wakeup_1_res;
93
95
96 return {};
97 }
98
103 }
106
108 }
109
110 return {};
111 }
112
115 switch (event) {
117 return POLLIN;
119 return POLLOUT;
121 return POLLERR | POLLHUP;
122 default:
124 make_error_code(std::errc::invalid_argument));
125 }
126 }
127
128 // split fd-interest
129 //
130 // internally splits the fds into multiple buckets to reduce
131 // the search-space and lead to lower resize cost
133 public:
135
136 // we could use list, deque, vector, ... here
137 //
138 // container_type | concurrency | mem-usage | tps
139 // ---------------+-------------+-----------+------
140 // list: | 8000 | 137M | 56000
141 // vector | 8000 | 145M | 58000
142 using container_type = std::vector<element_type>;
143
145 auto &b = bucket(t.fd);
146
147 std::lock_guard<std::mutex> lk(mtx_);
148
149 auto it = std::find_if(b.begin(), b.end(), [fd = t.fd](auto fd_ev) {
150 return fd_ev.fd == fd;
151 });
152 if (it == b.end()) {
153 b.push_back(std::move(t));
154 } else {
155 it->event |= t.event;
156 }
157 }
158
160 auto &b = bucket(fd);
161
162 std::lock_guard<std::mutex> lk(mtx_);
163 const auto end = b.end();
164
165 for (auto cur = b.begin(); cur != end;) {
166 if (cur->fd == fd) {
167 cur = b.erase(cur);
168
169 return {};
170 } else {
171 ++cur;
172 }
173 }
174
175 // not found
177 make_error_code(std::errc::no_such_file_or_directory));
178 }
179
180 std::vector<pollfd> poll_fds() const {
181 std::vector<pollfd> fds;
182 {
183 std::lock_guard<std::mutex> lk(mtx_);
184 size_t count{};
185
186 for (const auto &b : buckets_) {
187 count += b.size();
188 }
189
190 // reserve a few more than needed.
191 fds.reserve(count);
192
193 for (const auto &b : buckets_) {
194 for (auto const &fd_int : b) {
195 if (fd_int.event != 0) {
196 fds.push_back(
197 {fd_int.fd,
198 static_cast<short>(fd_int.event & ~kAlwaysEnabledEvents),
199 0});
200 }
201 }
202 }
203 }
204
205 return fds;
206 }
207
209 short event) {
210 auto &b = bucket(fd);
211
212 std::lock_guard<std::mutex> lk(mtx_);
213 auto it = std::find_if(b.begin(), b.end(),
214 [fd](auto fd_ev) { return fd_ev.fd == fd; });
215 if (it == b.end()) {
217 make_error_code(std::errc::no_such_file_or_directory));
218 }
219
220 it->event &= ~event;
221
222 return {};
223 }
224
225 std::optional<int32_t> interest(native_handle_type fd) const {
226 auto &b = bucket(fd);
227
228 std::lock_guard<std::mutex> lk(mtx_);
229
230 for (auto const &fd_ev : b) {
231 if (fd_ev.fd == fd) return fd_ev.event;
232 }
233
234 return std::nullopt;
235 }
236
237 private:
239 size_t ndx = fd % buckets_.size();
240
241 return buckets_[ndx];
242 }
243
245 size_t ndx = fd % buckets_.size();
246
247 return buckets_[ndx];
248 }
249
250 // tps @8000 client connections
251 //
252 // cnt : tps
253 // ----:------
254 // 1: 32000
255 // 3: 45000
256 // 7: 54000
257 // 13: 56000
258 // 23: 57000
259 // 47: 58000
260 // 101: 58000
261 // 1009: 57000
262 static constexpr const size_t bucket_count_{101};
263
264 mutable std::mutex mtx_;
265 std::array<container_type, bucket_count_> buckets_;
266 };
267
272 make_error_code(std::errc::invalid_argument));
273 }
274
275 auto event_res = poll_event_from_wait_type(event);
276 if (!event_res) return stdx::make_unexpected(event_res.error());
277
278 fd_interests_.push_back({fd, event_res.value()});
279
280 return {};
281 }
282
283 /**
284 * remove fd from interest set.
285 */
287 native_handle_type fd) override {
290 make_error_code(std::errc::invalid_argument));
291 }
292
293 std::lock_guard<std::mutex> lk(mtx_);
294
295 auto res = fd_interests_.erase_all(fd);
296 if (res) {
297 // remove all events which are already fetched by poll_one()
298
299 auto end = triggered_events_.end();
300 for (auto cur = triggered_events_.begin(); cur != end;) {
301 if (cur->fd == fd) {
302 cur = triggered_events_.erase(cur);
303 } else {
304 ++cur;
305 }
306 }
307 }
308
309 return res;
310 }
311
312 /**
313 * get current fd-interest.
314 *
315 * @returns fd-interest as bitmask of raw POLL* flags
316 */
317 std::optional<int32_t> interest(native_handle_type fd) const {
318 return fd_interests_.interest(fd);
319 }
320
322 fd_event ev;
323
324 auto &head = triggered_events_.front();
325
326 ev.fd = head.fd;
327
328 // if there are multiple events: get OUT before IN.
329 if (head.event & POLLOUT) {
330 head.event &= ~POLLOUT;
331 ev.event = POLLOUT;
332 } else if (head.event & POLLIN) {
333 // disable HUP if it is sent together with IN
334 if (head.event & POLLHUP) head.event &= ~POLLHUP;
335
336 head.event &= ~POLLIN;
337 ev.event = POLLIN;
338 } else if (head.event & POLLERR) {
339 head.event &= ~POLLERR;
340 ev.event = POLLERR;
341 } else if (head.event & POLLHUP) {
342 head.event &= ~POLLHUP;
343 ev.event = POLLHUP;
344 }
345
346 if ((head.event & (POLLIN | POLLOUT | POLLERR | POLLHUP)) == 0) {
347 triggered_events_.pop_front();
348 }
349
350 return ev;
351 }
352
354 std::chrono::milliseconds timeout) {
355 // build fds for poll() from fd-interest
356
357 auto poll_fds = fd_interests_.poll_fds();
358 auto res = impl::poll::poll(poll_fds.data(), poll_fds.size(), timeout);
359 if (!res) return stdx::make_unexpected(res.error());
360
361 size_t num_revents = res.value(); // number of pollfds with revents
362
363 // translate poll()'s revents into triggered events.
364 std::lock_guard lk(mtx_);
365
366 for (auto ev : poll_fds) {
367 if (ev.revents != 0) {
368 --num_revents;
369
370 // If the caller wants (ev.events) only:
371 //
372 // - POLLIN|POLLOUT
373 //
374 // but poll() returns:
375 //
376 // - POLLHUP
377 //
378 // then return POLLIN|POLLOUT.
379 //
380 // This handles the connection close cases which is signaled as:
381 //
382 // - POLLIN|POLLHUP on the Unixes
383 // - POLLHUP on Windows.
384 //
385 // and the connect() failure case:
386 //
387 // - POLLHUP on FreeBSD/MacOSX
388 // - POLLOUT on Linux
389 //
390 // As the caller is only interested in POLLIN|POLLOUT, the POLLHUP would
391 // be unhandled and be reported on the next call of poll() again.
392 const auto revents =
393 ((ev.events & (POLLIN | POLLOUT)) && //
394 ((ev.revents & (POLLIN | POLLOUT | POLLHUP)) == POLLHUP))
395 ? ev.revents | (ev.events & (POLLIN | POLLOUT))
396 : ev.revents;
397
398 triggered_events_.emplace_back(ev.fd, revents);
399 if (ev.fd != wakeup_fds_.first) {
400 // mimik one-shot events.
401 //
402 // but don't remove interest in the wakeup file-descriptors
403 remove_fd_interest(ev.fd, revents);
404 }
405 }
406
407 if (0 == num_revents) break;
408 }
409
410 return pop_event();
411 }
412
414 std::chrono::milliseconds timeout) override {
415 if (!is_open()) {
417 make_error_code(std::errc::invalid_argument));
418 }
419
420 auto ev_res = [this]() -> stdx::expected<fd_event, std::error_code> {
421 std::lock_guard<std::mutex> lk(mtx_);
422
423 if (triggered_events_.empty()) {
424 // no event.
426 make_error_code(std::errc::no_such_file_or_directory));
427 }
428
429 return pop_event();
430 }();
431
432 if (!ev_res) {
433 if (ev_res.error() == std::errc::no_such_file_or_directory) {
434 ev_res = update_fd_events(timeout);
435 }
436
437 if (!ev_res) return stdx::make_unexpected(ev_res.error());
438 }
439
440 auto ev = *ev_res;
441
442 if (ev.fd == wakeup_fds_.first) {
443 on_notify();
445 }
446
447 return ev;
448 }
449
450 void on_notify() {
451 // 256 seems to be a nice sweetspot between "not run too many rounds" and
452 // "copy user space to kernel space"
453 std::array<uint8_t, 256> buf;
455 do {
456 res = impl::socket::recv(wakeup_fds_.first, buf.data(), buf.size(), 0);
457 } while (res ||
459 }
460
461 void notify() override {
462 // don't notify if there is no one listening
463 if (!is_open()) return;
464
466 do {
467 std::array<uint8_t, 1> buf = {{'.'}};
468 res = impl::socket::send(wakeup_fds_.second, buf.data(), buf.size(), 0);
469 // retry if interrupted
470 } while (res ==
472 }
473
474 /**
475 * remove interest of event from fd.
476 *
477 * mtx_ must be held, when called.
478 */
480 native_handle_type fd, short event) {
483 make_error_code(std::errc::invalid_argument));
484 }
485
487 }
488
489 private:
490 std::pair<impl::socket::native_handle_type, impl::socket::native_handle_type>
492
494
495 std::list<fd_event> triggered_events_;
496
497 // mutex for triggered_events
498 std::mutex mtx_;
499};
500} // namespace net
501
502#endif
Definition: io_service_base.h:87
impl::socket::native_handle_type native_handle_type
Definition: io_service_base.h:89
Definition: poll_io_service.h:132
stdx::expected< void, std::error_code > erase_fd_event(native_handle_type fd, short event)
Definition: poll_io_service.h:208
std::vector< pollfd > poll_fds() const
Definition: poll_io_service.h:180
std::vector< element_type > container_type
Definition: poll_io_service.h:142
static constexpr const size_t bucket_count_
Definition: poll_io_service.h:262
std::array< container_type, bucket_count_ > buckets_
Definition: poll_io_service.h:265
const container_type & bucket(native_handle_type fd) const
Definition: poll_io_service.h:244
stdx::expected< void, std::error_code > erase_all(native_handle_type fd)
Definition: poll_io_service.h:159
std::mutex mtx_
Definition: poll_io_service.h:264
std::optional< int32_t > interest(native_handle_type fd) const
Definition: poll_io_service.h:225
void push_back(element_type &&t)
Definition: poll_io_service.h:144
container_type & bucket(native_handle_type fd)
Definition: poll_io_service.h:238
io_service based on the poll() system-call.
Definition: poll_io_service.h:52
~poll_io_service() override
Definition: poll_io_service.h:54
stdx::expected< void, std::error_code > remove_fd_interest(native_handle_type fd, short event)
remove interest of event from fd.
Definition: poll_io_service.h:479
static stdx::expected< short, std::error_code > poll_event_from_wait_type(impl::socket::wait_type event)
Definition: poll_io_service.h:113
stdx::expected< fd_event, std::error_code > update_fd_events(std::chrono::milliseconds timeout)
Definition: poll_io_service.h:353
FdInterests fd_interests_
Definition: poll_io_service.h:493
stdx::expected< void, std::error_code > add_fd_interest(native_handle_type fd, impl::socket::wait_type event) override
Definition: poll_io_service.h:268
bool is_open() const noexcept
Definition: poll_io_service.h:61
static constexpr const short kAlwaysEnabledEvents
Definition: poll_io_service.h:57
std::optional< int32_t > interest(native_handle_type fd) const
get current fd-interest.
Definition: poll_io_service.h:317
static constexpr const short kAllEvents
Definition: poll_io_service.h:58
std::mutex mtx_
Definition: poll_io_service.h:498
void notify() override
Definition: poll_io_service.h:461
std::list< fd_event > triggered_events_
Definition: poll_io_service.h:495
stdx::expected< fd_event, std::error_code > poll_one(std::chrono::milliseconds timeout) override
Definition: poll_io_service.h:413
stdx::expected< fd_event, std::error_code > pop_event()
Definition: poll_io_service.h:321
stdx::expected< void, std::error_code > close()
Definition: poll_io_service.h:99
static constexpr const short kSettableEvents
Definition: poll_io_service.h:56
std::pair< impl::socket::native_handle_type, impl::socket::native_handle_type > wakeup_fds_
Definition: poll_io_service.h:491
stdx::expected< void, std::error_code > remove_fd(native_handle_type fd) override
remove fd from interest set.
Definition: poll_io_service.h:286
void on_notify()
Definition: poll_io_service.h:450
stdx::expected< void, std::error_code > open() noexcept override
open the io-service.
Definition: poll_io_service.h:66
constexpr const error_type & error() const &
Definition: expected.h:737
Definition: expected.h:944
static int count
Definition: myisam_ftdump.cc:43
static bool interrupted
Definition: mysqladmin.cc:66
Definition: buf0block_hint.cc:30
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:496
stdx::expected< size_t, std::error_code > poll(poll_fd *fds, size_t num_fds, std::chrono::milliseconds timeout)
Definition: poll.h:53
stdx::expected< std::pair< native_handle_type, native_handle_type >, error_type > socketpair(int family, int sock_type, int protocol)
socketpair().
Definition: socket.h:453
stdx::expected< size_t, error_type > send(native_handle_type native_handle, const void *buf, size_t buf_len, message_flags flags)
wrap send() in a portable way.
Definition: socket.h:273
stdx::expected< size_t, error_type > recv(native_handle_type native_handle, void *buf, size_t buf_len, message_flags flags)
wrap recv() in a portable way.
Definition: socket.h:199
stdx::expected< void, std::error_code > close(native_handle_type native_handle)
Definition: socket.h:75
wait_type
Definition: socket_constants.h:86
stdx::expected< bool, error_type > native_non_blocking(native_handle_type native_handle)
Definition: socket.h:106
constexpr const native_handle_type kInvalidSocket
Definition: socket_constants.h:52
Definition: buffer.h:45
std::error_condition make_error_condition(net::stream_errc e) noexcept
Definition: buffer.h:107
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
constexpr auto make_unexpected(E &&e) -> unexpected< std::decay_t< E > >
Definition: expected.h:125
required string event
Definition: replication_group_member_actions.proto:32
Definition: io_service_base.h:69
native_handle_type fd
Definition: io_service_base.h:75
short event
Definition: io_service_base.h:76