Skip to content

Commit

Permalink
MT: per scheduler stack pools (let's recycle)
Browse files Browse the repository at this point in the history
The stack pool was only used to create new stacks when MT was enabled.
The stacks were then pushed to each scheduler's free stacks, and
eventually dropped on reschedule (after context swap, so we're sure to
only ever deallocated stacks that are no longer used). This led the
stacks to never be reused with MT. Only created then deallocated.

This patch changes the behavior to have a stack pool running on each
scheduler, and to use it to create and collect the stacks, and reuse
them when possible. It also drops the mutex since the stack pool can
never be accessed in parallel (in fact it never was).

Also starts a collecting fiber on each thread.

It may only lead to better performance if there are different fibers,
running on multiple threads that are spawning fibers. It won't have much
(or any) impact if there is only one fiber spawning other fibers (e.g. a
HTTP::Server) as the stack of fibers that run on another thread won't be
reused (different stack pool).
  • Loading branch information
ysbaddaden committed Dec 14, 2023
1 parent 56f8909 commit 7b77067
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 54 deletions.
46 changes: 22 additions & 24 deletions src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ require "crystal/system/event_loop"
require "crystal/system/print_error"
require "./fiber_channel"
require "fiber"
require "fiber/stack_pool"
require "crystal/system/thread"

# :nodoc:
Expand All @@ -13,6 +14,11 @@ require "crystal/system/thread"
# protected and must never be called directly.
class Crystal::Scheduler
@event_loop = Crystal::EventLoop.create
@stack_pool = Fiber::StackPool.new

def self.stack_pool : Fiber::StackPool
Thread.current.scheduler.@stack_pool
end

def self.event_loop
Thread.current.scheduler.@event_loop
Expand Down Expand Up @@ -84,15 +90,8 @@ class Crystal::Scheduler
{% end %}
end

{% if flag?(:preview_mt) %}
def self.enqueue_free_stack(stack : Void*) : Nil
Thread.current.scheduler.enqueue_free_stack(stack)
end
{% end %}

{% if flag?(:preview_mt) %}
private getter(fiber_channel : Crystal::FiberChannel) { Crystal::FiberChannel.new }
@free_stacks = Deque(Void*).new
{% end %}

@main : Fiber
Expand Down Expand Up @@ -158,18 +157,6 @@ class Crystal::Scheduler
exit 1
end

{% if flag?(:preview_mt) %}
protected def enqueue_free_stack(stack)
@free_stacks.push stack
end

private def release_free_stacks
while stack = @free_stacks.shift?
Fiber.stack_pool.release stack
end
end
{% end %}

protected def reschedule : Nil
loop do
if runnable = @lock.sync { @runnables.shift? }
Expand All @@ -179,10 +166,6 @@ class Crystal::Scheduler
@event_loop.run_once
end
end

{% if flag?(:preview_mt) %}
release_free_stacks
{% end %}
end

protected def sleep(time : Time::Span) : Nil
Expand All @@ -208,6 +191,8 @@ class Crystal::Scheduler
end

def run_loop
spawn_stack_pool_collector

fiber_channel = self.fiber_channel
loop do
@lock.lock
Expand Down Expand Up @@ -240,7 +225,7 @@ class Crystal::Scheduler
@lock.unlock
end

def self.init_workers
def self.init : Nil
count = worker_count
pending = Atomic(Int32).new(count - 1)
@@workers = Array(Thread).new(count) do |i|
Expand Down Expand Up @@ -282,5 +267,18 @@ class Crystal::Scheduler
4
end
end
{% else %}
def self.init : Nil
{% unless flag?(:interpreted) %}
Thread.current.scheduler.spawn_stack_pool_collector
{% end %}
end
{% end %}

# Background loop to cleanup unused fiber stacks.
def spawn_stack_pool_collector
fiber = Fiber.new(name: "Stack pool collector", &->@stack_pool.collect_loop)
{% if flag?(:preview_mt) %} fiber.set_current_thread {% end %}
enqueue(fiber)
end
end
18 changes: 4 additions & 14 deletions src/fiber.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
require "crystal/system/thread_linked_list"
require "./fiber/context"
require "./fiber/stack_pool"

# :nodoc:
@[NoInline]
Expand Down Expand Up @@ -47,9 +46,6 @@ class Fiber
# :nodoc:
protected class_getter(fibers) { Thread::LinkedList(Fiber).new }

# :nodoc:
class_getter stack_pool = StackPool.new

@context : Context
@stack : Void*
@resume_event : Crystal::EventLoop::Event?
Expand Down Expand Up @@ -89,10 +85,9 @@ class Fiber
@context = Context.new
@stack, @stack_bottom =
{% if flag?(:interpreted) %}
# For interpreted mode we don't need a new stack, the stack is held by the interpreter
{Pointer(Void).null, Pointer(Void).null}
{% else %}
Fiber.stack_pool.checkout
Crystal::Scheduler.stack_pool.checkout
{% end %}

fiber_main = ->(f : Fiber) { f.run }
Expand Down Expand Up @@ -153,14 +148,6 @@ class Fiber
ex.inspect_with_backtrace(STDERR)
STDERR.flush
ensure
{% if flag?(:preview_mt) %}
Crystal::Scheduler.enqueue_free_stack @stack
{% elsif flag?(:interpreted) %}
# For interpreted mode we don't need a new stack, the stack is held by the interpreter
{% else %}
Fiber.stack_pool.release(@stack)
{% end %}

# Remove the current fiber from the linked list
Fiber.inactive(self)

Expand All @@ -170,6 +157,9 @@ class Fiber
@timeout_select_action = nil

@alive = false
{% unless flag?(:interpreted) %}
Crystal::Scheduler.stack_pool.release(@stack)
{% end %}
Crystal::Scheduler.reschedule
end

Expand Down
16 changes: 11 additions & 5 deletions src/fiber/stack_pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,42 @@ class Fiber

def initialize
@deque = Deque(Void*).new
@mutex = Thread::Mutex.new
end

# Removes and frees at most *count* stacks from the top of the pool,
# returning memory to the operating system.
def collect(count = lazy_size // 2) : Nil
count.times do
if stack = @mutex.synchronize { @deque.shift? }
if stack = @deque.shift?
Crystal::System::Fiber.free_stack(stack, STACK_SIZE)
else
return
end
end
end

def collect_loop(every = 5.seconds) : Nil
loop do
sleep every
collect
end
end

# Removes a stack from the bottom of the pool, or allocates a new one.
def checkout : {Void*, Void*}
stack = @mutex.synchronize { @deque.pop? } || Crystal::System::Fiber.allocate_stack(STACK_SIZE)
stack = @deque.pop? || Crystal::System::Fiber.allocate_stack(STACK_SIZE)
{stack, stack + STACK_SIZE}
end

# Appends a stack to the bottom of the pool.
def release(stack) : Nil
@mutex.synchronize { @deque.push(stack) }
@deque.push(stack)
end

# Returns the approximated size of the pool. It may be equal or slightly
# bigger or smaller than the actual size.
def lazy_size : Int32
@mutex.synchronize { @deque.size }
@deque.size
end
end
end
12 changes: 1 addition & 11 deletions src/kernel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -563,14 +563,6 @@ end
{% end %}

{% unless flag?(:interpreted) || flag?(:wasm32) %}
# Background loop to cleanup unused fiber stacks.
spawn(name: "Fiber Clean Loop") do
loop do
sleep 5
Fiber.stack_pool.collect
end
end

{% if flag?(:win32) %}
Crystal::System::Process.start_interrupt_loop
{% else %}
Expand All @@ -586,7 +578,5 @@ end
Exception::CallStack.load_debug_info if ENV["CRYSTAL_LOAD_DEBUG_INFO"]? == "1"
Exception::CallStack.setup_crash_handler

{% if flag?(:preview_mt) %}
Crystal::Scheduler.init_workers
{% end %}
Crystal::Scheduler.init
{% end %}

0 comments on commit 7b77067

Please sign in to comment.