Fibers and Enumerators in Ruby: Turning Blocks Inside Out

Ruby has several ways of performing iteration: loops, blocks and enumerators. Most Ruby programmers are at least familiar with loops and blocks, but Enumerator and Fiber often stay in the dark. In this edition of Ruby Magic, guest author Julik shines a light on Enumerator and Fiber to explain flow-controlled enumerables and turning blocks inside out.

Suspending Blocks and Chained Iteration

We’ve discussed Enumerator in a previous edition of Ruby Magic, where we described how to return an Enumerator from your own #each method and what it can be used for. An even broader use case for Enumerator and Fiber is that they can “suspend a block” mid-flight. Not just the block given to #each or the entire call to #each, but any block!

db.with_each_row_of_result(sql_stmt) do |row|
  yield row
end

@cursor = cursor

# later:
row = @cursor.next_row
send_row_to_event_stream(row)

Chaining Iterators

One of the most common uses of this pattern is chaining multiple iterators together. When called without a block, the iteration methods we are used to (like #each) return an Enumerator object instead, which we can use to “grab” the values that the method sends out using the yield statement:

range = 1..8
each_enum = range.each # => <Enumerator...>

(1..3).map.with_index { |element_n, index| [element_n, index] }
#=> [[1, 0], [2, 1], [3, 2]]
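The same chaining works for our own classes. As a minimal sketch, the following defines a hypothetical Countdown class (the name and behavior are invented for illustration) whose #each returns an Enumerator when no block is given, so it can be chained with with_index just like the built-in collections:

```ruby
# Hypothetical example class; the name and behavior are made up for illustration.
class Countdown
  include Enumerable

  def initialize(from)
    @from = from
  end

  def each
    # Without a block, hand back an Enumerator instead of iterating right away
    return to_enum(:each) unless block_given?

    @from.downto(1) { |n| yield n }
  end
end

countdown = Countdown.new(3)

# each (no block) -> Enumerator, with_index(1) (no block) -> Enumerator, map consumes both
pairs = countdown.each.with_index(1).map { |n, position| "#{position}: #{n}" }
# pairs == ["1: 3", "2: 2", "3: 1"]
```
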
@cursor = db.to_enum(:with_each_row_of_result, sql_stmt)
schedule_for_later do
  begin
    row = @cursor.next
    send_row_to_event_stream(row)
  rescue StopIteration # the block has ended and the cursor is empty, the cleanup has taken place
  end
end

cursor = Enumerator.new do |yielder|
  db.with_each_row_of_result(sql_stmt) do |row|
    yielder.yield row
  end
end
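To make the cursor idea concrete, here is a runnable sketch. FakeDatabase is a hypothetical stand-in for a real database client: only the method name with_each_row_of_result mirrors the snippet above, and the logging exists purely to show when the method starts and when its cleanup runs.

```ruby
# Hypothetical stand-in for a database client; everything except the method
# name is invented for illustration.
class FakeDatabase
  attr_reader :log

  def initialize(rows)
    @rows = rows
    @log = []
  end

  def with_each_row_of_result(_sql_stmt)
    @log << :opened
    @rows.each { |row| yield row }
  ensure
    @log << :cleaned_up # runs once the block-based method finishes
  end
end

db = FakeDatabase.new([{ id: 1 }, { id: 2 }])
cursor = db.to_enum(:with_each_row_of_result, "SELECT * FROM users")

first_row  = cursor.next # runs the method up to the first yield
second_row = cursor.next # resumes it until the second yield

exhausted = false
begin
  cursor.next # no rows left: the method returns and its cleanup runs
rescue StopIteration
  exhausted = true
end
```

Nothing happens when to_enum is called. The method only starts running on the first call to next, and it stays suspended between calls.
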

Turning Blocks Inside Out

Rails also allows us to assign an Enumerator as the response body. The response is produced by iterating over the Enumerator we assign, and each value it yields is expected to be a string, which will be written out into the Rack response. For example, we can return a call to the #each method of a Range as a Rails response body:

class MyController < ApplicationController
  def index
    response.body = ('a'..'z').each
  end
end
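Outside of Rails, the consuming side can be sketched in a few lines. This assumes the server follows the Rack convention of calling #each on the body and writing out every string it yields. The written buffer is a stand-in for the client connection.

```ruby
body = ('a'..'c').each # an Enumerator, as in the controller above

# Roughly what a Rack-compatible server does with a response body:
# call #each and write every yielded string to the client.
written = +""
body.each { |chunk| written << chunk }
# written == "abc"
```
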
File.open('output.tmp', 'wb') do |f|
  # Yield file for writing, continuously
  loop { yield(f) }
end

writer_enum = File.to_enum(:open, 'output.tmp', 'wb')
file = writer_enum.next
file << data
file << more_data
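A self-contained version of the File.to_enum trick might look like the sketch below. It writes into a temporary directory (an assumption made so the example cleans up after itself) and shows that the file stays open while the block is suspended, and is closed once we let the block finish.

```ruby
require "tmpdir"

contents = nil
file_closed = nil

Dir.mktmpdir do |dir|
  path = File.join(dir, "output.tmp")

  writer_enum = File.to_enum(:open, path, "wb")
  file = writer_enum.next # File.open has yielded; its block is now suspended
  file << "Some data"
  file << "Some more data"

  begin
    writer_enum.next # let the block return so that File.open closes the file
  rescue StopIteration
    # expected: File.open yields only once
  end

  file_closed = file.closed?
  contents = File.read(path)
end
```
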
write_file_through_encryptor(file_name) do |writable|
  writable << "Some data"
  writable << "Some more data"
  writable << "Even more data"
end

writable = write_file_through_encryptor(file_name)
writable << "Some data"
# ...later on
writable << "Some more data"
writable.finish

write_file_through_encryptor(file_name) do |writable|
  loop do
    yield_and_wait_for_next_call(writable)
    # Then we somehow break out of this loop to let the block complete
  end
end

deferred_writable = write_file_through_encryptor(file_name)
deferred_writable.next("Some data")
deferred_writable.next("Some more data")
deferred_writable.next("Even more data")
deferred_writable.next(:terminate)

deferred_writable = write_file_through_encryptor(file_name)
deferred_writable.next("Some data")
...
deferred_writable.next(:terminate)

Enter Ruby’s Fibers

This is exactly what Fibers permit. A Fiber allows you to accept arguments on each reentry, so we can implement our wrapper like so:

deferred_writable = Fiber.new do |data_to_write_or_termination|
  write_file_through_encryptor(file_name) do |f|
    # Here we enter the block context of the fiber; every later resume continues
    # from the Fiber.yield call below rather than from the start of this block
    loop do
      # When we call Fiber.yield our fiber will be suspended; we won't complete the
      # "data_to_write_or_termination = " assignment before our fiber gets resumed
      data_to_write_or_termination = Fiber.yield
    end
  end
end

deferred_writes = Fiber.new do |data_to_write|
  loop do
    $stderr.puts "Received #{data_to_write} to work with"
    data_to_write = Fiber.yield
  end
end
# => #<Fiber:0x007f9f531783e8>

deferred_writes.resume("Hello") #=> Received Hello to work with
deferred_writes.resume("Goodbye") #=> Received Goodbye to work with
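Values can travel in the other direction too: whatever the fiber passes to Fiber.yield becomes the return value of resume. The following small sketch (the accumulator is an invented example, not part of the encryptor code) shows both directions at once:

```ruby
accumulator = Fiber.new do |first_value|
  total = first_value
  loop do
    # Hand the running total back to the caller, then wait for the next value
    next_value = Fiber.yield(total)
    total += next_value
  end
end

after_first  = accumulator.resume(10) # => 10
after_second = accumulator.resume(5)  # => 15
after_third  = accumulator.resume(1)  # => 16
```
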
deferred_writes = Fiber.new do |data_to_write|
  loop do
    $stderr.puts "Received #{data_to_write} to work with"
    break if data_to_write == :terminate # Break out of the loop, or...
    write_to_output(data_to_write) # ...write to the output
    data_to_write = Fiber.yield # suspend ourselves and wait for the next `resume`
  end
  # We end up here if we break out of the loop above. There is no Fiber.yield
  # statement anywhere, so the Fiber will terminate and become "dead".
end

deferred_writes.resume("Hello") #=> Received Hello to work with
deferred_writes.resume("Goodbye") #=> Received Goodbye to work with
deferred_writes.resume(:terminate)
deferred_writes.resume("Some more data after close") # FiberError: dead fiber called
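Putting the pieces together, here is a runnable sketch of a block-based writer turned inside out with a Fiber. write_through_upcaser is a hypothetical stand-in for write_file_through_encryptor: it yields a writable buffer and only finalizes its output after the block returns, which is the property we care about.

```ruby
require "stringio"

# Hypothetical stand-in for write_file_through_encryptor: it yields a writable
# object and finalizes the output only after the block returns.
def write_through_upcaser(output)
  buffer = +""
  yield buffer
  output << buffer.upcase # "finalization" step, runs when the block completes
end

output = StringIO.new

deferred_writable = Fiber.new do |data_or_termination|
  write_through_upcaser(output) do |writable|
    loop do
      break if data_or_termination == :terminate
      writable << data_or_termination
      data_or_termination = Fiber.yield # suspend inside the block
    end
  end
end

deferred_writable.resume("Some data, ")
deferred_writable.resume("some more data")
finalized_early = !output.string.empty? # false: the block has not completed yet
deferred_writable.resume(:terminate)    # the block returns and the output is finalized

result = output.string

fiber_finished = false
begin
  deferred_writable.resume("too late")
rescue FiberError
  fiber_finished = true # resuming a finished fiber raises FiberError
end
```
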
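# `socket` is the accepted client connection, captured from the enclosing scope
client_fiber = Fiber.new do
  loop do
    # With exception: false these calls return :wait_readable / :wait_writable
    # instead of raising when the socket is not ready yet
    received_from_client = socket.read_nonblock(10, exception: false)
    sent_to_client = socket.write_nonblock("OK", exception: false)
    Fiber.yield # Return control back to the caller and wait for it to call 'resume' on us
  end
end

client_fibers << client_fiber

# and then in your main webserver loop
client_fibers.each do |client_fiber|
  client_fiber.resume # Receive data from the client if any, and send it an OK
end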
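The same round-robin idea can be tried without any sockets. In this sketch each "client" is just a list of pending messages (the names and data are illustrative), and the main loop keeps resuming fibers until every one of them has finished:

```ruby
# Each "client" is a queue of pending messages; the names are illustrative only.
inboxes = { alice: %w[hi bye], bob: %w[hello] }
handled = []

client_fibers = inboxes.map do |name, messages|
  Fiber.new do
    messages.each do |message|
      handled << "#{name}: #{message}"
      Fiber.yield :more # one unit of work done, let the other clients run
    end
    :done # value returned by the final resume
  end
end

# A tiny main loop: resume every fiber in turn and drop the ones that finished
until client_fibers.empty?
  client_fibers.reject! { |fiber| fiber.resume == :done }
end
# handled == ["alice: hi", "bob: hello", "alice: bye"]
```

The work is interleaved between the clients because each fiber gives up control voluntarily after every message.
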

Controlling Data Emission Rates

Another great use for fibers and enumerators can arise when you want to be able to control the rate at which a Ruby block emits data. For example, in zip_tricks we support the following block use as the primary way of using the library:

ZipTricks::Streamer.open(output_io) do |z|
  z.write_deflated_file("big.csv") do |destination|
    columns.each do |col|
      destination << col
    end
  end
end

output_enum = ZipTricks::Streamer.output_enum do |z|
  z.write_deflated_file("big.csv") do |destination|
    columns.each do |col|
      destination << col
    end
  end
end

# At this point nothing has been generated or written yet
enum = output_enum.each # Create an Enumerator
bin_str = enum.next # Let the block generate some binary data and then suspend it
output.write(bin_str) # Our block is suspended and waiting for the next invocation of `next`
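The mechanism can be illustrated without zip_tricks. The sketch below is not how the library is implemented. It only uses a plain Enumerator and a hypothetical write_report producer to show that the producing block runs strictly on demand, one chunk per call to next:

```ruby
log = []

# Hypothetical producer that pushes chunks into any object responding to <<
write_report = lambda do |destination|
  3.times do |n|
    log << "generating chunk #{n}"
    destination << "chunk #{n}\n"
  end
end

# Enumerator::Yielder responds to <<, so it can stand in for the destination
output_enum = Enumerator.new { |yielder| write_report.call(yielder) }

nothing_generated_yet = log.empty? # true: the producer has not started

first_chunk = output_enum.next # runs the producer up to its first <<
log_after_first = log.dup      # ["generating chunk 0"]

second_chunk = output_enum.next # resumes the producer until its second <<
```

Because the caller decides when to call next, it can throttle, pause or interleave the output with other work.
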

Conclusion

This concludes our look into flow-controlled enumerables in Ruby. Along the way, Julik shone a light on the similarities and differences between the Enumerator and Fiber classes, and dove into examples where the caller determines the flow of data. We’ve also learned about Fiber’s additional magic: passing arguments on each block reentry. Happy flow-controlling!
