Ruby Threads and ActiveRecord Connections

Processing large data sets is a common problem faced by many production web applications. One solution is to divide the work amongst multiple processes and have each responsible for a single record or a significantly smaller batch of data. However, this solution is not without its problems. Machine provisioning limitations or financial barriers may rule it out when the number of processes required is very large.

In the same vein as “divide and conquer” lies another solution, one which requires far fewer parallel processes: Threads. In the Ruby programming language, a Thread is a built-in object for concurrent programming.

Unlike independent processes, all Ruby Threads within the same process share memory, enabling each individual Thread to consume or process objects and elements from the same data store.
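As a minimal sketch of that shared memory (the email strings and variable names here are made up for illustration), several Threads can write into one Hash owned by the process. A Mutex guards the writes so the example does not depend on any particular Ruby implementation's locking behavior:

```ruby
# One Thread per item, all writing into the same Hash.
emails = %w[Alice@Example.com BOB@example.com carol@example.com]

normalized = {}  # shared by every Thread in this process
lock = Mutex.new # guards writes to the shared Hash

threads = emails.map do |email|
  Thread.new do
    value = email.downcase
    lock.synchronize { normalized[email] = value }
  end
end

threads.each(&:join) # wait for every Thread before reading the results

puts normalized.inspect
```

No copying or message passing is needed; every Thread sees the same `normalized` object.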

For this example, a database will be queried, the results manipulated, and finally written back to the same database via ActiveRecord.

First Pass

Given a User model backed by a simple users table:

class User < ActiveRecord::Base
end
class CreateUsersTable < ActiveRecord::Migration
  def change
    create_table :users do |t|
      t.string :first_name
      t.string :last_name
      t.string :email
      t.boolean :validated, default: false
      t.timestamps null: false
    end
  end
end

The problem to solve is fairly straightforward:

All User records that are not already validated should be fetched, their email addresses checked against an external validation API, and the records saved. If a User's email address is not valid, it should be removed.

Fast forwarding through time, it can be assumed that a completely serial solution has been written and deemed unsatisfactory. Then, during a second iteration, a bit of concurrent code was written:

class UserEmailValidator
  def self.run
    User.where(validated: false)
        .find_in_batches(batch_size: 30) do |user_batch|
          validate_emails(user_batch)
        end
  end

  def self.validate_emails(user_batch)
    threads = user_batch.map do |user|
      Thread.new do
        email_validator = EmailService.new(user.email)
        email_validator.validate

        user.email = nil if email_validator.invalid?

        user.validated = true

        user.save!
      end
    end

    threads.each(&:join)
  end
end

The important pieces of this code are:

  1. find_in_batches(batch_size: 30)

    To cut down on memory allocation, only 30 Users are fetched at one time. Each of these Users has its email address validated in a separate Thread.

  2. threads.each(&:join)

    The join method makes the program wait for all Threads to complete. If a Thread raises an exception, join re-raises it in the calling Thread, so some rescue and retry logic could be added to help.
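The rescue and retry idea can be sketched roughly as follows. This is only an illustration: `flaky_call`, `max_tries` and the failure pattern are hypothetical stand-ins for a real network call, not part of the validator above.

```ruby
# Hypothetical stand-in for a flaky network call: each id fails on its first two attempts.
attempts = Hash.new(0)
attempts_lock = Mutex.new

flaky_call = lambda do |id|
  count = attempts_lock.synchronize { attempts[id] += 1 }
  raise IOError, "temporary failure for #{id}" if count < 3
  "ok-#{id}"
end

max_tries = 3 # assumed retry budget

threads = [1, 2, 3].map do |id|
  Thread.new do
    tries = 0
    begin
      tries += 1
      flaky_call.call(id)
    rescue IOError
      retry if tries < max_tries
      raise # out of retries; the error surfaces when the Thread is joined
    end
  end
end

# Thread#value joins the Thread and returns the value of its block.
results = threads.map(&:value)

# An unrescued error is re-raised in the caller by join (and by value).
doomed = Thread.new do
  Thread.current.report_on_exception = false # silence the default stderr report
  raise ArgumentError, "boom"
end

rescued_message =
  begin
    doomed.join
    nil
  rescue ArgumentError => e
    e.message
  end

puts results.inspect
puts rescued_message
```

Retrying inside the Thread keeps a transient failure from aborting the whole batch, while a persistent failure still reaches the caller through join.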

Functionally adequate, this code should iterate over the entire set of User records, 30 at a time, and update them concurrently.

However, actually running this code will point out a very serious limitation.

Note: The GIL in MRI Ruby prevents this code from ever running truly in parallel, but since this code is I/O-bound (most of its time is spent waiting on the network and the database), it makes little practical difference.
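A rough way to see this effect, using sleep as an assumed stand-in for a slow network call (the `timed` helper is made up for this sketch):

```ruby
# Measure how long a block takes, using a monotonic clock.
def timed
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

# Five simulated I/O waits, one after another.
serial = timed { 5.times { sleep 0.2 } }

# The same five waits, each in its own Thread.
threaded = timed { 5.times.map { Thread.new { sleep 0.2 } }.each(&:join) }

puts format("serial: %.2fs, threaded: %.2fs", serial, threaded)
```

While one Thread is blocked waiting, MRI lets another run, so the waits overlap even though only one Thread executes Ruby code at a time. CPU-bound work would not see the same benefit.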

Connection Accumulation

Moments after the above code runs for the first time, this error is seen:

ActiveRecord::ConnectionTimeoutError:
  could not obtain a database connection within 5.000 seconds

As it turns out, each Thread spawned had allocated its own connection to the database via ActiveRecord. Then, even after the Thread completed its work, the connection was not relinquished. Stale, idle connections began to accumulate within the Ruby process and before even 1000 User objects could be processed, the connection pool was empty.

Presumably, right after the self.validate_emails method completed for a batch of Users, the process should have released those connections to be used for either the next batch of Threads or other consumers of the pool.

Since this liberation of database connections did not happen automatically, it seems that we must intervene.

Pool Full of Connections, then We Dive in it

To help ActiveRecord handle connection allocation and deallocation, the with_connection block is used. with_connection is a method that yields a connection to a block, then returns that same connection to the pool when the block has completed.

Basically, it ensures that the life cycle for a single database connection is only active within the block provided.
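The check-out, yield, check-in life cycle can be illustrated with a toy pool. This is a simplified model built on a plain Queue, not ActiveRecord's actual implementation; `ToyPool` and the `conn-N` strings are invented for the sketch.

```ruby
# Toy stand-in for a connection pool. NOT ActiveRecord's implementation.
class ToyPool
  def initialize(size)
    @available = Queue.new
    size.times { |i| @available << "conn-#{i}" }
  end

  # Check a connection out, yield it, and always check it back in,
  # even if the block raises.
  def with_connection
    conn = @available.pop # blocks until a connection is free
    yield conn
  ensure
    @available << conn if conn
  end

  def available
    @available.size
  end
end

pool = ToyPool.new(3)
used = Queue.new

# Ten Threads share three "connections" without exhausting the pool.
threads = 10.times.map do
  Thread.new do
    pool.with_connection do |conn|
      sleep 0.01 # stand-in for a query
      used << conn
    end
  end
end

threads.each(&:join)
puts "uses: #{used.size}, available afterwards: #{pool.available}"
```

Because the check-in sits in an ensure clause, every connection ends up back in the pool once its block finishes, whether or not the block raised.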

To make sure this solution will work, it is imperative that the connection pool size, set in either database.yml or a connection initializer, is high enough.

An example database.yml:

production:
  adapter: postgresql
  database: production_database
  pool: 50

With a pool large enough, a with_connection block can be inserted around the pertinent code:

class UserEmailValidator
  def self.run
    User.where(validated: false)
        .find_in_batches(batch_size: 30) do |user_batch|
          validate_emails(user_batch)
        end
  end

  def self.validate_emails(user_batch)
    threads = user_batch.map do |user|
      Thread.new do
        ActiveRecord::Base.connection_pool.with_connection do
          email_validator = EmailService.new(user.email)
          email_validator.validate

          user.email = nil if email_validator.invalid?

          user.validated = true

          user.save!
        end
      end
    end

    threads.each(&:join)
  end
end

Running this very deliberate connection handling process now ensures that no more than 30 connections are claimed by the worker Threads at any one time. Each Thread will connect to the database with one single connection and the with_connection block will return that connection to the predefined pool upon completion.

With the addition of this necessary connection pruning, the great User email validation effort can complete. Depending on the size of the data set and machines available, the 30 per batch limit could be easily increased, resulting in greater validation speed.

The database.yml configuration should have a pool size equal to or greater than the number of Threads spawned. If 100 User objects need to be processed at once, the pool should be >= 100.
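One way to keep the Thread count under the pool size regardless of batch size is a fixed set of worker Threads pulling from a queue. This is a different technique from the Thread-per-record code above, sketched here with assumed values: `pool_size` stands in for the pool setting in database.yml, and reserving one connection for the enqueuing Thread is an assumption about how the main Thread uses the pool.

```ruby
pool_size = 5                # assumed value of `pool:` in database.yml
worker_count = pool_size - 1 # assumption: leave one connection for the enqueuing Thread

jobs = Queue.new
(1..40).each { |id| jobs << id }
jobs.close # no more work; pop returns nil once the queue drains

active = 0
peak = 0
lock = Mutex.new
processed = Queue.new

workers = worker_count.times.map do
  Thread.new do
    while (id = jobs.pop)
      lock.synchronize do
        active += 1
        peak = [peak, active].max
      end
      sleep 0.005 # stand-in for the API call and save
      processed << id
      lock.synchronize { active -= 1 }
    end
  end
end

workers.each(&:join)
puts "processed: #{processed.size}, peak concurrency: #{peak}"
```

With this shape, raising throughput means changing `worker_count` and the pool size together, rather than tuning the batch size.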

Alternatives

Handling database connections and using raw Threads this explicitly can make some developers uncomfortable. Luckily, a few great libraries have been written to alleviate this problem.

In a previous post, EM Synchrony was utilized to make parallel HTTP requests. It also has the ability to parallelize ActiveRecord reading and writing without consuming excess database connections.

Another helpful library, quickly becoming the most popular for concurrency, is concurrent-ruby. The concurrent-ruby library provides convenient models for dealing with futures, promises and other parallelization data structures. However, the same ActiveRecord connection issues will occur with concurrent-ruby objects as with plain old Threads.

Additionally, this code has been written for the sake of making a point. There are many ways to solve a problem and this is simply one of them.