From 7b77067a888635f22de9fddd046c7027e26245c0 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Thu, 7 Dec 2023 18:59:47 +0100 Subject: [PATCH] MT: per scheduler stack pools (let's recycle) The stack pool was only used to create new stacks when MT was enabled. The stacks were then pushed to each scheduler's free stacks, and eventually dropped on reschedule (after context swap, so we're sure to only ever deallocate stacks that are no longer used). As a result, stacks were never reused with MT: they were only created, then deallocated. This patch changes the behavior to have a stack pool running on each scheduler, and to use it to create and collect the stacks, and reuse them when possible. It also drops the mutex since the stack pool can never be accessed in parallel (in fact it never was). Also starts a collecting fiber on each thread. It may only lead to better performance if there are different fibers, running on multiple threads, that are spawning fibers. It won't have much (or any) impact if there is only one fiber spawning other fibers (e.g. an HTTP::Server) as the stacks of fibers that run on another thread won't be reused (different stack pool). --- src/crystal/scheduler.cr | 46 +++++++++++++++++++--------------------- src/fiber.cr | 18 ++++------------ src/fiber/stack_pool.cr | 16 +++++++++----- src/kernel.cr | 12 +---------- 4 files changed, 38 insertions(+), 54 deletions(-) diff --git a/src/crystal/scheduler.cr b/src/crystal/scheduler.cr index f46c22dd3bb6..39c692821f28 100644 --- a/src/crystal/scheduler.cr +++ b/src/crystal/scheduler.cr @@ -2,6 +2,7 @@ require "crystal/system/event_loop" require "crystal/system/print_error" require "./fiber_channel" require "fiber" +require "fiber/stack_pool" require "crystal/system/thread" # :nodoc: @@ -13,6 +14,11 @@ require "crystal/system/thread" # protected and must never be called directly. 
class Crystal::Scheduler @event_loop = Crystal::EventLoop.create + @stack_pool = Fiber::StackPool.new + + def self.stack_pool : Fiber::StackPool + Thread.current.scheduler.@stack_pool + end def self.event_loop Thread.current.scheduler.@event_loop @@ -84,15 +90,8 @@ class Crystal::Scheduler {% end %} end - {% if flag?(:preview_mt) %} - def self.enqueue_free_stack(stack : Void*) : Nil - Thread.current.scheduler.enqueue_free_stack(stack) - end - {% end %} - {% if flag?(:preview_mt) %} private getter(fiber_channel : Crystal::FiberChannel) { Crystal::FiberChannel.new } - @free_stacks = Deque(Void*).new {% end %} @main : Fiber @@ -158,18 +157,6 @@ class Crystal::Scheduler exit 1 end - {% if flag?(:preview_mt) %} - protected def enqueue_free_stack(stack) - @free_stacks.push stack - end - - private def release_free_stacks - while stack = @free_stacks.shift? - Fiber.stack_pool.release stack - end - end - {% end %} - protected def reschedule : Nil loop do if runnable = @lock.sync { @runnables.shift? } @@ -179,10 +166,6 @@ class Crystal::Scheduler @event_loop.run_once end end - - {% if flag?(:preview_mt) %} - release_free_stacks - {% end %} end protected def sleep(time : Time::Span) : Nil @@ -208,6 +191,8 @@ class Crystal::Scheduler end def run_loop + spawn_stack_pool_collector + fiber_channel = self.fiber_channel loop do @lock.lock @@ -240,7 +225,7 @@ class Crystal::Scheduler @lock.unlock end - def self.init_workers + def self.init : Nil count = worker_count pending = Atomic(Int32).new(count - 1) @@workers = Array(Thread).new(count) do |i| @@ -282,5 +267,18 @@ class Crystal::Scheduler 4 end end + {% else %} + def self.init : Nil + {% unless flag?(:interpreted) %} + Thread.current.scheduler.spawn_stack_pool_collector + {% end %} + end {% end %} + + # Background loop to cleanup unused fiber stacks. 
+ def spawn_stack_pool_collector + fiber = Fiber.new(name: "Stack pool collector", &->@stack_pool.collect_loop) + {% if flag?(:preview_mt) %} fiber.set_current_thread {% end %} + enqueue(fiber) + end end diff --git a/src/fiber.cr b/src/fiber.cr index aa2af7bf2229..c96184f3cf1f 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -1,6 +1,5 @@ require "crystal/system/thread_linked_list" require "./fiber/context" -require "./fiber/stack_pool" # :nodoc: @[NoInline] @@ -47,9 +46,6 @@ class Fiber # :nodoc: protected class_getter(fibers) { Thread::LinkedList(Fiber).new } - # :nodoc: - class_getter stack_pool = StackPool.new - @context : Context @stack : Void* @resume_event : Crystal::EventLoop::Event? @@ -89,10 +85,9 @@ class Fiber @context = Context.new @stack, @stack_bottom = {% if flag?(:interpreted) %} - # For interpreted mode we don't need a new stack, the stack is held by the interpreter {Pointer(Void).null, Pointer(Void).null} {% else %} - Fiber.stack_pool.checkout + Crystal::Scheduler.stack_pool.checkout {% end %} fiber_main = ->(f : Fiber) { f.run } @@ -153,14 +148,6 @@ class Fiber ex.inspect_with_backtrace(STDERR) STDERR.flush ensure - {% if flag?(:preview_mt) %} - Crystal::Scheduler.enqueue_free_stack @stack - {% elsif flag?(:interpreted) %} - # For interpreted mode we don't need a new stack, the stack is held by the interpreter - {% else %} - Fiber.stack_pool.release(@stack) - {% end %} - # Remove the current fiber from the linked list Fiber.inactive(self) @@ -170,6 +157,9 @@ class Fiber @timeout_select_action = nil @alive = false + {% unless flag?(:interpreted) %} + Crystal::Scheduler.stack_pool.release(@stack) + {% end %} Crystal::Scheduler.reschedule end diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr index 54d03e4ffa5f..aebd82a0870f 100644 --- a/src/fiber/stack_pool.cr +++ b/src/fiber/stack_pool.cr @@ -7,14 +7,13 @@ class Fiber def initialize @deque = Deque(Void*).new - @mutex = Thread::Mutex.new end # Removes and frees at most *count* stacks 
from the top of the pool, # returning memory to the operating system. def collect(count = lazy_size // 2) : Nil count.times do - if stack = @mutex.synchronize { @deque.shift? } + if stack = @deque.shift? Crystal::System::Fiber.free_stack(stack, STACK_SIZE) else return @@ -22,21 +21,28 @@ class Fiber end end + def collect_loop(every = 5.seconds) : Nil + loop do + sleep every + collect + end + end + # Removes a stack from the bottom of the pool, or allocates a new one. def checkout : {Void*, Void*} - stack = @mutex.synchronize { @deque.pop? } || Crystal::System::Fiber.allocate_stack(STACK_SIZE) + stack = @deque.pop? || Crystal::System::Fiber.allocate_stack(STACK_SIZE) {stack, stack + STACK_SIZE} end # Appends a stack to the bottom of the pool. def release(stack) : Nil - @mutex.synchronize { @deque.push(stack) } + @deque.push(stack) end # Returns the approximated size of the pool. It may be equal or slightly # bigger or smaller than the actual size. def lazy_size : Int32 - @mutex.synchronize { @deque.size } + @deque.size end end end diff --git a/src/kernel.cr b/src/kernel.cr index c3b3106ccae3..d3817ee11661 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -563,14 +563,6 @@ end {% end %} {% unless flag?(:interpreted) || flag?(:wasm32) %} - # Background loop to cleanup unused fiber stacks. - spawn(name: "Fiber Clean Loop") do - loop do - sleep 5 - Fiber.stack_pool.collect - end - end - {% if flag?(:win32) %} Crystal::System::Process.start_interrupt_loop {% else %} @@ -586,7 +578,5 @@ end Exception::CallStack.load_debug_info if ENV["CRYSTAL_LOAD_DEBUG_INFO"]? == "1" Exception::CallStack.setup_crash_handler - {% if flag?(:preview_mt) %} - Crystal::Scheduler.init_workers - {% end %} + Crystal::Scheduler.init {% end %}