Ruby Threads and ActiveRecord Connections
14 Feb 2016
Processing large data sets is a common problem for many production web applications. One solution is to divide the work among multiple processes and make each process responsible for a single record or a much smaller batch of data. However, this solution has its own problems. When the number of processes, N, grows very large, machine provisioning limits or cost may rule it out.
In the same vein as “divide and conquer” there is another solution, one that needs far fewer parallel processes: Threads. In the Ruby programming language, a Thread is a built-in object for concurrent programming. Unlike independent processes, all Ruby Threads within the same process share memory. This lets each individual Thread consume or process objects and elements from the same data store.
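The following small, database-free sketch makes the shared-memory point concrete. Four Threads drain a single Queue and record their results in one Hash that they all share. The queue contents, the worker count, and the squaring step are illustrative stand-ins for real records and real work.

```ruby
# Sketch: four Threads in one process share a single work Queue and a
# single results Hash, because Threads share the process's memory.
work = Queue.new
(1..20).each { |n| work << n }
work.close # pop returns nil once the closed queue is drained

results = {}
lock = Mutex.new # serialize writes to the shared Hash

workers = Array.new(4) do
  Thread.new do
    # Each worker keeps pulling items until the queue is empty and closed.
    while (n = work.pop)
      lock.synchronize { results[n] = n * n }
    end
  end
end
workers.each(&:join)

puts "processed #{results.size} items"
```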
In this example, a database is queried, the results are manipulated, and the changes are written back to the same database through a User model backed by a simple users table:
```ruby
class User < ActiveRecord::Base
end
```
```ruby
class CreateUsersTable < ActiveRecord::Migration
  def change
    create_table :users do |t|
      t.string :first_name
      t.string :last_name
      t.string :email
      t.boolean :validated, default: false
      t.timestamps null: false
    end
  end
end
```
The problem to solve is fairly straightforward. Fetch the User records that are not yet validated. Send each email address to an external API for validation, then save the record. If a User's email address is not valid, remove it.
Fast-forwarding through time, assume that a completely serial solution has already been written and judged unsatisfactory. A second iteration then introduces a bit of concurrent code:
```ruby
class UserEmailValidator
  def self.run
    # Load unvalidated Users 30 at a time to limit memory use.
    User.where(validated: false)
        .find_in_batches(batch_size: 30) do |user_batch|
      validate_emails(user_batch)
    end
  end

  def self.validate_emails(user_batch)
    # One Thread per User; EmailService wraps the external validation API.
    threads = user_batch.map do |user|
      Thread.new do
        email_validator = EmailService.new(user.email)
        email_validator.validate
        user.email = nil if email_validator.invalid?
        user.validated = true
        user.save!
      end
    end
    # Wait for every Thread in the batch before fetching the next batch.
    threads.each(&:join)
  end
end
```
The important pieces of this code are:

- To cut down on memory allocation, only 30 Users are fetched at a time.
- Each of these Users has its email address validated in a separate Thread.
- The join method makes the program wait for all Threads to complete. If one Thread were to error out, rescue and retry logic could be added to handle it.
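Here is one way the rescue-and-retry idea could look, as a plain-Ruby sketch. `FlakyValidator`, `validate_with_retry`, and `MAX_ATTEMPTS` are hypothetical names made up for this illustration. The validator fails on the first attempt for each address to simulate a timeout from the external API. Each Thread retries up to three times and then gives up and returns `nil`.

```ruby
# Hypothetical stand-in for the external email-validation call; it fails
# the first time it sees each address to simulate a flaky network.
class FlakyValidator
  def initialize
    @seen = Hash.new(0)
    @lock = Mutex.new
  end

  def valid?(email)
    attempt = @lock.synchronize { @seen[email] += 1 }
    raise IOError, "timeout for #{email}" if attempt == 1
    email.include?("@")
  end
end

MAX_ATTEMPTS = 3 # hypothetical retry limit

# Retries the call on IOError; returns nil if every attempt failed.
def validate_with_retry(validator, email)
  attempts = 0
  begin
    attempts += 1
    validator.valid?(email)
  rescue IOError
    retry if attempts < MAX_ATTEMPTS
    nil # give up; the caller could log or re-queue this address
  end
end

validator = FlakyValidator.new
emails = ["a@example.com", "not-an-email", "b@example.com"]

threads = emails.map do |email|
  Thread.new { [email, validate_with_retry(validator, email)] }
end
# Thread#value joins each Thread and returns its block's result.
results = threads.map(&:value).to_h
p results
```

Thread#value works like join, but it also returns each Thread's result. If an exception did escape a Thread, join or value would re-raise it in the calling thread.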
Functionally adequate, this code should iterate over the entire set of User records, 30 at a time, and update them in parallel.
However, actually running this code reveals a very serious limitation.
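Note: The Global Interpreter Lock (GIL) in MRI Ruby prevents this code from ever running truly in parallel. Because the code is I/O-bound, this makes little difference. MRI releases the GIL while a Thread waits on network or database I/O, so other Threads can run in the meantime.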
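The I/O argument can be checked with a small, database-free sketch. The hypothetical `fake_api_call` method sleeps for 0.2 seconds to simulate waiting on an external API. A sleeping Thread releases MRI's lock, so five threaded calls should take roughly as long as one call, while five serial calls take about five times as long. Exact timings will vary by machine.

```ruby
# Measures the wall-clock time of a block using a monotonic clock.
def elapsed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

# Hypothetical stand-in for an API call: sleeping, like waiting on a
# socket, releases MRI's global lock so other Threads can run.
def fake_api_call
  sleep 0.2
end

serial = elapsed { 5.times { fake_api_call } }
threaded = elapsed { Array.new(5) { Thread.new { fake_api_call } }.each(&:join) }

puts format("serial: %.2fs, threaded: %.2fs", serial, threaded)
```

A CPU-bound workload would not see this speedup under MRI, because only one Thread can execute Ruby code at a time.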
Moments after the above code runs for the first time, this error is seen:
ActiveRecord::ConnectionTimeoutError: could not obtain a database connection within 5.000 seconds
As it turns out, each spawned Thread had allocated its own connection to the database through ActiveRecord. Even after a Thread completed its work, its connection was not relinquished. Stale, idle connections accumulated within the Ruby process, and the connection pool was exhausted before even 1000 User objects could be processed.
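The following plain-Ruby sketch reproduces the symptom with a hypothetical `ToyPool` class. This is not ActiveRecord's real pool, which is considerably more sophisticated. Five Threads each check out one of five fake connections and exit without returning it. The next checkout then times out, much like the ConnectionTimeoutError above. The pool size and the 0.2 second timeout are arbitrary illustrative values.

```ruby
# A deliberately tiny, hypothetical connection pool (not ActiveRecord's):
# a fixed set of fake connections guarded by a Mutex, with a checkout
# timeout similar in spirit to ActiveRecord::ConnectionTimeoutError.
class ToyPool
  class TimeoutError < StandardError; end

  def initialize(size, checkout_timeout:)
    @available = Array.new(size) { |i| "conn-#{i}" }
    @timeout = checkout_timeout
    @lock = Mutex.new
    @cond = ConditionVariable.new
  end

  # Blocks until a connection is free, or raises after @timeout seconds.
  def checkout
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
    @lock.synchronize do
      while @available.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError, "could not obtain a connection within #{@timeout} seconds" if remaining <= 0
        @cond.wait(@lock, remaining)
      end
      @available.pop
    end
  end

  def checkin(conn)
    @lock.synchronize do
      @available.push(conn)
      @cond.signal # wake one Thread waiting in #checkout
    end
  end
end

pool = ToyPool.new(5, checkout_timeout: 0.2)

# Five Threads each take a connection and exit without checking it back
# in, which is the leak described above.
Array.new(5) { Thread.new { pool.checkout } }.each(&:join)

begin
  pool.checkout
  outcome = :got_connection
rescue ToyPool::TimeoutError => e
  outcome = :timed_out
  puts e.message
end
```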
Ideally, as soon as the self.validate_emails method completed for a batch of Users, the process would have released those connections for use by the next batch of Threads or by other processes.
Since this liberation of database connections did not happen automatically, it seems that we must intervene.
Pool Full of Connections, then We Dive in it
To have ActiveRecord handle connection allocation and deallocation, use the with_connection block. with_connection is a method that yields a connection to a block, then returns that same connection to the pool when the block has completed. If the calling thread already holds a connection, with_connection reuses it and leaves it checked out. In effect, it confines the life cycle of a single database connection to the block provided.
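To see the with_connection pattern in isolation, here is a hypothetical `ToyPool` class extended with its own `with_connection` method. It is a simplified model, not ActiveRecord's implementation. In particular, it always checks out a fresh connection, whereas ActiveRecord reuses one the thread already holds. Thirty Threads share five fake connections. Each connection goes back to the pool in an `ensure` clause, so none of them leak.

```ruby
# Hypothetical toy pool (not ActiveRecord's) with a with_connection helper.
class ToyPool
  class TimeoutError < StandardError; end

  def initialize(size, checkout_timeout:)
    @available = Array.new(size) { |i| "conn-#{i}" }
    @timeout = checkout_timeout
    @lock = Mutex.new
    @cond = ConditionVariable.new
  end

  def checkout
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
    @lock.synchronize do
      while @available.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError, "no connection within #{@timeout} seconds" if remaining <= 0
        @cond.wait(@lock, remaining)
      end
      @available.pop
    end
  end

  def checkin(conn)
    @lock.synchronize do
      @available.push(conn)
      @cond.signal
    end
  end

  # Yields a connection and always checks it back in, even if the block
  # raises. Unlike ActiveRecord, this toy never reuses a held connection.
  def with_connection
    conn = checkout
    yield conn
  ensure
    checkin(conn) if conn
  end

  def available_count
    @lock.synchronize { @available.size }
  end
end

pool = ToyPool.new(5, checkout_timeout: 2)
used = Queue.new

# 30 Threads share 5 connections; each connection returns to the pool as
# soon as its block finishes, so waiting Threads can pick it up.
threads = Array.new(30) do
  Thread.new do
    pool.with_connection do |conn|
      sleep 0.01 # pretend to validate and save one record
      used << conn
    end
  end
end
threads.each(&:join)

puts "#{used.size} jobs done, #{pool.available_count} connections back in the pool"
```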
For this solution to work, the connection pool size must be set high enough, either in database.yml or in a connection initializer.
An example database.yml:
```yaml
production:
  adapter: postgresql
  database: production_database
  pool: 50
```
With a pool large enough, a with_connection block can be inserted around the pertinent code:
```ruby
class UserEmailValidator
  def self.run
    User.where(validated: false)
        .find_in_batches(batch_size: 30) do |user_batch|
      validate_emails(user_batch)
    end
  end

  def self.validate_emails(user_batch)
    threads = user_batch.map do |user|
      Thread.new do
        # Check out a connection for this Thread and return it to the
        # pool as soon as the block finishes.
        ActiveRecord::Base.connection_pool.with_connection do
          email_validator = EmailService.new(user.email)
          email_validator.validate
          user.email = nil if email_validator.invalid?
          user.validated = true
          user.save!
        end
      end
    end
    threads.each(&:join)
  end
end
```
With this very deliberate connection handling in place, the worker Threads never claim more than 30 connections at once. Each Thread connects to the database with a single connection, and the with_connection block returns that connection to the pool when it completes.
With this necessary connection cleanup in place, the great User email validation effort can complete. Depending on the size of the data set and the machines available, the batch size of 30 could easily be increased for faster validation.
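As a rule of thumb, the pool size in the database.yml configuration should be equal to or greater than the number of Threads spawned. Some headroom is wise, because the main thread running the find_in_batches query typically holds a connection of its own. If 100 User objects needed to be processed at once, the pool should be at least 100, and preferably a bit larger.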
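If the batch size grows beyond what the pool can comfortably hold, a fixed-size worker pool is another option. This is a different technique from the one-Thread-per-User approach above. A few long-lived Threads pull records from a shared Queue, so the number of Threads, and therefore connections, stays constant no matter how many records there are. The plain-Ruby sketch below assumes a hypothetical pool size of 5 (`POOL_SIZE`) and reserves one connection for the main thread. The `sleep` stands in for validating and saving a record.

```ruby
POOL_SIZE = 5           # hypothetical; would mirror `pool:` in database.yml
WORKERS = POOL_SIZE - 1 # leave one connection for the main thread

jobs = Queue.new
(1..100).each { |id| jobs << id } # stand-ins for User ids
jobs.close

processed = Queue.new
active = 0
max_active = 0
stats = Mutex.new # protects the active/max_active counters

workers = Array.new(WORKERS) do
  Thread.new do
    # In a Rails app, this loop could run inside
    # ActiveRecord::Base.connection_pool.with_connection { ... }
    # so each worker holds exactly one connection for its lifetime.
    while (id = jobs.pop)
      stats.synchronize do
        active += 1
        max_active = [max_active, active].max
      end
      sleep 0.001 # stand-in for validating and saving one record
      processed << id
      stats.synchronize { active -= 1 }
    end
  end
end
workers.each(&:join)

puts "processed #{processed.size} records with at most #{max_active} concurrent workers"
```

With this design, the pool size only needs to cover the number of workers plus the main thread, not the batch size.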
Handling database connections and using raw Threads this explicitly can make some developers uncomfortable. Luckily, a few great libraries have been written to alleviate this problem.
In a previous post, EM Synchrony was used to make parallel HTTP requests. It can also parallelize ActiveRecord reads and writes without consuming excess database connections.
Another helpful library, quickly becoming the most popular Ruby concurrency library, is concurrent-ruby. It provides convenient models for futures, promises and other parallelization data structures. However, the same ActiveRecord connection issues will occur with concurrent-ruby objects as with plain old Threads.
Additionally, this code has been written for the sake of making a point. There are many ways to solve a problem and this is simply one of them.