diff --git a/core/containers/local_vector.h b/core/containers/local_vector.h index 8ca2b605b..cd9048179 100644 --- a/core/containers/local_vector.h +++ b/core/containers/local_vector.h @@ -134,9 +134,9 @@ public: } _FORCE_INLINE_ bool empty() const { return count == 0; } _FORCE_INLINE_ U get_capacity() const { return capacity; } - _FORCE_INLINE_ void reserve(U p_size) { + _FORCE_INLINE_ void reserve(U p_size, bool p_allow_shrink = false) { p_size = nearest_power_of_2_templated(p_size); - if (p_size > capacity) { + if (!p_allow_shrink ? p_size > capacity : ((p_size >= count) && (p_size != capacity))) { capacity = p_size; data = (T *)memrealloc(data, capacity * sizeof(T)); CRASH_COND_MSG(!data, "Out of memory"); diff --git a/core/object/message_queue.cpp b/core/object/message_queue.cpp index c573d7d0a..bb44f5f63 100644 --- a/core/object/message_queue.cpp +++ b/core/object/message_queue.cpp @@ -44,17 +44,24 @@ Error MessageQueue::push_call(ObjectID p_id, const StringName &p_method, const V int room_needed = sizeof(Message) + sizeof(Variant) * p_argcount; - if ((buffer_end + room_needed) >= buffer_size) { - String type; - if (ObjectDB::get_instance(p_id)) { - type = ObjectDB::get_instance(p_id)->get_class(); + Buffer &buffer = buffers[write_buffer]; + + if ((buffer.end + room_needed) > buffer.data.size()) { + if ((buffer.end + room_needed) > max_allowed_buffer_size) { + String type; + if (ObjectDB::get_instance(p_id)) { + type = ObjectDB::get_instance(p_id)->get_class(); + } + print_line("Failed method: " + p_method); + statistics(); + ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings."); + } else { + buffer.data.resize(buffer.end + room_needed); } - print_line("Failed method: " + type + ":" + p_method + " target ID: " + itos(p_id)); - statistics(); - ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_kb' in project settings."); } - Message *msg = memnew_placement(&buffer[buffer_end], Message); + Message *msg = memnew_placement(&buffer.data[buffer.end], Message); + msg->args = p_argcount; msg->instance_id = p_id; msg->target = p_method; @@ -63,11 +70,11 @@ Error MessageQueue::push_call(ObjectID p_id, const StringName &p_method, const V msg->type |= FLAG_SHOW_ERROR; } - buffer_end += sizeof(Message); + buffer.end += sizeof(Message); for (int i = 0; i < p_argcount; i++) { - Variant *v = memnew_placement(&buffer[buffer_end], Variant); - buffer_end += sizeof(Variant); + Variant *v = memnew_placement(&buffer.data[buffer.end], Variant); + buffer.end += sizeof(Variant); *v = *p_args[i]; } @@ -94,26 +101,33 @@ Error MessageQueue::push_set(ObjectID p_id, const StringName &p_prop, const Vari uint8_t room_needed = sizeof(Message) + sizeof(Variant); - if ((buffer_end + room_needed) >= buffer_size) { - String type; - if (ObjectDB::get_instance(p_id)) { - type = ObjectDB::get_instance(p_id)->get_class(); + Buffer &buffer = buffers[write_buffer]; + + if ((buffer.end + room_needed) > buffer.data.size()) { + if ((buffer.end + room_needed) > max_allowed_buffer_size) { + String type; + if (ObjectDB::get_instance(p_id)) { + type = ObjectDB::get_instance(p_id)->get_class(); + } + print_line("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id)); + statistics(); + ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings."); + } else { + buffer.data.resize(buffer.end + room_needed); } - print_line("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id)); - statistics(); - ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_kb' in project settings."); } - Message *msg = memnew_placement(&buffer[buffer_end], Message); + Message *msg = memnew_placement(&buffer.data[buffer.end], Message); + msg->args = 1; msg->instance_id = p_id; msg->target = p_prop; msg->type = TYPE_SET; - buffer_end += sizeof(Message); + buffer.end += sizeof(Message); - Variant *v = memnew_placement(&buffer[buffer_end], Variant); - buffer_end += sizeof(Variant); + Variant *v = memnew_placement(&buffer.data[buffer.end], Variant); + buffer.end += sizeof(Variant); *v = p_value; return OK; @@ -126,20 +140,30 @@ Error MessageQueue::push_notification(ObjectID p_id, int p_notification) { uint8_t room_needed = sizeof(Message); - if ((buffer_end + room_needed) >= buffer_size) { - print_line("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id)); - statistics(); - ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_kb' in project settings."); + Buffer &buffer = buffers[write_buffer]; + + if ((buffer.end + room_needed) > buffer.data.size()) { + if ((buffer.end + room_needed) > max_allowed_buffer_size) { + String type; + if (ObjectDB::get_instance(p_id)) { + type = ObjectDB::get_instance(p_id)->get_class(); + } + print_line("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id)); + statistics(); + ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings."); + } else { + buffer.data.resize(buffer.end + room_needed); + } } - Message *msg = memnew_placement(&buffer[buffer_end], Message); + Message *msg = memnew_placement(&buffer.data[buffer.end], Message); msg->type = TYPE_NOTIFICATION; msg->instance_id = p_id; //msg->target; msg->notification = p_notification; - buffer_end += sizeof(Message); + buffer.end += sizeof(Message); return OK; } @@ -161,9 +185,11 @@ void MessageQueue::statistics() { RBMap call_count; int null_count = 0; + Buffer &buffer = buffers[write_buffer]; + uint32_t read_pos = 0; - while (read_pos < buffer_end) { - Message *message = (Message *)&buffer[read_pos]; + while (read_pos < buffer.end) { + Message *message = (Message *)&buffer.data[read_pos]; Object *target = ObjectDB::get_instance(message->instance_id); @@ -208,7 +234,7 @@ void MessageQueue::statistics() { } } - print_line("TOTAL BYTES: " + itos(buffer_end)); + print_line("TOTAL BYTES: " + itos(buffer.end)); print_line("NULL count: " + itos(null_count)); for (RBMap::Element *E = set_count.front(); E; E = E->next()) { @@ -225,7 +251,8 @@ void MessageQueue::statistics() { } int MessageQueue::get_max_buffer_usage() const { - return buffer_max_used; + // Note this may be better read_buffer, or a combination, depending when this is read. + return buffers[write_buffer].data.size(); } void MessageQueue::_call_function(Object *p_target, const StringName &p_func, const Variant *p_args, int p_argcount, bool p_show_error) { @@ -244,73 +271,138 @@ void MessageQueue::_call_function(Object *p_target, const StringName &p_func, co } } -void MessageQueue::flush() { - if (buffer_end > buffer_max_used) { - buffer_max_used = buffer_end; +void MessageQueue::_update_buffer_monitor() { + // The number of flushes is an approximate delay before + // considering shrinking. This is somewhat of a magic number, + // but only acts to prevent excessive oscillations. + if (++_buffer_size_monitor.flush_count == 8192) { + uint32_t max_size = _buffer_size_monitor.max_size; + + // Uncomment this define to log message queue sizes and + // auto-shrinking behaviour. + // #define DEBUG_MESSAGE_QUEUE_SIZES +#ifdef DEBUG_MESSAGE_QUEUE_SIZES + print_line("MessageQueue buffer max size " + itos(max_size) + " bytes."); +#endif + + // reset for next time + _buffer_size_monitor.flush_count = 0; + _buffer_size_monitor.max_size = 0; + + for (uint32_t n = 0; n < 2; n++) { + uint32_t cap = buffers[n].data.get_capacity(); + + // Only worry about reducing memory if the capacity is high + // (due to e.g. loading a level or something). + // The shrinking will only take place below 256K, to prevent + // excessive reallocating. + if (cap > (256 * 1024)) { + // Only shrink if we are routinely using a lot less than the capacity. + if ((max_size * 4) < cap) { + buffers[n].data.reserve(cap / 2, true); +#ifdef DEBUG_MESSAGE_QUEUE_SIZES + print_line("MessageQueue reducing buffer[" + itos(n) + "] capacity from " + itos(cap) + " bytes to " + itos(cap / 2) + " bytes."); +#endif + } + } + } } +} - uint32_t read_pos = 0; - +void MessageQueue::flush() { //using reverse locking strategy _THREAD_SAFE_LOCK_ - ERR_FAIL_COND(flushing); //already flushing, you did something odd - flushing = true; - - while (read_pos < buffer_end) { - //lock on each iteration, so a call can re-add itself to the message queue - - Message *message = (Message *)&buffer[read_pos]; - - uint32_t advance = sizeof(Message); - if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { - advance += sizeof(Variant) * message->args; - } - - //pre-advance so this function is reentrant - read_pos += advance; - + if (flushing) { _THREAD_SAFE_UNLOCK_ - - Object *target = ObjectDB::get_instance(message->instance_id); - - if (target != nullptr) { - switch (message->type & FLAG_MASK) { - case TYPE_CALL: { - Variant *args = (Variant *)(message + 1); - - // messages don't expect a return value - - _call_function(target, message->target, args, message->args, message->type & FLAG_SHOW_ERROR); - - } break; - case TYPE_NOTIFICATION: { - // messages don't expect a return value - target->notification(message->notification); - - } break; - case TYPE_SET: { - Variant *arg = (Variant *)(message + 1); - // messages don't expect a return value - target->set(message->target, *arg); - - } break; - } - } - - if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { - Variant *args = (Variant *)(message + 1); - for (int i = 0; i < message->args; i++) { - args[i].~Variant(); - } - } - - message->~Message(); - - _THREAD_SAFE_LOCK_ + ERR_FAIL_MSG("Already flushing"); //already flushing, you did something odd } - buffer_end = 0; // reset buffer + // first flip buffers, in preparation + SWAP(read_buffer, write_buffer); + + flushing = true; + + _update_buffer_monitor(); + _THREAD_SAFE_UNLOCK_ + + // This loop works by having a read buffer and write buffer. + // While we are reading from one buffer we can be filling another. + // This enables them to be independent, and not require locks per message. + // It also avoids pushing and resizing the write buffer corrupting the read buffer. + // The trade off is that it requires more memory. + // However the peak size of each can be lower, because they do not ADD + // to each other during transit. + + while (buffers[read_buffer].data.size()) { + uint32_t read_pos = 0; + Buffer &buffer = buffers[read_buffer]; + + while (read_pos < buffer.end) { + Message *message = (Message *)&buffer.data[read_pos]; + + uint32_t advance = sizeof(Message); + if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { + advance += sizeof(Variant) * message->args; + } + + read_pos += advance; + + Object *target = ObjectDB::get_instance(message->instance_id); + + if (target != nullptr) { + switch (message->type & FLAG_MASK) { + case TYPE_CALL: { + Variant *args = (Variant *)(message + 1); + + // messages don't expect a return value + + _call_function(target, message->target, args, message->args, message->type & FLAG_SHOW_ERROR); + + } break; + case TYPE_NOTIFICATION: { + // messages don't expect a return value + target->notification(message->notification); + + } break; + case TYPE_SET: { + Variant *arg = (Variant *)(message + 1); + // messages don't expect a return value + target->set(message->target, *arg); + + } break; + } + } + + if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { + Variant *args = (Variant *)(message + 1); + for (int i = 0; i < message->args; i++) { + args[i].~Variant(); + } + } + + message->~Message(); + + } // while going through buffer + + buffer.end = 0; // reset buffer + + uint32_t buffer_data_size = buffer.data.size(); + buffer.data.clear(); + + _THREAD_SAFE_LOCK_ + + // keep track of the maximum used size, so we can downsize buffers when appropriate + _buffer_size_monitor.max_size = MAX(buffer_data_size, _buffer_size_monitor.max_size); + + // flip buffers, this is the only part that requires a lock + SWAP(read_buffer, write_buffer); + _THREAD_SAFE_UNLOCK_ + + } // while read buffer not empty + + _THREAD_SAFE_LOCK_ + flushing = false; _THREAD_SAFE_UNLOCK_ } @@ -324,34 +416,36 @@ MessageQueue::MessageQueue() { singleton = this; flushing = false; - buffer_end = 0; - buffer_max_used = 0; - buffer_size = GLOBAL_DEF_RST("memory/limits/message_queue/max_size_kb", DEFAULT_QUEUE_SIZE_KB); - ProjectSettings::get_singleton()->set_custom_property_info("memory/limits/message_queue/max_size_kb", PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_kb", PROPERTY_HINT_RANGE, "1024,4096,1,or_greater")); - buffer_size *= 1024; - buffer = memnew_arr(uint8_t, buffer_size); + max_allowed_buffer_size = GLOBAL_DEF_RST("memory/limits/message_queue/max_size_mb", 32); + ProjectSettings::get_singleton()->set_custom_property_info("memory/limits/message_queue/max_size_mb", PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_mb", PROPERTY_HINT_RANGE, "4,512,1,or_greater")); + + max_allowed_buffer_size *= 1024 * 1024; } MessageQueue::~MessageQueue() { - uint32_t read_pos = 0; + for (int which = 0; which < 2; which++) { + Buffer &buffer = buffers[which]; + uint32_t read_pos = 0; - while (read_pos < buffer_end) { - Message *message = (Message *)&buffer[read_pos]; - Variant *args = (Variant *)(message + 1); - int argc = message->args; - if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { - for (int i = 0; i < argc; i++) { - args[i].~Variant(); + while (read_pos < buffer.end) { + Message *message = (Message *)&buffer.data[read_pos]; + Variant *args = (Variant *)(message + 1); + int argc = message->args; + if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { + for (int i = 0; i < argc; i++) { + args[i].~Variant(); + } + } + + message->~Message(); + + read_pos += sizeof(Message); + if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { + read_pos += sizeof(Variant) * message->args; } } - message->~Message(); - read_pos += sizeof(Message); - if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { - read_pos += sizeof(Variant) * message->args; - } - } + } // for which singleton = nullptr; - memdelete_arr(buffer); } diff --git a/core/object/message_queue.h b/core/object/message_queue.h index c91ab67a9..ca839fbca 100644 --- a/core/object/message_queue.h +++ b/core/object/message_queue.h @@ -30,16 +30,13 @@ /* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ /*************************************************************************/ +#include "core/containers/local_vector.h" #include "core/object/object.h" #include "core/os/thread_safe.h" class MessageQueue { _THREAD_SAFE_CLASS_ - enum { - DEFAULT_QUEUE_SIZE_KB = 4096 - }; - enum { TYPE_CALL, TYPE_NOTIFICATION, @@ -59,12 +56,23 @@ class MessageQueue { }; }; - uint8_t *buffer; - uint32_t buffer_end; - uint32_t buffer_max_used; - uint32_t buffer_size; + struct Buffer { + LocalVector data; + uint64_t end = 0; + }; + + Buffer buffers[2]; + int read_buffer = 0; + int write_buffer = 1; + uint64_t max_allowed_buffer_size = 0; + + struct BufferSizeMonitor { + uint32_t max_size = 0; + uint32_t flush_count = 0; + } _buffer_size_monitor; void _call_function(Object *p_target, const StringName &p_func, const Variant *p_args, int p_argcount, bool p_show_error); + void _update_buffer_monitor(); static MessageQueue *singleton; diff --git a/doc/classes/ProjectSettings.xml b/doc/classes/ProjectSettings.xml index 43df31e45..14aa44a59 100644 --- a/doc/classes/ProjectSettings.xml +++ b/doc/classes/ProjectSettings.xml @@ -1316,7 +1316,7 @@ - + Godot uses a message queue to defer some function calls. If you run out of space on it (you will see an error), you can increase the size here.