Skip to content

Commit

Permalink
Support request stream (drogonframework#2055)
Browse files Browse the repository at this point in the history
  • Loading branch information
hwc0919 committed Jul 3, 2024
1 parent dfacd1b commit 5d4523a
Show file tree
Hide file tree
Showing 31 changed files with 1,967 additions and 183 deletions.
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ set(DROGON_SOURCES
lib/src/HttpFileUploadRequest.cc
lib/src/HttpRequestImpl.cc
lib/src/HttpRequestParser.cc
lib/src/RequestStream.cc
lib/src/HttpResponseImpl.cc
lib/src/HttpResponseParser.cc
lib/src/HttpServer.cc
Expand All @@ -278,6 +279,7 @@ set(DROGON_SOURCES
lib/src/ListenerManager.cc
lib/src/LocalHostFilter.cc
lib/src/MultiPart.cc
lib/src/MultipartStreamParser.cc
lib/src/NotFound.cc
lib/src/PluginsManager.cc
lib/src/PromExporter.cc
Expand Down Expand Up @@ -332,7 +334,8 @@ set(private_headers
lib/src/ConfigAdapterManager.h
lib/src/JsonConfigAdapter.h
lib/src/YamlConfigAdapter.h
lib/src/ConfigAdapter.h)
lib/src/ConfigAdapter.h
lib/src/MultipartStreamParser.h)

if (NOT WIN32)
set(DROGON_SOURCES
Expand Down Expand Up @@ -559,6 +562,7 @@ set(DROGON_HEADERS
lib/inc/drogon/HttpFilter.h
lib/inc/drogon/HttpMiddleware.h
lib/inc/drogon/HttpRequest.h
lib/inc/drogon/RequestStream.h
lib/inc/drogon/HttpResponse.h
lib/inc/drogon/HttpSimpleController.h
lib/inc/drogon/HttpTypes.h
Expand Down
7 changes: 5 additions & 2 deletions config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
"session_timeout": 0,
//string value of SameSite attribute of the Set-Cookie HTTP response header
//valid value is either 'Null' (default), 'Lax', 'Strict' or 'None'
"session_same_site" : "Null",
"session_same_site": "Null",
//session_cookie_key: The cookie key of the session, "JSESSIONID" by default
"session_cookie_key": "JSESSIONID",
//session_max_age: The max age of the session cookie, -1 by default
Expand Down Expand Up @@ -310,7 +310,10 @@
// Currently only gzip and br are supported. Note: max_memory_body_size and max_body_size applies twice for compressed requests.
// Once when receiving and once when decompressing. i.e. if the decompressed body is larger than max_body_size, the request
// will be rejected.
"enabled_compressed_request": false
"enabled_compressed_request": false,
// enable_request_stream: Defaults to false. If true the server will enable stream mode for http requests.
// See the wiki for more details.
"enable_request_stream": false,
},
//plugins: Define all plugins running in the application
"plugins": [
Expand Down
3 changes: 3 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ app:
# Once when receiving and once when decompressing. i.e. if the decompressed body is larger than max_body_size, the request
# will be rejected.
enabled_compressed_request: false
# enable_request_stream: Defaults to false. If true the server will enable stream mode for http requests.
# See the wiki for more details.
enable_request_stream: false
# plugins: Define all plugins running in the application
plugins:
# name: The class name of the plugin
Expand Down
5 changes: 4 additions & 1 deletion drogon_ctl/templates/config_json.csp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,10 @@
// Currently only gzip and br are supported. Note: max_memory_body_size and max_body_size applies twice for compressed requests.
// Once when receiving and once when decompressing. i.e. if the decompressed body is larger than max_body_size, the request
// will be rejected.
"enabled_compressed_request": false
"enabled_compressed_request": false,
// enable_request_stream: Defaults to false. If true the server will enable stream mode for http requests.
// See the wiki for more details.
"enable_request_stream": false,
},
//plugins: Define all plugins running in the application
"plugins": [
Expand Down
3 changes: 3 additions & 0 deletions drogon_ctl/templates/config_yaml.csp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ app:
# Once when receiving and once when decompressing. i.e. if the decompressed body is larger than max_body_size, the request
# will be rejected.
enabled_compressed_request: false
# enable_request_stream: Defaults to false. If true the server will enable stream mode for http requests.
# See the wiki for more details.
enable_request_stream: false
# plugins: Define all plugins running in the application
plugins:
# name: The class name of the plugin
Expand Down
3 changes: 2 additions & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ add_executable(redis_simple redis/main.cc
add_executable(redis_chat redis_chat/main.cc
redis_chat/controllers/Chat.cc)

add_executable(async_stream async_stream/main.cc)
add_executable(async_stream async_stream/main.cc
async_stream/RequestStreamExampleCtrl.cc)

set(example_targets
benchmark
Expand Down
167 changes: 167 additions & 0 deletions examples/async_stream/RequestStreamExampleCtrl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#include <drogon/drogon.h>
#include <drogon/HttpController.h>
#include <drogon/HttpRequest.h>
#include <fstream>

using namespace drogon;

class StreamEchoReader : public RequestStreamReader
{
public:
StreamEchoReader(ResponseStreamPtr respStream)
: respStream_(std::move(respStream))
{
}

void onStreamData(const char *data, size_t length) override
{
LOG_INFO << "onStreamData[" << length << "]";
respStream_->send({data, length});
}

void onStreamFinish(std::exception_ptr ptr) override
{
if (ptr)
{
try
{
std::rethrow_exception(ptr);
}
catch (const std::exception &e)
{
LOG_ERROR << "onStreamError: " << e.what();
}
}
else
{
LOG_INFO << "onStreamFinish";
}
respStream_->close();
}

private:
ResponseStreamPtr respStream_;
};

class RequestStreamExampleCtrl : public HttpController<RequestStreamExampleCtrl>
{
public:
METHOD_LIST_BEGIN
ADD_METHOD_TO(RequestStreamExampleCtrl::stream_echo, "/stream_echo", Post);
ADD_METHOD_TO(RequestStreamExampleCtrl::stream_upload,
"/stream_upload",
Post);
METHOD_LIST_END

void stream_echo(
const HttpRequestPtr &,
RequestStreamPtr &&stream,
std::function<void(const HttpResponsePtr &)> &&callback) const
{
auto resp = drogon::HttpResponse::newAsyncStreamResponse(
[stream](ResponseStreamPtr respStream) {
stream->setStreamReader(
std::make_shared<StreamEchoReader>(std::move(respStream)));
});
callback(resp);
}

void stream_upload(
const HttpRequestPtr &req,
RequestStreamPtr &&stream,
std::function<void(const HttpResponsePtr &)> &&callback) const
{
struct Entry
{
MultipartHeader header;
std::string tmpName;
std::ofstream file;
};

auto files = std::make_shared<std::vector<Entry>>();
auto reader = RequestStreamReader::newMultipartReader(
req,
[files](MultipartHeader &&header) {
LOG_INFO << "Multipart name: " << header.name
<< ", filename:" << header.filename
<< ", contentType:" << header.contentType;

files->push_back({std::move(header)});
auto tmpName = drogon::utils::genRandomString(40);
if (!files->back().header.filename.empty())
{
files->back().tmpName = tmpName;
files->back().file.open("uploads/" + tmpName,
std::ios::trunc);
}
},
[files](const char *data, size_t length) {
if (files->back().tmpName.empty())
{
return;
}
auto &currentFile = files->back().file;
if (length == 0)
{
LOG_INFO << "file finish";
if (currentFile.is_open())
{
currentFile.flush();
currentFile.close();
}
return;
}
LOG_INFO << "data[" << length << "]: ";
if (currentFile.is_open())
{
LOG_INFO << "write file";
currentFile.write(data, length);
}
else
{
LOG_ERROR << "file not open";
}
},
[files, callback = std::move(callback)](std::exception_ptr ex) {
if (ex)
{
try
{
std::rethrow_exception(std::move(ex));
}
catch (const StreamError &e)
{
LOG_ERROR << "stream error: " << e.what();
}
catch (const std::exception &e)
{
LOG_ERROR << "multipart error: " << e.what();
}
auto resp = HttpResponse::newHttpResponse();
resp->setStatusCode(k400BadRequest);
resp->setBody("error\n");
callback(resp);
}
else
{
LOG_INFO << "stream finish, received " << files->size()
<< " files";
Json::Value respJson;
for (const auto &item : *files)
{
if (item.tmpName.empty())
continue;
Json::Value entry;
entry["name"] = item.header.name;
entry["filename"] = item.header.filename;
entry["tmpName"] = item.tmpName;
respJson.append(entry);
}
auto resp = HttpResponse::newHttpJsonResponse(respJson);
callback(resp);
}
});

stream->setStreamReader(std::move(reader));
}
};
52 changes: 51 additions & 1 deletion examples/async_stream/main.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <drogon/drogon.h>
#include <chrono>
#include <memory>

using namespace drogon;
using namespace std::chrono_literals;

Expand Down Expand Up @@ -28,6 +28,56 @@ int main()
callback(resp);
});

// Example: register a stream-mode function handler
app().registerHandler(
"/stream_req",
[](const HttpRequestPtr &req,
RequestStreamPtr &&stream,
std::function<void(const HttpResponsePtr &)> &&callback) {
if (!stream)
{
LOG_INFO << "stream mode is not enabled";
auto resp = HttpResponse::newHttpResponse();
resp->setStatusCode(k400BadRequest);
resp->setBody("no stream");
callback(resp);
return;
}

auto reader = RequestStreamReader::newReader(
[](const char *data, size_t length) {
LOG_INFO << "piece[" << length
<< "]: " << std::string_view{data, length};
},
[callback = std::move(callback)](std::exception_ptr ex) {
auto resp = HttpResponse::newHttpResponse();
if (ex)
{
try
{
std::rethrow_exception(std::move(ex));
}
catch (const std::exception &e)
{
LOG_ERROR << "stream error: " << e.what();
}
resp->setStatusCode(k400BadRequest);
resp->setBody("error\n");
callback(resp);
}
else
{
LOG_INFO << "stream finish";
resp->setBody("success\n");
callback(resp);
}
});

stream->setStreamReader(std::move(reader));
},
{Post});

LOG_INFO << "Server running on 127.0.0.1:8848";
app().enableRequestStream(); // This is for request stream.
app().addListener("127.0.0.1", 8848).run();
}
3 changes: 3 additions & 0 deletions lib/inc/drogon/HttpAppFramework.h
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,9 @@ class DROGON_EXPORT HttpAppFramework : public trantor::NonCopyable
virtual HttpAppFramework &setAfterAcceptSockOptCallback(
std::function<void(int)> cb) = 0;

virtual HttpAppFramework &enableRequestStream(bool enable = true) = 0;
virtual bool isRequestStreamEnabled() const = 0;

private:
virtual void registerHttpController(
const std::string &pathPattern,
Expand Down
20 changes: 19 additions & 1 deletion lib/inc/drogon/HttpBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class HttpBinderBase
std::function<void(const HttpResponsePtr &)> &&callback) = 0;
virtual size_t paramCount() = 0;
virtual const std::string &handlerName() const = 0;
virtual bool isStreamHandler() = 0;

virtual ~HttpBinderBase()
{
Expand Down Expand Up @@ -218,6 +219,11 @@ class HttpBinder : public HttpBinderBase
return traits::arity;
}

bool isStreamHandler() override
{
return traits::isStreamHandler;
}

HttpBinder(FUNCTION &&func) : func_(std::forward<FUNCTION>(func))
{
static_assert(traits::isHTTPFunction,
Expand Down Expand Up @@ -266,6 +272,7 @@ class HttpBinder : public HttpBinderBase

template <typename... Values,
std::size_t Boundary = argument_count,
bool isStreamHandler = traits::isStreamHandler,
bool isCoroutine = traits::isCoroutine>
void run(std::deque<std::string> &pathArguments,
const HttpRequestPtr &req,
Expand Down Expand Up @@ -344,7 +351,17 @@ class HttpBinder : public HttpBinderBase
{
// Explicit copy because `callFunction` moves it
auto cb = callback;
callFunction(req, cb, std::move(values)...);
if constexpr (isStreamHandler)
{
callFunction(req,
createRequestStream(req),
cb,
std::move(values)...);
}
else
{
callFunction(req, cb, std::move(values)...);
}
}
catch (const std::exception &except)
{
Expand All @@ -359,6 +376,7 @@ class HttpBinder : public HttpBinderBase
#ifdef __cpp_impl_coroutine
else
{
static_assert(!isStreamHandler);
[this](HttpRequestPtr req,
std::function<void(const HttpResponsePtr &)> callback,
Values &&...values) -> AsyncTask {
Expand Down
Loading

0 comments on commit 5d4523a

Please sign in to comment.