Fix file chunking with downloads. Also fixed a bunch of std::shared_ptr misuses, and made Application::send_file use Request's file chunking. Also the framework will properly handle client disconnects now.

This commit is contained in:
Relintai 2021-02-09 01:25:24 +01:00
parent 9a3404a7ac
commit 67e740e35b
6 changed files with 132 additions and 42 deletions

View File

@ -91,34 +91,42 @@ void Application::send_error(int error_code, Request *request) {
void Application::send_file(const std::string &path, Request *request) { void Application::send_file(const std::string &path, Request *request) {
std::string fp = FileCache::get_singleton()->wwwroot + path; std::string fp = FileCache::get_singleton()->wwwroot + path;
FILE *f = fopen(fp.c_str(), "rb"); request->send_file(fp);
if (!f) {
printf("Error: Registered file doesn't exists anymore! %s\n", path.c_str());
send_error(404, request);
return;
}
fseek(f, 0, SEEK_END);
long fsize = ftell(f);
fseek(f, 0, SEEK_SET); /* same as rewind(f); */
std::string body;
body.resize(fsize);
fread(&body[0], 1, fsize, f);
fclose(f);
//TODO set mimetype?
request->response->setBody(body);
request->send();
} }
void Application::migrate() { void Application::migrate() {
} }
void Application::register_request_update(Request *request) {
std::lock_guard<std::mutex> lock(_update_registered_requests_mutex);
_update_registered_requests.push_back(request);
}
void Application::unregister_request_update(Request *request) {
std::lock_guard<std::mutex> lock(_update_registered_requests_mutex);
std::size_t s = _update_registered_requests.size();
for (std::size_t i = 0; i < s; ++i) {
Request *r = _update_registered_requests[i];
if (r == request) {
_update_registered_requests[i] = _update_registered_requests[s - 1];
_update_registered_requests.pop_back();
return;
}
}
}
void Application::update() {
for (std::size_t i = 0; i < _update_registered_requests.size(); ++i) {
Request *r = _update_registered_requests[i];
r->update();
}
}
Application::Application() { Application::Application() {
_instance = this; _instance = this;
} }
@ -144,3 +152,6 @@ Application *Application::_instance = nullptr;
std::string Application::default_error_404_body = "<html><body>404 :(</body></html>"; std::string Application::default_error_404_body = "<html><body>404 :(</body></html>";
std::string Application::default_generic_error_body = "<html><body>Internal server error! :(</body></html>"; std::string Application::default_generic_error_body = "<html><body>Internal server error! :(</body></html>";
std::mutex Application::_update_registered_requests_mutex;
std::vector<Request *> Application::_update_registered_requests;

View File

@ -7,6 +7,8 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "mutex"
#include "handler_instance.h" #include "handler_instance.h"
class Request; class Request;
@ -31,6 +33,10 @@ public:
virtual void migrate(); virtual void migrate();
static void register_request_update(Request *request);
static void unregister_request_update(Request *request);
static void update();
Application(); Application();
virtual ~Application(); virtual ~Application();
@ -44,6 +50,10 @@ public:
static std::map<int, std::function<void(int, Request *)> > error_handler_map; static std::map<int, std::function<void(int, Request *)> > error_handler_map;
static std::function<void(int, Request *)> default_error_handler_func; static std::function<void(int, Request *)> default_error_handler_func;
protected:
static std::mutex _update_registered_requests_mutex;
static std::vector<Request *> _update_registered_requests;
private: private:
static Application *_instance; static Application *_instance;
}; };

View File

@ -12,8 +12,12 @@ void HTTPServer::http_callback_handler(Request *request) {
void HTTPServer::httpEnterCallbackDefault(const HTTPParser &httpParser, const HttpSession::Ptr &session) { void HTTPServer::httpEnterCallbackDefault(const HTTPParser &httpParser, const HttpSession::Ptr &session) {
Request *request = RequestPool::get_request(); Request *request = RequestPool::get_request();
request->http_parser = &httpParser; HttpSession *s = session.get();
request->session = &session;
_request_map[s] = request;
request->http_parser = std::make_shared<HTTPParser>(httpParser);
request->session = session;
request->setup_url_stack(); request->setup_url_stack();
@ -36,6 +40,22 @@ void HTTPServer::wsEnterCallbackDefault(const HttpSession::Ptr &httpSession, Web
//httpSession->send(frame); //httpSession->send(frame);
} }
void HTTPServer::closedCallbackDefault(const HttpSession::Ptr &session) {
HttpSession *s = session.get();
Request *r = _request_map[s];
if (r == nullptr) {
printf("Error HTTPServer::closedCallbackDefault: r == nullptr!");
_request_map.erase(s);
return;
}
r->connection_closed = true;
_request_map.erase(s);
}
void HTTPServer::configure() { void HTTPServer::configure() {
} }
@ -71,6 +91,7 @@ void HTTPServer::initialize() {
listenBuilder->configureEnterCallback([](const HttpSession::Ptr &httpSession, HttpSessionHandlers &handlers) { listenBuilder->configureEnterCallback([](const HttpSession::Ptr &httpSession, HttpSessionHandlers &handlers) {
handlers.setHttpCallback(HTTPServer::httpEnterCallbackDefault); handlers.setHttpCallback(HTTPServer::httpEnterCallbackDefault);
handlers.setClosedCallback(HTTPServer::closedCallbackDefault);
handlers.setWSCallback(HTTPServer::wsEnterCallbackDefault); handlers.setWSCallback(HTTPServer::wsEnterCallbackDefault);
}); });
@ -79,11 +100,14 @@ void HTTPServer::initialize() {
void HTTPServer::main_loop() { void HTTPServer::main_loop() {
while (true) { while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1)); //std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (brynet::base::app_kbhit()) { if (brynet::base::app_kbhit()) {
break; break;
} }
Application::get_instance()->update();
} }
} }
@ -96,3 +120,5 @@ HTTPServer::HTTPServer() {
HTTPServer::~HTTPServer() { HTTPServer::~HTTPServer() {
delete listenBuilder; delete listenBuilder;
} }
std::map<HttpSession*, Request*> HTTPServer::_request_map;

View File

@ -4,6 +4,7 @@
#include <condition_variable> #include <condition_variable>
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <map>
#include <brynet/base/AppStatus.hpp> #include <brynet/base/AppStatus.hpp>
#include <brynet/net/http/HttpFormat.hpp> #include <brynet/net/http/HttpFormat.hpp>
@ -29,6 +30,7 @@ public:
static void httpEnterCallbackDefault(const HTTPParser &httpParser, const HttpSession::Ptr &session); static void httpEnterCallbackDefault(const HTTPParser &httpParser, const HttpSession::Ptr &session);
static void wsEnterCallbackDefault(const HttpSession::Ptr &httpSession, WebSocketFormat::WebSocketFrameType opcode, const std::string &payload); static void wsEnterCallbackDefault(const HttpSession::Ptr &httpSession, WebSocketFormat::WebSocketFrameType opcode, const std::string &payload);
static void closedCallbackDefault(const HttpSession::Ptr &session);
virtual void configure(); virtual void configure();
virtual void initialize(); virtual void initialize();
@ -37,6 +39,9 @@ public:
HTTPServer(); HTTPServer();
virtual ~HTTPServer(); virtual ~HTTPServer();
protected:
static std::map<HttpSession*, Request*> _request_map;
}; };
#endif #endif

View File

@ -1,5 +1,7 @@
#include "request.h" #include "request.h"
#include "application.h"
void Request::compile_body() { void Request::compile_body() {
compiled_body.reserve(body.size() + head.size() + 13 + 14 + 15); compiled_body.reserve(body.size() + head.size() + 13 + 14 + 15);
@ -40,26 +42,36 @@ void Request::next_stage() {
} }
void Request::send() { void Request::send() {
if (connection_closed) {
RequestPool::return_request(this);
return;
}
if (http_parser->isKeepAlive()) { if (http_parser->isKeepAlive()) {
response->addHeadValue("Connection", "Keep-Alive"); response->addHeadValue("Connection", "Keep-Alive");
std::string result = response->getResult(); std::string result = response->getResult();
(*session)->send(result.c_str(), result.size()); session->send(result.c_str(), result.size());
} else { } else {
response->addHeadValue("Connection", "Close"); response->addHeadValue("Connection", "Close");
std::string result = response->getResult(); std::string result = response->getResult();
const HttpSession::Ptr lsession = (*session); HttpSession::Ptr lsession = session;
(*session)->send(result.c_str(), result.size(), [lsession]() { lsession->postShutdown(); }); session->send(result.c_str(), result.size(), [lsession]() { lsession->postShutdown(); });
} }
RequestPool::return_request(this); RequestPool::return_request(this);
} }
void Request::send_file(const std::string &p_file_path) { void Request::send_file(const std::string &p_file_path) {
if (connection_closed) {
RequestPool::return_request(this);
return;
}
file_path = p_file_path; file_path = p_file_path;
FILE *f = fopen(file_path.c_str(), "rb"); FILE *f = fopen(file_path.c_str(), "rb");
@ -76,9 +88,10 @@ void Request::send_file(const std::string &p_file_path) {
response->addHeadValue("Connection", "Close"); response->addHeadValue("Connection", "Close");
std::string result = "HTTP/1.1 200 OK\r\nConnection: Close\r\n\r\n"; std::string result = "HTTP/1.1 200 OK\r\nConnection: Close\r\n\r\n";
const HttpSession::Ptr lsession = (*session);
(*session)->send(result.c_str(), result.size(), [this]() { this->_progress_send_file(); }); Application::register_request_update(this);
session->send(result.c_str(), result.size(), [this]() { this->_file_chunk_sent(); });
} }
void Request::reset() { void Request::reset() {
@ -90,6 +103,7 @@ void Request::reset() {
_path_stack_pointer = 0; _path_stack_pointer = 0;
file_size = 0; file_size = 0;
current_file_progress = 0; current_file_progress = 0;
connection_closed = false;
head.clear(); head.clear();
body.clear(); body.clear();
@ -173,10 +187,20 @@ void Request::push_path() {
_path_stack_pointer += 1; _path_stack_pointer += 1;
} }
void Request::update() {
if (file_next) {
file_next = false;
_progress_send_file();
}
}
Request::Request() { Request::Request() {
response = nullptr; response = nullptr;
//file_chunk_size = 1 << 20; //1MB
file_chunk_size = 1 << 23; //This value will need benchmarks, 2 MB seems to be just as fast for me as 4 MB, but 1MB is slower
//It is a tradeoff on server memory though, as every active download will consume this amount of memory
//where the file is bigger than this number
file_chunk_size = 1 << 21; //2MB
reset(); reset();
} }
@ -186,10 +210,13 @@ Request::~Request() {
} }
void Request::_progress_send_file() { void Request::_progress_send_file() {
const HttpSession::Ptr lsession = (*session); if (connection_closed) {
RequestPool::return_request(this);
return;
}
if (current_file_progress >= file_size) { if (current_file_progress >= file_size) {
lsession->postShutdown(); session->postShutdown();
RequestPool::return_request(this); RequestPool::return_request(this);
@ -201,7 +228,9 @@ void Request::_progress_send_file() {
if (!f) { if (!f) {
printf("Error: Download: In progress file doesn't exists anymore! %s\n", file_path.c_str()); printf("Error: Download: In progress file doesn't exists anymore! %s\n", file_path.c_str());
lsession->postShutdown(); Application::unregister_request_update(this);
session->postShutdown();
RequestPool::return_request(this); RequestPool::return_request(this);
@ -224,7 +253,11 @@ void Request::_progress_send_file() {
current_file_progress = nfp; current_file_progress = nfp;
(*session)->send(body.c_str(), body.size(), [this]() { this->_progress_send_file(); }); session->send(body.c_str(), body.size(), [this]() { this->_file_chunk_sent(); });
}
void Request::_file_chunk_sent() {
file_next = true;
} }
Request *RequestPool::get_request() { Request *RequestPool::get_request() {
@ -245,12 +278,12 @@ Request *RequestPool::get_request() {
_mutex.unlock(); _mutex.unlock();
request->reset();
return request; return request;
} }
void RequestPool::return_request(Request *request) { void RequestPool::return_request(Request *request) {
request->reset();
_mutex.lock(); _mutex.lock();
_requests.push_back(request); _requests.push_back(request);
_mutex.unlock(); _mutex.unlock();

View File

@ -15,8 +15,8 @@ using namespace brynet::net::http;
class Request { class Request {
public: public:
const HTTPParser *http_parser; HTTPParser::Ptr http_parser;
const HttpSession::Ptr *session; HttpSession::Ptr session;
HttpResponse *response; HttpResponse *response;
uint32_t current_middleware_index; uint32_t current_middleware_index;
@ -34,6 +34,8 @@ public:
long file_chunk_size; long file_chunk_size;
bool file_next; bool file_next;
bool connection_closed;
void compile_body(); void compile_body();
void compile_and_send_body(); void compile_and_send_body();
void next_stage(); void next_stage();
@ -52,11 +54,14 @@ public:
void pop_path(); void pop_path();
void push_path(); void push_path();
void update();
Request(); Request();
~Request(); ~Request();
protected: protected:
void _progress_send_file(); void _progress_send_file();
void _file_chunk_sent();
std::vector<std::string> _path_stack; std::vector<std::string> _path_stack;
uint32_t _path_stack_pointer; uint32_t _path_stack_pointer;