Ruby Threads and ActiveRecord Connections
14 Feb 2016
Processing large data sets is a common problem faced by many production web applications. One solution is to divide the work amongst multiple processes and have each responsible for a single or significantly smaller batch of data. However, this solution is not without its problems. Machine provisioning limitations or financial barriers may rule it out when the number of processes required grows very large.
In the same vein as “divide and conquer” exists another solution, one which requires far fewer parallel processes: Threads. In the Ruby programming language, a Thread is a built-in object for concurrent programming.
Unlike independent processes, all Ruby Threads within the same process share memory, enabling each individual Thread to consume or process objects and elements from the same data store.
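As a minimal sketch of that shared memory, the snippet below has several Threads write into a single Hash. The email addresses and variable names are made up for illustration, and the Mutex is an assumption about how one might guard the shared object rather than something the rest of this post relies on.

```ruby
# Illustrative data only; these addresses are placeholders.
emails = %w[ada@example.com bob@example.com cy@example.com]

domains = {}      # one Hash, visible to every Thread in the process
lock = Mutex.new  # guards concurrent writes to the shared Hash

threads = emails.map do |email|
  Thread.new do
    domain = email.split("@").last
    lock.synchronize { domains[email] = domain }
  end
end

# Wait for every Thread before reading the shared result.
threads.each(&:join)

puts domains.size
```

Every Thread sees the same `domains` object, which is exactly what separate processes could not do without some form of inter-process communication.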
For this example, a database will be queried, the results manipulated and finally returned to the same database via ActiveRecord.
First Pass
Given a User model backed by a simple users table:
class User < ActiveRecord::Base
end

class CreateUsersTable < ActiveRecord::Migration
  def change
    create_table :users do |t|
      t.string :first_name
      t.string :last_name
      t.string :email
      t.boolean :validated, default: false
      t.timestamps null: false
    end
  end
end
The problem to solve is fairly straightforward:
All User records that are not already validated should be fetched and an external API hit with their email address for validation, then saved. If a User's email address is not valid, the address should be removed from the record.
Fast forwarding through time, it can be assumed that a completely serial solution has been written and deemed unsatisfactory. Then, during a second iteration, a bit of concurrent code was written:
class UserEmailValidator
  def self.run
    # Load unvalidated Users 30 at a time instead of all at once.
    User.where(validated: false)
        .find_in_batches(batch_size: 30) do |user_batch|
      validate_emails(user_batch)
    end
  end

  def self.validate_emails(user_batch)
    # One Thread per User in the batch.
    threads = user_batch.map do |user|
      Thread.new do
        email_validator = EmailService.new(user.email)
        email_validator.validate
        user.email = nil if email_validator.invalid?
        user.validated = true
        user.save!
      end
    end
    # Block until every Thread in this batch has finished.
    threads.each(&:join)
  end
end
The important pieces of this code are:
- find_in_batches(batch_size: 30)
  To cut down on memory allocation, only 30 Users are fetched at one time. Each of these Users has their email address validated in a separate Thread.
- threads.each(&:join)
  The join method is used to make the program wait for all Threads to complete. If one Thread were to error out, some rescue and retry logic could be added to help.
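The rescue and retry logic mentioned above could look something like the following sketch. The `with_retries` helper, its `attempts` and `wait` parameters, and the simulated failure are all hypothetical; a real version would rescue whatever errors the email service actually raises.

```ruby
# Hypothetical helper: run the block, retrying on failure up to `attempts` times.
def with_retries(attempts: 3, wait: 0.01)
  tries = 0
  begin
    tries += 1
    yield tries
  rescue StandardError
    raise if tries >= attempts # give up and let the error surface
    sleep(wait * tries)        # small, growing pause before the next try
    retry
  end
end

calls = Hash.new(0)
lock = Mutex.new

threads = %w[a@example.com b@example.com].map do |email|
  Thread.new do
    with_retries do |try|
      lock.synchronize { calls[email] += 1 }
      # Simulated flaky service: the first attempt for each address fails.
      raise IOError, "timeout" if try == 1
      "#{email} ok"
    end
  end
end

# Thread#value joins the Thread and returns the block's result,
# re-raising any error that escaped the retries.
results = threads.map(&:value)
puts results.inspect
```

Using `value` instead of `join` is a design choice here: it makes a Thread that exhausted its retries fail loudly in the calling code rather than silently.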
Functionally adequate, this code should iterate over the entire set of User records, 30 at a time, and update them in parallel.
However, actually running this code will point out a very serious limitation.
Note: The GIL in MRI Ruby prevents this code from ever running truly in parallel, but since the work is I/O bound and MRI releases the GIL while a Thread waits on I/O, it makes little difference here.
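A rough way to see this for I/O-bound work is to time a handful of Threads that each block for a while. In the sketch below, `fake_api_call` is a hypothetical stand-in for the HTTP request and `sleep` stands in for time spent waiting on the network.

```ruby
# Hypothetical stand-in for a blocking call to an external API.
def fake_api_call
  sleep 0.2
  :ok
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# Five blocking calls, each in its own Thread.
results = 5.times.map { Thread.new { fake_api_call } }.map(&:value)

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts format("%d calls finished in %.2fs", results.size, elapsed)
```

Run serially, the five calls would take about a second. With Threads the waits overlap, so the total should land much closer to the length of a single call.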
Connection Accumulation
Moments after the above code runs for the first time, this error is seen:
ActiveRecord::ConnectionTimeoutError:
could not obtain a database connection within 5.000 seconds
As it turns out, each Thread spawned had allocated its own connection to the database via ActiveRecord. Then, even after the Thread completed its work, the connection was not relinquished. Stale, idle connections began to accumulate within the Ruby process and, before even 1000 User objects could be processed, the connection pool was empty.
Presumably, right after the self.validate_emails method completed for a batch of Users, the process should have released those connections to be used for either the next batch of Threads or other processes.
Since this liberation of database connections did not happen automatically, it seems that we must intervene.
Pool Full of Connections, then We Dive in it
To help ActiveRecord handle connection allocation and deallocation, the with_connection block is used. with_connection is a method that yields a connection to a block, then returns that same connection to the pool when the block has completed.
Basically, it ensures that a single database connection is held only for the duration of the block provided.
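To make the check-out and check-in life cycle concrete, here is a toy pool built only on Ruby's standard library. `TinyPool` and `idle_count` are hypothetical names and this is not how ActiveRecord implements its pool; for one thing, the real pool raises ConnectionTimeoutError after a timeout, whereas this sketch simply blocks until a connection is free.

```ruby
# Hypothetical stand-in for a connection pool, for illustration only.
class TinyPool
  def initialize(size)
    @available = Queue.new
    size.times { |i| @available << "conn-#{i}" }
  end

  # Yield a connection to the block, then always hand it back.
  def with_connection
    conn = @available.pop # blocks until a connection is free
    yield conn
  ensure
    @available << conn if conn
  end

  def idle_count
    @available.size
  end
end

pool = TinyPool.new(2)
used = Queue.new

# Six Threads share two connections; each holds one only inside the block.
threads = 6.times.map do
  Thread.new do
    pool.with_connection do |conn|
      sleep 0.01 # pretend to run a query
      used << conn
    end
  end
end
threads.each(&:join)

puts pool.idle_count
```

The `ensure` clause is the important part: the connection goes back to the pool even if the block raises.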
To make sure this solution will work, it is imperative that the database connection pool in either database.yml or in a connection initializer is set high enough.
An example database.yml:
production:
  adapter: postgresql
  database: production_database
  pool: 50
With a pool large enough, a with_connection block can be inserted around the pertinent code:
class UserEmailValidator
  def self.run
    User.where(validated: false)
        .find_in_batches(batch_size: 30) do |user_batch|
      validate_emails(user_batch)
    end
  end

  def self.validate_emails(user_batch)
    threads = user_batch.map do |user|
      Thread.new do
        # Check out a connection for this Thread and return it to the
        # pool as soon as the block finishes.
        ActiveRecord::Base.connection_pool.with_connection do
          email_validator = EmailService.new(user.email)
          email_validator.validate
          user.email = nil if email_validator.invalid?
          user.validated = true
          user.save!
        end
      end
    end
    threads.each(&:join)
  end
end
Running this very deliberate connection handling process now ensures that no more than 30 connections are claimed. Each Thread will connect to the database with one single connection and the with_connection block will return that connection to the predefined pool upon completion.
With the addition of this necessary connection pruning, the great User email validation effort can complete. Depending on the size of the data set and machines available, the 30 per batch limit could be easily increased, resulting in greater validation speed.
The database.yml configuration should have a pool size equal to or greater than the number of Threads spawned. If 100 User objects needed to be processed at once, the pool should be >= 100.
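One possible way to keep the number of Threads at or below the pool size, no matter how many records are waiting, is a fixed set of worker Threads draining a queue. The sketch below is an alternative shape and not the approach used in this post; `pool_size`, the job numbers and the counters are hypothetical, and `sleep` stands in for the API call and `save!`.

```ruby
pool_size = 5 # hypothetical; would mirror `pool:` in database.yml

jobs = Queue.new
100.times { |i| jobs << i }
jobs.close # once drained, pop returns nil and the workers exit

active = 0    # Threads currently doing work
peak = 0      # highest value `active` ever reached
processed = 0
lock = Mutex.new

workers = pool_size.times.map do
  Thread.new do
    while (job = jobs.pop)
      lock.synchronize do
        active += 1
        peak = [peak, active].max
      end
      sleep 0.001 # stands in for validating and saving one record
      lock.synchronize do
        active -= 1
        processed += 1
      end
    end
  end
end
workers.each(&:join)

puts "processed=#{processed} peak=#{peak}"
```

Because only `pool_size` Threads ever exist, the peak number of simultaneous workers, and therefore of connections they could hold, can never exceed the configured pool.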
Alternatives
Handling database connections and using raw Threads this explicitly can make some developers uncomfortable. Luckily, a few great libraries have been written to alleviate this problem.
In a previous post, EM Synchrony was utilized to make parallel HTTP requests. It also has the ability to parallelize ActiveRecord reading and writing without consuming excess database connections.
Another helpful library, quickly becoming the most popular concurrency library, is concurrent-ruby. The concurrent-ruby library provides convenient models for dealing with futures, promises and other parallelization data structures. However, the same ActiveRecord connection issues will occur with concurrent-ruby objects as with plain old Threads.
Additionally, this code has been written for the sake of making a point. There are many ways to solve a problem and this is simply one of them.