MySQL 8.4.3
Source Code Documentation
poll_io_service.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2019, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQL_HARNESS_NET_TS_IMPL_POLL_IO_SERVICE_H_
27#define MYSQL_HARNESS_NET_TS_IMPL_POLL_IO_SERVICE_H_
28
29#include <array>
30#include <list>
31#include <mutex>
32#include <optional>
33#include <system_error>
34#include <vector>
35
40
41namespace net {
42
43/**
44 * io_service based on the poll() system-call.
45 *
46 *
47 */
48
49// https://daniel.haxx.se/blog/2016/10/11/poll-on-mac-10-12-is-broken/
50// https://daniel.haxx.se/blog/2012/10/10/wsapoll-is-broken/
51// http://www.greenend.org.uk/rjk/tech/poll.html
53 public:
54 ~poll_io_service() override { close(); }
55
56 static constexpr const short kSettableEvents = POLLIN | POLLOUT;
57 static constexpr const short kAlwaysEnabledEvents = POLLHUP | POLLERR;
58 static constexpr const short kAllEvents =
60
61 bool is_open() const noexcept {
64 }
65
67 if (is_open()) {
69 }
70
71#if defined(_WIN32)
72 // on windows we build a AF_INET socketpair()
73 auto pipe_res = impl::socket::socketpair(AF_INET, SOCK_STREAM, 0);
74#else
75 auto pipe_res = impl::socket::socketpair(AF_UNIX, SOCK_STREAM, 0);
76#endif
77 if (!pipe_res) return stdx::unexpected(pipe_res.error());
78
79 wakeup_fds_ = *pipe_res;
80
81 // set both ends of the pipe non-blocking as
82 //
83 // - read() shouldn't block if pipe is empty
84 // - write() shouldn't block if pipe is full as it only matters there is
85 // something in the pipe to wakeup the poll_one()
86 auto non_block_wakeup_0_res =
88 if (!non_block_wakeup_0_res) return non_block_wakeup_0_res;
89 auto non_block_wakeup_1_res =
91 if (!non_block_wakeup_1_res) return non_block_wakeup_1_res;
92
94
95 return {};
96 }
97
102 }
105
107 }
108
109 return {};
110 }
111
114 switch (event) {
116 return POLLIN;
118 return POLLOUT;
120 return POLLERR | POLLHUP;
121 default:
122 return stdx::unexpected(make_error_code(std::errc::invalid_argument));
123 }
124 }
125
126 // split fd-interest
127 //
128 // internally splits the fds into multiple buckets to reduce
129 // the search-space and lead to lower resize cost
131 public:
133
134 // we could use list, deque, vector, ... here
135 //
136 // container_type | concurrency | mem-usage | tps
137 // ---------------+-------------+-----------+------
138 // list: | 8000 | 137M | 56000
139 // vector | 8000 | 145M | 58000
140 using container_type = std::vector<element_type>;
141
143 auto &b = bucket(t.fd);
144
145 std::lock_guard<std::mutex> lk(mtx_);
146
147 auto it = std::find_if(b.begin(), b.end(), [fd = t.fd](auto fd_ev) {
148 return fd_ev.fd == fd;
149 });
150 if (it == b.end()) {
151 b.push_back(std::move(t));
152 } else {
153 it->event |= t.event;
154 }
155 }
156
158 auto &b = bucket(fd);
159
160 std::lock_guard<std::mutex> lk(mtx_);
161 const auto end = b.end();
162
163 for (auto cur = b.begin(); cur != end;) {
164 if (cur->fd == fd) {
165 cur = b.erase(cur);
166
167 return {};
168 } else {
169 ++cur;
170 }
171 }
172
173 // not found
174 return stdx::unexpected(
175 make_error_code(std::errc::no_such_file_or_directory));
176 }
177
178 std::vector<pollfd> poll_fds() const {
179 std::vector<pollfd> fds;
180 {
181 std::lock_guard<std::mutex> lk(mtx_);
182 size_t count{};
183
184 for (const auto &b : buckets_) {
185 count += b.size();
186 }
187
188 // reserve a few more than needed.
189 fds.reserve(count);
190
191 for (const auto &b : buckets_) {
192 for (auto const &fd_int : b) {
193 if (fd_int.event != 0) {
194 fds.push_back(
195 {fd_int.fd,
196 static_cast<short>(fd_int.event & ~kAlwaysEnabledEvents),
197 0});
198 }
199 }
200 }
201 }
202
203 return fds;
204 }
205
207 short event) {
208 auto &b = bucket(fd);
209
210 std::lock_guard<std::mutex> lk(mtx_);
211 auto it = std::find_if(b.begin(), b.end(),
212 [fd](auto fd_ev) { return fd_ev.fd == fd; });
213 if (it == b.end()) {
214 return stdx::unexpected(
215 make_error_code(std::errc::no_such_file_or_directory));
216 }
217
218 it->event &= ~event;
219
220 return {};
221 }
222
223 std::optional<int32_t> interest(native_handle_type fd) const {
224 auto &b = bucket(fd);
225
226 std::lock_guard<std::mutex> lk(mtx_);
227
228 for (auto const &fd_ev : b) {
229 if (fd_ev.fd == fd) return fd_ev.event;
230 }
231
232 return std::nullopt;
233 }
234
235 private:
237 size_t ndx = fd % buckets_.size();
238
239 return buckets_[ndx];
240 }
241
243 size_t ndx = fd % buckets_.size();
244
245 return buckets_[ndx];
246 }
247
248 // tps @8000 client connections
249 //
250 // cnt : tps
251 // ----:------
252 // 1: 32000
253 // 3: 45000
254 // 7: 54000
255 // 13: 56000
256 // 23: 57000
257 // 47: 58000
258 // 101: 58000
259 // 1009: 57000
260 static constexpr const size_t bucket_count_{101};
261
262 mutable std::mutex mtx_;
263 std::array<container_type, bucket_count_> buckets_;
264 };
265
269 return stdx::unexpected(make_error_code(std::errc::invalid_argument));
270 }
271
272 auto event_res = poll_event_from_wait_type(event);
273 if (!event_res) return stdx::unexpected(event_res.error());
274
275 fd_interests_.push_back({fd, event_res.value()});
276
277 return {};
278 }
279
280 /**
281 * remove fd from interest set.
282 */
284 native_handle_type fd) override {
286 return stdx::unexpected(make_error_code(std::errc::invalid_argument));
287 }
288
289 std::lock_guard<std::mutex> lk(mtx_);
290
291 auto res = fd_interests_.erase_all(fd);
292 if (res) {
293 // remove all events which are already fetched by poll_one()
294
295 auto end = triggered_events_.end();
296 for (auto cur = triggered_events_.begin(); cur != end;) {
297 if (cur->fd == fd) {
298 cur = triggered_events_.erase(cur);
299 } else {
300 ++cur;
301 }
302 }
303 }
304
305 return res;
306 }
307
308 /**
309 * get current fd-interest.
310 *
311 * @returns fd-interest as bitmask of raw POLL* flags
312 */
313 std::optional<int32_t> interest(native_handle_type fd) const {
314 return fd_interests_.interest(fd);
315 }
316
318 fd_event ev;
319
320 auto &head = triggered_events_.front();
321
322 ev.fd = head.fd;
323
324 // if there are multiple events: get OUT before IN.
325 if (head.event & POLLOUT) {
326 head.event &= ~POLLOUT;
327 ev.event = POLLOUT;
328 } else if (head.event & POLLIN) {
329 // disable HUP if it is sent together with IN
330 if (head.event & POLLHUP) head.event &= ~POLLHUP;
331
332 head.event &= ~POLLIN;
333 ev.event = POLLIN;
334 } else if (head.event & POLLERR) {
335 head.event &= ~POLLERR;
336 ev.event = POLLERR;
337 } else if (head.event & POLLHUP) {
338 head.event &= ~POLLHUP;
339 ev.event = POLLHUP;
340 }
341
342 if ((head.event & (POLLIN | POLLOUT | POLLERR | POLLHUP)) == 0) {
343 triggered_events_.pop_front();
344 }
345
346 return ev;
347 }
348
350 std::chrono::milliseconds timeout) {
351 // build fds for poll() from fd-interest
352
353 auto poll_fds = fd_interests_.poll_fds();
354 auto res = impl::poll::poll(poll_fds.data(), poll_fds.size(), timeout);
355 if (!res) return stdx::unexpected(res.error());
356
357 size_t num_revents = res.value(); // number of pollfds with revents
358
359 // translate poll()'s revents into triggered events.
360 std::lock_guard lk(mtx_);
361
362 for (auto ev : poll_fds) {
363 if (ev.revents != 0) {
364 --num_revents;
365
366 // If the caller wants (ev.events) only:
367 //
368 // - POLLIN|POLLOUT
369 //
370 // but poll() returns:
371 //
372 // - POLLHUP
373 //
374 // then return POLLIN|POLLOUT.
375 //
376 // This handles the connection close cases which is signaled as:
377 //
378 // - POLLIN|POLLHUP on the Unixes
379 // - POLLHUP on Windows.
380 //
381 // and the connect() failure case:
382 //
383 // - POLLHUP on FreeBSD/MacOSX
384 // - POLLOUT on Linux
385 //
386 // As the caller is only interested in POLLIN|POLLOUT, the POLLHUP would
387 // be unhandled and be reported on the next call of poll() again.
388 const auto revents =
389 ((ev.events & (POLLIN | POLLOUT)) && //
390 ((ev.revents & (POLLIN | POLLOUT | POLLHUP)) == POLLHUP))
391 ? ev.revents | (ev.events & (POLLIN | POLLOUT))
392 : ev.revents;
393
394 triggered_events_.emplace_back(ev.fd, revents);
395 if (ev.fd != wakeup_fds_.first) {
396 // mimik one-shot events.
397 //
398 // but don't remove interest in the wakeup file-descriptors
399 remove_fd_interest(ev.fd, revents);
400 }
401 }
402
403 if (0 == num_revents) break;
404 }
405
406 return pop_event();
407 }
408
410 std::chrono::milliseconds timeout) override {
411 if (!is_open()) {
412 return stdx::unexpected(make_error_code(std::errc::invalid_argument));
413 }
414
415 auto ev_res = [this]() -> stdx::expected<fd_event, std::error_code> {
416 std::lock_guard<std::mutex> lk(mtx_);
417
418 if (triggered_events_.empty()) {
419 // no event.
420 return stdx::unexpected(
421 make_error_code(std::errc::no_such_file_or_directory));
422 }
423
424 return pop_event();
425 }();
426
427 if (!ev_res) {
428 if (ev_res.error() == std::errc::no_such_file_or_directory) {
429 ev_res = update_fd_events(timeout);
430 }
431
432 if (!ev_res) return stdx::unexpected(ev_res.error());
433 }
434
435 auto ev = *ev_res;
436
437 if (ev.fd == wakeup_fds_.first) {
438 on_notify();
440 }
441
442 return ev;
443 }
444
445 void on_notify() {
446 // 256 seems to be a nice sweetspot between "not run too many rounds" and
447 // "copy user space to kernel space"
448 std::array<uint8_t, 256> buf;
450 do {
451 res = impl::socket::recv(wakeup_fds_.first, buf.data(), buf.size(), 0);
452 } while (res ||
454 }
455
456 void notify() override {
457 // don't notify if there is no one listening
458 if (!is_open()) return;
459
461 do {
462 std::array<uint8_t, 1> buf = {{'.'}};
463 res = impl::socket::send(wakeup_fds_.second, buf.data(), buf.size(), 0);
464 // retry if interrupted
466 }
467
468 /**
469 * remove interest of event from fd.
470 *
471 * mtx_ must be held, when called.
472 */
474 native_handle_type fd, short event) {
476 return stdx::unexpected(make_error_code(std::errc::invalid_argument));
477 }
478
480 }
481
482 private:
483 std::pair<impl::socket::native_handle_type, impl::socket::native_handle_type>
485
487
488 std::list<fd_event> triggered_events_;
489
490 // mutex for triggered_events
491 std::mutex mtx_;
492};
493} // namespace net
494
495#endif
Definition: io_service_base.h:87
impl::socket::native_handle_type native_handle_type
Definition: io_service_base.h:89
Definition: poll_io_service.h:130
stdx::expected< void, std::error_code > erase_fd_event(native_handle_type fd, short event)
Definition: poll_io_service.h:206
std::vector< pollfd > poll_fds() const
Definition: poll_io_service.h:178
std::vector< element_type > container_type
Definition: poll_io_service.h:140
static constexpr const size_t bucket_count_
Definition: poll_io_service.h:260
std::array< container_type, bucket_count_ > buckets_
Definition: poll_io_service.h:263
const container_type & bucket(native_handle_type fd) const
Definition: poll_io_service.h:242
stdx::expected< void, std::error_code > erase_all(native_handle_type fd)
Definition: poll_io_service.h:157
std::mutex mtx_
Definition: poll_io_service.h:262
std::optional< int32_t > interest(native_handle_type fd) const
Definition: poll_io_service.h:223
void push_back(element_type &&t)
Definition: poll_io_service.h:142
container_type & bucket(native_handle_type fd)
Definition: poll_io_service.h:236
io_service based on the poll() system-call.
Definition: poll_io_service.h:52
~poll_io_service() override
Definition: poll_io_service.h:54
stdx::expected< void, std::error_code > remove_fd_interest(native_handle_type fd, short event)
remove interest of event from fd.
Definition: poll_io_service.h:473
static stdx::expected< short, std::error_code > poll_event_from_wait_type(impl::socket::wait_type event)
Definition: poll_io_service.h:112
stdx::expected< fd_event, std::error_code > update_fd_events(std::chrono::milliseconds timeout)
Definition: poll_io_service.h:349
FdInterests fd_interests_
Definition: poll_io_service.h:486
stdx::expected< void, std::error_code > add_fd_interest(native_handle_type fd, impl::socket::wait_type event) override
Definition: poll_io_service.h:266
bool is_open() const noexcept
Definition: poll_io_service.h:61
static constexpr const short kAlwaysEnabledEvents
Definition: poll_io_service.h:57
std::optional< int32_t > interest(native_handle_type fd) const
get current fd-interest.
Definition: poll_io_service.h:313
static constexpr const short kAllEvents
Definition: poll_io_service.h:58
std::mutex mtx_
Definition: poll_io_service.h:491
void notify() override
Definition: poll_io_service.h:456
std::list< fd_event > triggered_events_
Definition: poll_io_service.h:488
stdx::expected< fd_event, std::error_code > poll_one(std::chrono::milliseconds timeout) override
Definition: poll_io_service.h:409
stdx::expected< fd_event, std::error_code > pop_event()
Definition: poll_io_service.h:317
stdx::expected< void, std::error_code > close()
Definition: poll_io_service.h:98
static constexpr const short kSettableEvents
Definition: poll_io_service.h:56
std::pair< impl::socket::native_handle_type, impl::socket::native_handle_type > wakeup_fds_
Definition: poll_io_service.h:484
stdx::expected< void, std::error_code > remove_fd(native_handle_type fd) override
remove fd from interest set.
Definition: poll_io_service.h:283
void on_notify()
Definition: poll_io_service.h:445
stdx::expected< void, std::error_code > open() noexcept override
open the io-service.
Definition: poll_io_service.h:66
Definition: expected.h:286
constexpr const error_type & error() const &
Definition: expected.h:760
Definition: expected.h:112
static char buf[MAX_BUF]
Definition: conf_to_src.cc:73
static int count
Definition: myisam_ftdump.cc:45
static bool interrupted
Definition: mysqladmin.cc:72
Definition: buf0block_hint.cc:30
Container::const_iterator find_if(const Container &c, Find_if &&find_if)
Definition: generic.h:54
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:498
stdx::expected< size_t, std::error_code > poll(poll_fd *fds, size_t num_fds, std::chrono::milliseconds timeout)
Definition: poll.h:53
stdx::expected< std::pair< native_handle_type, native_handle_type >, error_type > socketpair(int family, int sock_type, int protocol)
socketpair().
Definition: socket.h:452
stdx::expected< size_t, error_type > send(native_handle_type native_handle, const void *buf, size_t buf_len, message_flags flags)
wrap send() in a portable way.
Definition: socket.h:273
stdx::expected< size_t, error_type > recv(native_handle_type native_handle, void *buf, size_t buf_len, message_flags flags)
wrap recv() in a portable way.
Definition: socket.h:199
stdx::expected< void, std::error_code > close(native_handle_type native_handle)
Definition: socket.h:75
wait_type
Definition: socket_constants.h:86
stdx::expected< bool, error_type > native_non_blocking(native_handle_type native_handle)
Definition: socket.h:106
constexpr const native_handle_type kInvalidSocket
Definition: socket_constants.h:52
Definition: buffer.h:45
std::error_condition make_error_condition(net::stream_errc e) noexcept
Definition: buffer.h:107
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
unexpected(E) -> unexpected< E >
required string event
Definition: replication_group_member_actions.proto:32
Definition: io_service_base.h:69
native_handle_type fd
Definition: io_service_base.h:75
short event
Definition: io_service_base.h:76