From 797dc34dea5e65bfda621d2b8f20ad6a5912d94b Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:22:17 +0300 Subject: [PATCH 01/28] http/client: Coroutinize connection::write_body And brush up the if-else nesting a little bit to avoid co_return-s Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 4d618f7f8c..8048332b02 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -24,6 +24,7 @@ module; #endif #include +#include #include #include #include @@ -77,21 +78,16 @@ connection::connection(connected_socket&& fd, internal::client_ref cr) future<> connection::write_body(const request& req) { if (req.body_writer) { if (req.content_length != 0) { - return req.body_writer(internal::make_http_content_length_output_stream(_write_buf, req.content_length, req._bytes_written)).then([&req] { - if (req.content_length == req._bytes_written) { - return make_ready_future<>(); - } else { - return make_exception_future<>(std::runtime_error(format("partial request body write, need {} sent {}", req.content_length, req._bytes_written))); - } - }); + co_await req.body_writer(internal::make_http_content_length_output_stream(_write_buf, req.content_length, req._bytes_written)); + if (req.content_length != req._bytes_written) { + throw std::runtime_error(format("partial request body write, need {} sent {}", req.content_length, req._bytes_written)); + } + } else { + co_await req.body_writer(internal::make_http_chunked_output_stream(_write_buf)); + co_await _write_buf.write("0\r\n\r\n"); } - return req.body_writer(internal::make_http_chunked_output_stream(_write_buf)).then([this] { - return _write_buf.write("0\r\n\r\n"); - }); } else if (!req.content.empty()) { - return _write_buf.write(req.content); - } else { - return make_ready_future<>(); + co_await _write_buf.write(req.content); } } From 32365d41816bce8e3e2abb4c786f11e978692101 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:23:44 +0300 Subject: [PATCH 02/28] http/client: Coroutinize connection::maybe_wait_for_continue For simplicity, wrap the entrance `if` around. Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 8048332b02..a58048cea7 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -92,19 +92,14 @@ future<> connection::write_body(const request& req) { } future connection::maybe_wait_for_continue(const request& req) { - if (req.get_header("Expect") == "") { - return make_ready_future(nullptr); + if (req.get_header("Expect") != "") { + co_await _write_buf.flush(); + reply_ptr rep = co_await recv_reply(); + if (rep->_status != reply::status_type::continue_) { + co_return rep; + } } - - return _write_buf.flush().then([this] { - return recv_reply().then([] (reply_ptr rep) { - if (rep->_status == reply::status_type::continue_) { - return make_ready_future(nullptr); - } else { - return make_ready_future(std::move(rep)); - } - }); - }); + co_return nullptr; } void connection::setup_request(request& req) { From e8df44b944a675761eb4189ef124006993c1a269 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:24:13 +0300 Subject: [PATCH 03/28] http/client: Coroutinize connection::send_request_head Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index a58048cea7..07d8d1c623 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -115,11 +115,9 @@ void connection::setup_request(request& req) { } future<> connection::send_request_head(const request& req) { - return _write_buf.write(req.request_line()).then([this, &req] { - return req.write_request_headers(_write_buf).then([this] { - return _write_buf.write("\r\n", 2); - }); - }); + co_await _write_buf.write(req.request_line()); + co_await req.write_request_headers(_write_buf); + co_await _write_buf.write("\r\n", 2); } future connection::recv_reply() { From dd51c2c2ce0f3406edf66fa3ec6ee8f9b9c2df76 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:25:08 +0300 Subject: [PATCH 04/28] http/client: Coroutinize connection::recv_reply And leave indentation broken until next patch Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 07d8d1c623..75f7f1f17c 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -122,9 +122,8 @@ future<> connection::send_request_head(const request& req) { future connection::recv_reply() { http_response_parser parser; - return do_with(std::move(parser), [this] (auto& parser) { - parser.init(); - return _read_buf.consume(parser).then([this, &parser] { + parser.init(); + co_await _read_buf.consume(parser); if (parser.eof()) { http_log.trace("Parsing response EOFed"); throw std::system_error(ECONNABORTED, std::system_category()); @@ -140,9 +139,7 @@ future connection::recv_reply() { if ((resp->_version != "1.1") || seastar::internal::case_insensitive_cmp()(resp->get_header("Connection"), "close")) { _persistent = false; } - return make_ready_future(std::move(resp)); - }); - }); + co_return resp; } future connection::do_make_request(request& req) { From ad9833d83899c665fbea0894d0a8aad01921931f Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:25:20 +0300 Subject: [PATCH 05/28] http/client: Restore indentation of connection::recv_reply Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 75f7f1f17c..67adaae162 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -124,22 +124,22 @@ future connection::recv_reply() { http_response_parser parser; parser.init(); co_await _read_buf.consume(parser); - if (parser.eof()) { - http_log.trace("Parsing response EOFed"); - throw std::system_error(ECONNABORTED, std::system_category()); - } - if (parser.failed()) { - http_log.trace("Parsing response failed"); - throw std::runtime_error("Invalid http server response"); - } + if (parser.eof()) { + http_log.trace("Parsing response EOFed"); + throw std::system_error(ECONNABORTED, std::system_category()); + } + if (parser.failed()) { + http_log.trace("Parsing response failed"); + throw std::runtime_error("Invalid http server response"); + } - auto resp = parser.get_parsed_response(); - sstring length_header = resp->get_header("Content-Length"); - resp->content_length = strtol(length_header.c_str(), nullptr, 10); - if ((resp->_version != "1.1") || seastar::internal::case_insensitive_cmp()(resp->get_header("Connection"), "close")) { - _persistent = false; - } - co_return resp; + auto resp = parser.get_parsed_response(); + sstring length_header = resp->get_header("Content-Length"); + resp->content_length = strtol(length_header.c_str(), nullptr, 10); + if ((resp->_version != "1.1") || seastar::internal::case_insensitive_cmp()(resp->get_header("Connection"), "close")) { + _persistent = false; + } + co_return resp; } future connection::do_make_request(request& req) { From 9b46ea5d4b7f2281f539db1e6621e3e71082bc8e Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:26:21 +0300 Subject: [PATCH 06/28] http/client: Coroutinize connection::do_make_request Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 67adaae162..b6be5de7fc 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -144,19 +144,15 @@ future connection::recv_reply() { future connection::do_make_request(request& req) { setup_request(req); - return send_request_head(req).then([this, &req] { - return maybe_wait_for_continue(req).then([this, &req] (reply_ptr cont) { + co_await send_request_head(req); + reply_ptr cont = co_await maybe_wait_for_continue(req); if (cont) { - return make_ready_future(std::move(cont)); + co_return cont; } - return write_body(req).then([this] { - return _write_buf.flush().then([this] { - return recv_reply(); - }); - }); - }); - }); + co_await write_body(req); + co_await _write_buf.flush(); + co_return co_await recv_reply(); } future connection::make_request(request req) { From 86d6a8b74aacb592d5eb5b8f0837a92aa66b1fb1 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:26:31 +0300 Subject: [PATCH 07/28] http/client: Restore indentation of connection::do_make_request Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index b6be5de7fc..13e110edbc 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -146,9 +146,9 @@ future connection::do_make_request(request& req) { setup_request(req); co_await send_request_head(req); reply_ptr cont = co_await maybe_wait_for_continue(req); - if (cont) { - co_return cont; - } + if (cont) { + co_return cont; + } co_await write_body(req); co_await _write_buf.flush(); From 14e3efcc95880d2b98e0001326cf8a811c35e95b Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:27:53 +0300 Subject: [PATCH 08/28] http/client: Squash maybe_wait_for_continue into do_make_request The former helper was introduced to reduce the netsing of .then-chains. Now when it's a coroutinie, it's better be squashed with its only caller. Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 1 - src/http/client.cc | 21 +++++++-------------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 09e3b8d2ed..2fb06b0ba8 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -129,7 +129,6 @@ private: future do_make_request(request& rq); void setup_request(request& rq); future<> send_request_head(const request& rq); - future maybe_wait_for_continue(const request& req); future<> write_body(const request& rq); future recv_reply(); diff --git a/src/http/client.cc b/src/http/client.cc index 13e110edbc..43bf19ccd7 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -91,17 +91,6 @@ future<> connection::write_body(const request& req) { } } -future connection::maybe_wait_for_continue(const request& req) { - if (req.get_header("Expect") != "") { - co_await _write_buf.flush(); - reply_ptr rep = co_await recv_reply(); - if (rep->_status != reply::status_type::continue_) { - co_return rep; - } - } - co_return nullptr; -} - void connection::setup_request(request& req) { if (req._version.empty()) { req._version = "1.1"; @@ -145,9 +134,13 @@ future connection::recv_reply() { future connection::do_make_request(request& req) { setup_request(req); co_await send_request_head(req); - reply_ptr cont = co_await maybe_wait_for_continue(req); - if (cont) { - co_return cont; + + if (req.get_header("Expect") != "") { + co_await _write_buf.flush(); + reply_ptr rep = co_await recv_reply(); + if (rep->_status != reply::status_type::continue_) { + co_return rep; + } } co_await write_body(req); From c5a63bea2f10920e1a727a659a8d2c17a199cc91 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:28:17 +0300 Subject: [PATCH 09/28] http/client Squash send_request_head into do_make_request Same as previous patch -- the former method was a way to reduce indentation level for the latter. Now they look good enough when squashed. Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 1 - src/http/client.cc | 10 +++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 2fb06b0ba8..703b54bb4c 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -128,7 +128,6 @@ public: private: future do_make_request(request& rq); void setup_request(request& rq); - future<> send_request_head(const request& rq); future<> write_body(const request& rq); future recv_reply(); diff --git a/src/http/client.cc b/src/http/client.cc index 43bf19ccd7..65db709b4d 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -103,12 +103,6 @@ void connection::setup_request(request& req) { } } -future<> connection::send_request_head(const request& req) { - co_await _write_buf.write(req.request_line()); - co_await req.write_request_headers(_write_buf); - co_await _write_buf.write("\r\n", 2); -} - future connection::recv_reply() { http_response_parser parser; parser.init(); @@ -133,7 +127,9 @@ future connection::recv_reply() { future connection::do_make_request(request& req) { setup_request(req); - co_await send_request_head(req); + co_await _write_buf.write(req.request_line()); + co_await req.write_request_headers(_write_buf); + co_await _write_buf.write("\r\n", 2); if (req.get_header("Expect") != "") { co_await _write_buf.flush(); From 9b473bf650e6a4918c82b6c3edf46d241dffaf08 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:29:45 +0300 Subject: [PATCH 10/28] http/client: Coroutinize connection::make_request Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 65db709b4d..17b105ebd0 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -145,11 +145,8 @@ future connection::do_make_request(request& req) { } future connection::make_request(request req) { - return do_with(std::move(req), [this] (auto& req) { - return do_make_request(req).then([] (reply_ptr rep) { - return make_ready_future(std::move(*rep)); - }); - }); + reply_ptr rep = co_await do_make_request(req); + co_return std::move(*rep); } input_stream connection::in(reply& rep) { From ffacf242e923c09eb8b3ebe7973bbeb0423ccbba Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:30:22 +0300 Subject: [PATCH 11/28] http/client: Coroutinize connection::close There's a trickery there. After _closed future is resolved "this" may become freed already. So to make the last log message printed, the this->local_address was saved into .then capture. When coroutinized, that field is kept on stack. Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 17b105ebd0..7bbca95262 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -163,12 +163,10 @@ void connection::shutdown() noexcept { } future<> connection::close() { - return when_all(_read_buf.close(), _write_buf.close()).discard_result().then([this] { + co_await when_all(_read_buf.close(), _write_buf.close()); auto la = _fd.local_address(); - return std::move(_closed).then([la = std::move(la)] { + co_await std::move(_closed); http_log.trace("destroyed connection {}", la); - }); - }); } class basic_connection_factory : public connection_factory { From cb1b4b679f1ae021e3f6d9fa43deb4d28fc3ad44 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:30:30 +0300 Subject: [PATCH 12/28] http/client: Restore indentation of connection::close Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 7bbca95262..41ff41a524 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -164,9 +164,9 @@ void connection::shutdown() noexcept { future<> connection::close() { co_await when_all(_read_buf.close(), _write_buf.close()); - auto la = _fd.local_address(); - co_await std::move(_closed); - http_log.trace("destroyed connection {}", la); + auto la = _fd.local_address(); + co_await std::move(_closed); + http_log.trace("destroyed connection {}", la); } class basic_connection_factory : public connection_factory { From bd36cf365ad3d96b331e23936c5c5347266f9239 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:33:03 +0300 Subject: [PATCH 13/28] http/client: Coroutinize client::make_connection Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 41ff41a524..b4cd042cdc 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -237,11 +237,11 @@ future client::get_connection(abort_source* as) { future client::make_connection(abort_source* as) { _total_new_connections++; - return _new_connections->make(as).then([cr = internal::client_ref(this)] (connected_socket cs) mutable { + auto cr = internal::client_ref(this); + connected_socket cs = co_await _new_connections->make(as); http_log.trace("created new http connection {}", cs.local_address()); auto con = seastar::make_shared(std::move(cs), std::move(cr)); - return make_ready_future(std::move(con)); - }); + co_return con; } future<> client::put_connection(connection_ptr con) { From bdfdf166993c0f2782b63c6ab94309186fc4c317 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:33:16 +0300 Subject: [PATCH 14/28] http/client: Restore indentation of client::make_connection Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index b4cd042cdc..6442c32e83 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -239,9 +239,9 @@ future client::make_connection(abort_source* as) { _total_new_connections++; auto cr = internal::client_ref(this); connected_socket cs = co_await _new_connections->make(as); - http_log.trace("created new http connection {}", cs.local_address()); - auto con = seastar::make_shared(std::move(cs), std::move(cr)); - co_return con; + http_log.trace("created new http connection {}", cs.local_address()); + auto con = seastar::make_shared(std::move(cs), std::move(cr)); + co_return con; } future<> client::put_connection(connection_ptr con) { From 2d1550edda21e0a8950ba89130cf467f275e3662 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:41:06 +0300 Subject: [PATCH 15/28] http/client: Coroutinize client::put_connection There's a trickery here. After con->close() is resolved, the con may be the last pointer holding the connection alive. Before this patch, it was saved on .finally capture, after it -- it's preserved as on-stack variable. Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 6442c32e83..9e30f40b19 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -249,11 +249,11 @@ future<> client::put_connection(connection_ptr con) { http_log.trace("push http connection {} to pool", con->_fd.local_address()); _pool.push_back(*con); _wait_con.signal(); - return make_ready_future<>(); + co_return; } http_log.trace("dropping connection {}", con->_fd.local_address()); - return con->close().finally([con] {}); + co_await con->close(); } future<> client::shrink_connections() { From 215e0245c8dc61b8e6e3ce8a3ba6078180b1a096 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:42:06 +0300 Subject: [PATCH 16/28] http/client: Coroutinize client::shrink_connections In its .then-chain form the method calls itself to emulate "goto again" without goto. To do the same with coroutines, wrap it into a loop. Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 9e30f40b19..2196c6734c 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -257,21 +257,16 @@ future<> client::put_connection(connection_ptr con) { } future<> client::shrink_connections() { - if (_nr_connections <= _max_connections) { - return make_ready_future<>(); - } - + while (_nr_connections > _max_connections) { if (!_pool.empty()) { connection_ptr con = _pool.front().shared_from_this(); _pool.pop_front(); - return con->close().finally([this, con] { - return shrink_connections(); - }); + co_await con->close(); + continue; } - return _wait_con.wait().then([this] { - return shrink_connections(); - }); + co_await _wait_con.wait(); + } } future<> client::set_maximum_connections(unsigned nr) { From 88de135cb2ca5de63c378bfec85a48f5d81cf991 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:46:43 +0300 Subject: [PATCH 17/28] http/client: Restore indentation of client::shrink_connections Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 2196c6734c..75bfb0166b 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -257,16 +257,16 @@ future<> client::put_connection(connection_ptr con) { } future<> client::shrink_connections() { - while (_nr_connections > _max_connections) { - if (!_pool.empty()) { - connection_ptr con = _pool.front().shared_from_this(); - _pool.pop_front(); - co_await con->close(); - continue; - } + while (_nr_connections > _max_connections) { + if (!_pool.empty()) { + connection_ptr con = _pool.front().shared_from_this(); + _pool.pop_front(); + co_await con->close(); + continue; + } - co_await _wait_con.wait(); - } + co_await _wait_con.wait(); + } } future<> client::set_maximum_connections(unsigned nr) { From b10c8b6aab53e776c8681f760ae72b9cc01a9369 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 10:47:14 +0300 Subject: [PATCH 18/28] http/client: Coroutinize client::set_maximum_connections Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 75bfb0166b..644aaeb55a 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -273,11 +273,11 @@ future<> client::set_maximum_connections(unsigned nr) { if (nr > _max_connections) { _max_connections = nr; _wait_con.broadcast(); - return make_ready_future<>(); + co_return; } _max_connections = nr; - return shrink_connections(); + co_await shrink_connections(); } template Fn> From 685f6f20c140cf6df65d2d8cfa4bb7bfdec9c9e8 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 11:16:31 +0300 Subject: [PATCH 19/28] http/client: Coroutinize client::with_connection Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 2 +- src/http/client.cc | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 703b54bb4c..41acf26a8d 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -191,7 +191,7 @@ private: future<> shrink_connections(); template Fn> - auto with_connection(Fn&& fn, abort_source*); + futurize_t> with_connection(Fn fn, abort_source*); template requires std::invocable diff --git a/src/http/client.cc b/src/http/client.cc index 644aaeb55a..ef94958216 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -36,6 +36,7 @@ module seastar; #include #include #include +#include #include #include #include @@ -281,12 +282,11 @@ future<> client::set_maximum_connections(unsigned nr) { } template Fn> -auto client::with_connection(Fn&& fn, abort_source* as) { - return get_connection(as).then([this, fn = std::move(fn)] (connection_ptr con) mutable { - return fn(*con).finally([this, con = std::move(con)] () mutable { - return put_connection(std::move(con)); - }); - }); +futurize_t> client::with_connection(Fn fn, abort_source* as) { + connection_ptr con = co_await get_connection(as); + auto f = co_await coroutine::as_future(futurize_invoke(std::move(fn), *con)); + co_await put_connection(std::move(con)); + co_return co_await std::move(f); } template From 200ea10e5d47f2fe34ee6106c0f36b8a782d4914 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 11:17:23 +0300 Subject: [PATCH 20/28] http/client: Coroutinize client::with_new_connection Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 2 +- src/http/client.cc | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 41acf26a8d..7e14747211 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -195,7 +195,7 @@ private: template requires std::invocable - auto with_new_connection(Fn&& fn, abort_source*); + futurize_t> with_new_connection(Fn fn, abort_source*); future<> do_make_request(request req, reply_handler handle, abort_source*, std::optional expected); future<> do_make_request(connection& con, request& req, reply_handler& handle, abort_source*, std::optional expected); diff --git a/src/http/client.cc b/src/http/client.cc index ef94958216..a702862592 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -291,12 +291,11 @@ futurize_t> client::with_connection(Fn fn, template requires std::invocable -auto client::with_new_connection(Fn&& fn, abort_source* as) { - return make_connection(as).then([this, fn = std::move(fn)] (connection_ptr con) mutable { - return fn(*con).finally([this, con = std::move(con)] () mutable { - return put_connection(std::move(con)); - }); - }); +futurize_t> client::with_new_connection(Fn fn, abort_source* as) { + connection_ptr con = co_await make_connection(as); + auto f = co_await coroutine::as_future(futurize_invoke(std::move(fn), *con)); + co_await put_connection(std::move(con)); + co_return co_await std::move(f); } future<> client::make_request(request req, reply_handler handle, std::optional expected) { From 4250bca8f169b1d4510ade5d046a5ed4878c361f Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 11:18:01 +0300 Subject: [PATCH 21/28] http/client: Coroutinize client::close This method closes all its connection by calling itself until the pool is empty. With coroutines it is done with the plain loop. Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index a702862592..0c95ec8dbd 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -359,16 +359,12 @@ future<> client::do_make_request(connection& con, request& req, reply_handler& h } future<> client::close() { - if (_pool.empty()) { - return make_ready_future<>(); - } - + while (!_pool.empty()) { connection_ptr con = _pool.front().shared_from_this(); _pool.pop_front(); http_log.trace("closing connection {}", con->_fd.local_address()); - return con->close().then([this, con] { - return close(); - }); + co_await con->close(); + } } } // experimental namespace From 688ac04b643b1be91d8bca7714f4785c3e16a75f Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 11:18:12 +0300 Subject: [PATCH 22/28] http/client: Restore indentation of client::close Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 0c95ec8dbd..a3cae43b7a 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -359,12 +359,12 @@ future<> client::do_make_request(connection& con, request& req, reply_handler& h } future<> client::close() { - while (!_pool.empty()) { - connection_ptr con = _pool.front().shared_from_this(); - _pool.pop_front(); - http_log.trace("closing connection {}", con->_fd.local_address()); - co_await con->close(); - } + while (!_pool.empty()) { + connection_ptr con = _pool.front().shared_from_this(); + _pool.pop_front(); + http_log.trace("closing connection {}", con->_fd.local_address()); + co_await con->close(); + } } } // experimental namespace From 3e22a255c50a9564b8da3443e0aed415bf65b054 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 11:20:59 +0300 Subject: [PATCH 23/28] http/client: Coroutinize client::do_make_request Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index a3cae43b7a..044ff328f8 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -336,26 +336,25 @@ future<> client::do_make_request(request req, reply_handler handle, abort_source future<> client::do_make_request(connection& con, request& req, reply_handler& handle, abort_source* as, std::optional expected) { auto sub = as ? as->subscribe([&con] () noexcept { con.shutdown(); }) : std::nullopt; - return con.do_make_request(req).then([&con, &handle, expected] (connection::reply_ptr reply) mutable { + try { + connection::reply_ptr reply = co_await con.do_make_request(req); auto& rep = *reply; if (expected.has_value() && rep._status != expected.value()) { if (!http_log.is_enabled(log_level::debug)) { - return make_exception_future<>(httpd::unexpected_status_error(rep._status)); + throw httpd::unexpected_status_error(reply->_status); } - return do_with(con.in(rep), [reply = std::move(reply)] (auto& in) mutable { - return util::read_entire_stream_contiguous(in).then([reply = std::move(reply)] (auto message) { - http_log.debug("request finished with {}: {}", reply->_status, message); - return make_exception_future<>(httpd::unexpected_status_error(reply->_status)); - }); - }); + auto in = con.in(*reply); + auto message = co_await util::read_entire_stream_contiguous(in); + http_log.debug("request finished with {}: {}", reply->_status, message); + throw httpd::unexpected_status_error(reply->_status); } - return handle(rep, con.in(rep)).finally([reply = std::move(reply)] {}); - }).handle_exception([&con] (auto ex) mutable { + co_await handle(rep, con.in(rep)); + } catch (...) { con._persistent = false; - return make_exception_future<>(std::move(ex)); - }).finally([sub = std::move(sub)] {}); + throw; + } } future<> client::close() { From 02df7b835be52580af3a1bbec5f31bb13c9f99f2 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 14:57:29 +0300 Subject: [PATCH 24/28] http/client: Sanitize client::do_make_request unexpected body logging When reply contains unexpected status, client may print its body in logs. Before being coroutinized it was two disctinct chains, with coroutines it makes sense to rewrap them a little bit. Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 044ff328f8..e832ffb703 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -340,13 +340,11 @@ future<> client::do_make_request(connection& con, request& req, reply_handler& h connection::reply_ptr reply = co_await con.do_make_request(req); auto& rep = *reply; if (expected.has_value() && rep._status != expected.value()) { - if (!http_log.is_enabled(log_level::debug)) { - throw httpd::unexpected_status_error(reply->_status); + if (http_log.is_enabled(log_level::debug)) { + auto in = con.in(*reply); + auto message = co_await util::read_entire_stream_contiguous(in); + http_log.debug("request finished with {}: {}", reply->_status, message); } - - auto in = con.in(*reply); - auto message = co_await util::read_entire_stream_contiguous(in); - http_log.debug("request finished with {}: {}", reply->_status, message); throw httpd::unexpected_status_error(reply->_status); } From 4dfb09063eb083f507ee8ed1eced0d1e5fd27b35 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 11:23:25 +0300 Subject: [PATCH 25/28] http/client: Coroutinize client::get_connection It used to call itself to "goto again". With coroutines it won't work as nice, neither it looks good enough with the outer loop :( So use goto explicitly. Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index e832ffb703..fe1735a9f3 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -216,24 +216,24 @@ client::client(std::unique_ptr f, unsigned max_connections, } future client::get_connection(abort_source* as) { +try_again: if (!_pool.empty()) { connection_ptr con = _pool.front().shared_from_this(); _pool.pop_front(); http_log.trace("pop http connection {} from pool", con->_fd.local_address()); - return make_ready_future(con); + co_return con; } if (_nr_connections >= _max_connections) { auto sub = as ? as->subscribe([this] () noexcept { _wait_con.broadcast(); }) : std::nullopt; - return _wait_con.wait().then([this, as, sub = std::move(sub)] { + co_await _wait_con.wait(); if (as != nullptr && as->abort_requested()) { - return make_exception_future(as->abort_requested_exception_ptr()); + std::rethrow_exception(as->abort_requested_exception_ptr()); } - return get_connection(as); - }); + goto try_again; } - return make_connection(as); + co_return co_await make_connection(as); } future client::make_connection(abort_source* as) { From 539fa2a8a1baac77d342ac721b819106bbd3c4b1 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 11:23:33 +0300 Subject: [PATCH 26/28] http/client: Restore indentation of client::get_connection Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index fe1735a9f3..c72cdd6168 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -227,10 +227,10 @@ future client::get_connection(abort_source* as) { if (_nr_connections >= _max_connections) { auto sub = as ? as->subscribe([this] () noexcept { _wait_con.broadcast(); }) : std::nullopt; co_await _wait_con.wait(); - if (as != nullptr && as->abort_requested()) { - std::rethrow_exception(as->abort_requested_exception_ptr()); - } - goto try_again; + if (as != nullptr && as->abort_requested()) { + std::rethrow_exception(as->abort_requested_exception_ptr()); + } + goto try_again; } co_return co_await make_connection(as); From 3231966aacb9f869b7d2ed0961f7ea785fdbd2fe Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 11:26:33 +0300 Subject: [PATCH 27/28] http/client: Coroutinize client::do_make_request (con-less overload) It makes some non-trivial decisions on whether or to retry the request in the catch block, so the conversion is a bit hairy. Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index c72cdd6168..4f8da7b6dd 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -307,31 +307,31 @@ future<> client::make_request(request req, reply_handler handle, abort_source& a } future<> client::do_make_request(request req, reply_handler handle, abort_source* as, std::optional expected) { - return do_with(std::move(req), std::move(handle), [this, as, expected] (request& req, reply_handler& handle) mutable { - return with_connection([this, &req, &handle, as, expected] (connection& con) { - return do_make_request(con, req, handle, as, expected); - }, as).handle_exception_type([this, &req, &handle, as, expected] (const std::system_error& ex) { + try { + co_return co_await with_connection(coroutine::lambda([this, &req, &handle, as, expected] (connection& con) -> future<> { + co_await do_make_request(con, req, handle, as, expected); + }), as); + } catch (const std::system_error& ex) { if (as && as->abort_requested()) { - return make_exception_future<>(as->abort_requested_exception_ptr()); + std::rethrow_exception(as->abort_requested_exception_ptr()); } if (!_retry) { - return make_exception_future<>(ex); + throw; } auto code = ex.code().value(); if ((code != EPIPE) && (code != ECONNABORTED)) { - return make_exception_future<>(ex); + throw; } + } // The 'con' connection may not yet be freed, so the total connection // count still account for it and with_new_connection() may temporarily // break the limit. That's OK, the 'con' will be closed really soon - return with_new_connection([this, &req, &handle, as, expected] (connection& con) { - return do_make_request(con, req, handle, as, expected); - }, as); - }); - }); + co_await with_new_connection(coroutine::lambda([this, &req, &handle, as, expected] (connection& con) -> future<> { + co_await do_make_request(con, req, handle, as, expected); + }), as); } future<> client::do_make_request(connection& con, request& req, reply_handler& handle, abort_source* as, std::optional expected) { From 67134fd0e6851c3005ae36ec811525ef63c3afb1 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 12 Sep 2024 11:26:46 +0300 Subject: [PATCH 28/28] http/client: Restore indentation of client::do_make_request overload Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 4f8da7b6dd..296a217448 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -312,26 +312,26 @@ future<> client::do_make_request(request req, reply_handler handle, abort_source co_await do_make_request(con, req, handle, as, expected); }), as); } catch (const std::system_error& ex) { - if (as && as->abort_requested()) { - std::rethrow_exception(as->abort_requested_exception_ptr()); - } + if (as && as->abort_requested()) { + std::rethrow_exception(as->abort_requested_exception_ptr()); + } - if (!_retry) { - throw; - } + if (!_retry) { + throw; + } - auto code = ex.code().value(); - if ((code != EPIPE) && (code != ECONNABORTED)) { - throw; - } + auto code = ex.code().value(); + if ((code != EPIPE) && (code != ECONNABORTED)) { + throw; + } } - // The 'con' connection may not yet be freed, so the total connection - // count still account for it and with_new_connection() may temporarily - // break the limit. That's OK, the 'con' will be closed really soon - co_await with_new_connection(coroutine::lambda([this, &req, &handle, as, expected] (connection& con) -> future<> { - co_await do_make_request(con, req, handle, as, expected); - }), as); + // The 'con' connection may not yet be freed, so the total connection + // count still account for it and with_new_connection() may temporarily + // break the limit. That's OK, the 'con' will be closed really soon + co_await with_new_connection(coroutine::lambda([this, &req, &handle, as, expected] (connection& con) -> future<> { + co_await do_make_request(con, req, handle, as, expected); + }), as); } future<> client::do_make_request(connection& con, request& req, reply_handler& handle, abort_source* as, std::optional expected) {