Skip to content

Commit

Permalink
MT: move set_current_thread to Fiber (from Crystal::Scheduler)
Browse files Browse the repository at this point in the history
This avoids manipulating `fiber.@current_thread` which ain't very
pretty, and moves the responsibility to manipulate it to
Crystal::Scheduler. That Crystal::Scheduler is also responsible from
making sure a fiber will always be enqueued or resumed on the thread
it's been associated to.

Lastly, we remove the current_thread store that was always replacing any
previous value on context swap, which sadly doesn't seem to improve
performance...
  • Loading branch information
ysbaddaden committed Dec 14, 2023
1 parent c9e2860 commit 56f8909
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 14 deletions.
6 changes: 2 additions & 4 deletions src/concurrent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,8 @@ end
# ```
def spawn(*, name : String? = nil, same_thread = false, &block)
fiber = Fiber.new(name, &block)
if same_thread
fiber.@current_thread.set(Thread.current)
end
Crystal::Scheduler.enqueue fiber
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
fiber.enqueue
fiber
end

Expand Down
31 changes: 24 additions & 7 deletions src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ class Crystal::Scheduler
scheduler = thread.scheduler

{% if flag?(:preview_mt) %}
th = fiber.@current_thread.lazy_get || scheduler.find_target_thread
unless th = fiber.get_current_thread
th = fiber.set_current_thread(scheduler.find_target_thread)
end

if th == thread
scheduler.enqueue(fiber)
Expand All @@ -50,6 +52,7 @@ class Crystal::Scheduler
end

def self.resume(fiber : Fiber) : Nil
validate_running_thread(fiber)
Thread.current.scheduler.resume(fiber)
end

Expand All @@ -65,9 +68,22 @@ class Crystal::Scheduler
end

def self.yield(fiber : Fiber) : Nil
validate_running_thread(fiber)
Thread.current.scheduler.yield(fiber)
end

private def validate_running_thread(fiber : Fiber) : Nil
{% if flag?(:preview_mt) %}
if th = fiber.get_current_thread
unless th == Thread.current
raise "BUG: tried to manually resume #{fiber} on #{Thread.current} instead of #{th}"
end
else
fiber.set_current_thread
end
{% end %}
end

{% if flag?(:preview_mt) %}
def self.enqueue_free_stack(stack : Void*) : Nil
Thread.current.scheduler.enqueue_free_stack(stack)
Expand All @@ -78,11 +94,15 @@ class Crystal::Scheduler
private getter(fiber_channel : Crystal::FiberChannel) { Crystal::FiberChannel.new }
@free_stacks = Deque(Void*).new
{% end %}

@main : Fiber
@lock = Crystal::SpinLock.new
@sleeping = false

# :nodoc:
def initialize(@main : Fiber)
def initialize(thread : Thread)
@main = thread.main_fiber
{% if flag?(:preview_mt) %} @main.set_current_thread(thread) {% end %}
@current = @main
@runnables = Deque(Fiber).new
end
Expand All @@ -97,8 +117,8 @@ class Crystal::Scheduler

protected def resume(fiber : Fiber) : Nil
validate_resumable(fiber)

{% if flag?(:preview_mt) %}
set_current_thread(fiber)
GC.lock_read
{% elsif flag?(:interpreted) %}
# No need to change the stack bottom!
Expand Down Expand Up @@ -132,10 +152,6 @@ class Crystal::Scheduler
end
end

private def set_current_thread(fiber)
fiber.@current_thread.set(Thread.current)
end

private def fatal_resume_error(fiber, message)
Crystal::System.print_error "\nFATAL: #{message}: #{fiber}\n"
caller.each { |line| Crystal::System.print_error " from #{line}\n" }
Expand Down Expand Up @@ -230,6 +246,7 @@ class Crystal::Scheduler
@@workers = Array(Thread).new(count) do |i|
if i == 0
worker_loop = Fiber.new(name: "Worker Loop") { Thread.current.scheduler.run_loop }
worker_loop.set_current_thread
Thread.current.scheduler.enqueue worker_loop
Thread.current
else
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/thread.cr
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class Thread
end

# :nodoc:
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(main_fiber) }
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }

protected def start
Thread.threads.push(self)
Expand Down
16 changes: 14 additions & 2 deletions src/fiber.cr
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class Fiber
property name : String?

@alive = true
@current_thread = Atomic(Thread?).new(nil)
{% if flag?(:preview_mt) %} @current_thread = Atomic(Thread?).new(nil) {% end %}

# :nodoc:
property next : Fiber?
Expand Down Expand Up @@ -136,7 +136,7 @@ class Fiber
{% end %}
thread.gc_thread_handler, @stack_bottom = GC.current_thread_stack_bottom
@name = "main"
@current_thread.set(thread)
{% if flag?(:preview_mt) %} @current_thread.set(thread) {% end %}
Fiber.fibers.push(self)
end

Expand Down Expand Up @@ -305,4 +305,16 @@ class Fiber
# Push the used section of the stack
GC.push_stack @context.stack_top, @stack_bottom
end

{% if flag?(:preview_mt) %}
# :nodoc:
def set_current_thread(thread = Thread.current) : Thread
@current_thread.set(thread)
end

# :nodoc:
def get_current_thread : Thread?
@current_thread.lazy_get
end
{% end %}
end

0 comments on commit 56f8909

Please sign in to comment.