diff --git a/core/application.cpp b/core/application.cpp index 2a7602b..e23f9dd 100644 --- a/core/application.cpp +++ b/core/application.cpp @@ -91,34 +91,42 @@ void Application::send_error(int error_code, Request *request) { void Application::send_file(const std::string &path, Request *request) { std::string fp = FileCache::get_singleton()->wwwroot + path; - FILE *f = fopen(fp.c_str(), "rb"); - - if (!f) { - printf("Error: Registered file doesn't exists anymore! %s\n", path.c_str()); - - send_error(404, request); - return; - } - - fseek(f, 0, SEEK_END); - long fsize = ftell(f); - fseek(f, 0, SEEK_SET); /* same as rewind(f); */ - - std::string body; - body.resize(fsize); - - fread(&body[0], 1, fsize, f); - fclose(f); - - //TODO set mimetype? - - request->response->setBody(body); - request->send(); + request->send_file(fp); } void Application::migrate() { } +void Application::register_request_update(Request *request) { + std::lock_guard lock(_update_registered_requests_mutex); + + _update_registered_requests.push_back(request); +} +void Application::unregister_request_update(Request *request) { + std::lock_guard lock(_update_registered_requests_mutex); + + std::size_t s = _update_registered_requests.size(); + for (std::size_t i = 0; i < s; ++i) { + Request *r = _update_registered_requests[i]; + + if (r == request) { + _update_registered_requests[i] = _update_registered_requests[s - 1]; + + _update_registered_requests.pop_back(); + + return; + } + } +} + +void Application::update() { + for (std::size_t i = 0; i < _update_registered_requests.size(); ++i) { + Request *r = _update_registered_requests[i]; + + r->update(); + } +} + Application::Application() { _instance = this; } @@ -144,3 +152,6 @@ Application *Application::_instance = nullptr; std::string Application::default_error_404_body = "404 :("; std::string Application::default_generic_error_body = "Internal server error! :("; + +std::mutex Application::_update_registered_requests_mutex; +std::vector Application::_update_registered_requests; diff --git a/core/application.h b/core/application.h index 9374eaf..578acc0 100644 --- a/core/application.h +++ b/core/application.h @@ -7,6 +7,8 @@ #include #include +#include "mutex" + #include "handler_instance.h" class Request; @@ -31,6 +33,10 @@ public: virtual void migrate(); + static void register_request_update(Request *request); + static void unregister_request_update(Request *request); + static void update(); + Application(); virtual ~Application(); @@ -44,6 +50,10 @@ public: static std::map > error_handler_map; static std::function default_error_handler_func; +protected: + static std::mutex _update_registered_requests_mutex; + static std::vector _update_registered_requests; + private: static Application *_instance; }; diff --git a/core/http_server.cpp b/core/http_server.cpp index 3b09d20..34eb796 100644 --- a/core/http_server.cpp +++ b/core/http_server.cpp @@ -12,8 +12,12 @@ void HTTPServer::http_callback_handler(Request *request) { void HTTPServer::httpEnterCallbackDefault(const HTTPParser &httpParser, const HttpSession::Ptr &session) { Request *request = RequestPool::get_request(); - request->http_parser = &httpParser; - request->session = &session; + HttpSession *s = session.get(); + + _request_map[s] = request; + + request->http_parser = std::make_shared(httpParser); + request->session = session; request->setup_url_stack(); @@ -36,6 +40,22 @@ void HTTPServer::wsEnterCallbackDefault(const HttpSession::Ptr &httpSession, Web //httpSession->send(frame); } +void HTTPServer::closedCallbackDefault(const HttpSession::Ptr &session) { + HttpSession *s = session.get(); + + Request *r = _request_map[s]; + + if (r == nullptr) { + printf("Error HTTPServer::closedCallbackDefault: r == nullptr!"); + _request_map.erase(s); + return; + } + + r->connection_closed = true; + + _request_map.erase(s); +} + void HTTPServer::configure() { } @@ -71,6 +91,7 @@ void HTTPServer::initialize() { listenBuilder->configureEnterCallback([](const HttpSession::Ptr &httpSession, HttpSessionHandlers &handlers) { handlers.setHttpCallback(HTTPServer::httpEnterCallbackDefault); + handlers.setClosedCallback(HTTPServer::closedCallbackDefault); handlers.setWSCallback(HTTPServer::wsEnterCallbackDefault); }); @@ -79,11 +100,14 @@ void HTTPServer::initialize() { void HTTPServer::main_loop() { while (true) { - std::this_thread::sleep_for(std::chrono::seconds(1)); + //std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (brynet::base::app_kbhit()) { break; } + + Application::get_instance()->update(); } } @@ -95,4 +119,6 @@ HTTPServer::HTTPServer() { HTTPServer::~HTTPServer() { delete listenBuilder; -} \ No newline at end of file +} + +std::map HTTPServer::_request_map; \ No newline at end of file diff --git a/core/http_server.h b/core/http_server.h index ec67d63..ceb6e3e 100644 --- a/core/http_server.h +++ b/core/http_server.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -29,6 +30,7 @@ public: static void httpEnterCallbackDefault(const HTTPParser &httpParser, const HttpSession::Ptr &session); static void wsEnterCallbackDefault(const HttpSession::Ptr &httpSession, WebSocketFormat::WebSocketFrameType opcode, const std::string &payload); + static void closedCallbackDefault(const HttpSession::Ptr &session); virtual void configure(); virtual void initialize(); @@ -37,6 +39,9 @@ public: HTTPServer(); virtual ~HTTPServer(); + +protected: + static std::map _request_map; }; #endif \ No newline at end of file diff --git a/core/request.cpp b/core/request.cpp index 3f40592..fb2243d 100644 --- a/core/request.cpp +++ b/core/request.cpp @@ -1,5 +1,7 @@ #include "request.h" +#include "application.h" + void Request::compile_body() { compiled_body.reserve(body.size() + head.size() + 13 + 14 + 15); @@ -40,26 +42,36 @@ void Request::next_stage() { } void Request::send() { + if (connection_closed) { + RequestPool::return_request(this); + return; + } + if (http_parser->isKeepAlive()) { response->addHeadValue("Connection", "Keep-Alive"); std::string result = response->getResult(); - (*session)->send(result.c_str(), result.size()); + session->send(result.c_str(), result.size()); } else { response->addHeadValue("Connection", "Close"); std::string result = response->getResult(); - const HttpSession::Ptr lsession = (*session); + HttpSession::Ptr lsession = session; - (*session)->send(result.c_str(), result.size(), [lsession]() { lsession->postShutdown(); }); + session->send(result.c_str(), result.size(), [lsession]() { lsession->postShutdown(); }); } RequestPool::return_request(this); } void Request::send_file(const std::string &p_file_path) { + if (connection_closed) { + RequestPool::return_request(this); + return; + } + file_path = p_file_path; FILE *f = fopen(file_path.c_str(), "rb"); @@ -76,9 +88,10 @@ void Request::send_file(const std::string &p_file_path) { response->addHeadValue("Connection", "Close"); std::string result = "HTTP/1.1 200 OK\r\nConnection: Close\r\n\r\n"; - const HttpSession::Ptr lsession = (*session); - (*session)->send(result.c_str(), result.size(), [this]() { this->_progress_send_file(); }); + Application::register_request_update(this); + + session->send(result.c_str(), result.size(), [this]() { this->_file_chunk_sent(); }); } void Request::reset() { @@ -90,6 +103,7 @@ void Request::reset() { _path_stack_pointer = 0; file_size = 0; current_file_progress = 0; + connection_closed = false; head.clear(); body.clear(); @@ -173,10 +187,20 @@ void Request::push_path() { _path_stack_pointer += 1; } +void Request::update() { + if (file_next) { + file_next = false; + _progress_send_file(); + } +} + Request::Request() { response = nullptr; - //file_chunk_size = 1 << 20; //1MB - file_chunk_size = 1 << 23; + + //This value will need benchmarks, 2 MB seems to be just as fast for me as 4 MB, but 1MB is slower + //It is a tradeoff on server memory though, as every active download will consume this amount of memory + //where the file is bigger than this number + file_chunk_size = 1 << 21; //2MB reset(); } @@ -186,10 +210,13 @@ Request::~Request() { } void Request::_progress_send_file() { - const HttpSession::Ptr lsession = (*session); + if (connection_closed) { + RequestPool::return_request(this); + return; + } if (current_file_progress >= file_size) { - lsession->postShutdown(); + session->postShutdown(); RequestPool::return_request(this); @@ -201,7 +228,9 @@ void Request::_progress_send_file() { if (!f) { printf("Error: Download: In progress file doesn't exists anymore! %s\n", file_path.c_str()); - lsession->postShutdown(); + Application::unregister_request_update(this); + + session->postShutdown(); RequestPool::return_request(this); @@ -224,7 +253,11 @@ void Request::_progress_send_file() { current_file_progress = nfp; - (*session)->send(body.c_str(), body.size(), [this]() { this->_progress_send_file(); }); + session->send(body.c_str(), body.size(), [this]() { this->_file_chunk_sent(); }); +} + +void Request::_file_chunk_sent() { + file_next = true; } Request *RequestPool::get_request() { @@ -245,12 +278,12 @@ Request *RequestPool::get_request() { _mutex.unlock(); + request->reset(); + return request; } void RequestPool::return_request(Request *request) { - request->reset(); - _mutex.lock(); _requests.push_back(request); _mutex.unlock(); diff --git a/core/request.h b/core/request.h index e6d0352..de89746 100644 --- a/core/request.h +++ b/core/request.h @@ -15,8 +15,8 @@ using namespace brynet::net::http; class Request { public: - const HTTPParser *http_parser; - const HttpSession::Ptr *session; + HTTPParser::Ptr http_parser; + HttpSession::Ptr session; HttpResponse *response; uint32_t current_middleware_index; @@ -34,6 +34,8 @@ public: long file_chunk_size; bool file_next; + bool connection_closed; + void compile_body(); void compile_and_send_body(); void next_stage(); @@ -52,11 +54,14 @@ public: void pop_path(); void push_path(); + void update(); + Request(); ~Request(); protected: void _progress_send_file(); + void _file_chunk_sent(); std::vector _path_stack; uint32_t _path_stack_pointer;