Skip to content

Rails 8のSolid Queueで実現するジョブキューの新しいアプローチ

はじめに

Rails 8で導入されたSolid Queueは、従来のRedisやmemcachedに依存しない、データベースベースのジョブキューシステムです。外部インフラの管理を簡素化し、トランザクションの整合性を保ちながら、高性能なバックグラウンドジョブ処理を実現します。

従来の課題

ruby
# 従来のSidekiq + Redisの構成
# config/application.rb
config.active_job.queue_adapter = :sidekiq

# Redis接続設定
Sidekiq.configure_server do |config|
  config.redis = { url: ENV['REDIS_URL'] }
end

問題点:

  • Redis/memcachedの追加インフラが必要
  • トランザクション外でのジョブエンキューによる整合性問題
  • 開発環境での設定の複雑さ

Solid Queueの導入

1. インストールと設定

bash
# Rails 8では標準で含まれていますが、明示的に追加する場合
# Gemfile
gem "solid_queue"

# 初期設定
rails generate solid_queue:install
rails db:migrate

2. 基本設定

ruby
# config/application.rb
config.active_job.queue_adapter = :solid_queue

# config/environments/production.rb
config.solid_queue.connects_to = { database: { writing: :primary } }

3. キューの定義

yaml
# config/queue.yml
production:
  dispatchers:
    - polling_interval: 1
      batch_size: 500
  workers:
    - queues: "default,low_priority"
      threads: 3
    - queues: "high_priority"
      threads: 5
      processes: 2

実践的な使用例

1. 基本的なジョブの作成

ruby
# app/jobs/email_notification_job.rb
class EmailNotificationJob < ApplicationJob
  queue_as :default
  
  def perform(user_id, template)
    user = User.find(user_id)
    NotificationMailer.send_email(user, template).deliver_now
  end
end

# ジョブの実行
EmailNotificationJob.perform_later(user.id, "welcome")

2. トランザクション内でのエンキュー

ruby
# app/models/user.rb
class User < ApplicationRecord
  after_create_commit :send_welcome_email
  
  private
  
  def send_welcome_email
    # トランザクションコミット後に確実にジョブが実行される
    EmailNotificationJob.perform_later(id, "welcome")
  end
end

# または明示的なトランザクション
User.transaction do
  user = User.create!(email: "user@example.com")
  # userの作成が成功した場合のみジョブがエンキューされる
  WelcomeEmailJob.perform_later(user.id)
end

3. 優先度とスケジュール機能

ruby
# 優先度の設定
class HighPriorityJob < ApplicationJob
  queue_as :high_priority
  
  def perform(data)
    # 緊急性の高い処理
    process_urgent_task(data)
  end
end

# スケジュール実行
class ReportGenerationJob < ApplicationJob
  def perform(report_id)
    report = Report.find(report_id)
    report.generate!
  end
end

# 5分後に実行
ReportGenerationJob.set(wait: 5.minutes).perform_later(report.id)

# 特定時刻に実行
ReportGenerationJob.set(wait_until: Date.tomorrow.noon).perform_later(report.id)

4. リトライとエラーハンドリング

ruby
class DataProcessingJob < ApplicationJob
  queue_as :default
  retry_on StandardError, wait: :exponentially_longer, attempts: 5
  discard_on ActiveJob::DeserializationError
  
  def perform(data_id)
    data = ProcessingData.find(data_id)
    
    begin
      process_complex_data(data)
    rescue CustomAPIError => e
      # カスタムエラーの場合は即座に失敗とする
      logger.error "API Error: #{e.message}"
      raise ActiveJob::DeserializationError
    end
  end
  
  private
  
  def process_complex_data(data)
    # 複雑な処理ロジック
    ExternalAPI.process(data.payload)
    data.update!(status: :processed)
  end
end

高度な機能

1. バッチジョブの実装

ruby
class BatchProcessingJob < ApplicationJob
  queue_as :batch_processing
  
  def perform(batch_id, item_ids)
    batch = ProcessingBatch.find(batch_id)
    
    item_ids.each_slice(100) do |chunk|
      chunk.each do |item_id|
        process_item(item_id)
      end
      
      # 進捗の更新
      batch.increment!(:processed_count, chunk.size)
    end
    
    batch.update!(status: :completed) if batch.all_processed?
  end
end

# バッチの作成とエンキュー
def enqueue_batch_processing(items)
  batch = ProcessingBatch.create!(
    total_count: items.size,
    status: :pending
  )
  
  # アイテムを適切なサイズに分割
  items.each_slice(1000) do |chunk|
    BatchProcessingJob.perform_later(batch.id, chunk.map(&:id))
  end
  
  batch
end

2. ジョブの監視とメトリクス

ruby
# app/models/job_monitor.rb
class JobMonitor
  def self.queue_stats
    {
      pending: SolidQueue::Job.pending.count,
      running: SolidQueue::Job.running.count,
      failed: SolidQueue::Job.failed.count,
      completed_today: SolidQueue::Job.where(
        finished_at: Date.current.all_day
      ).count
    }
  end
  
  def self.slow_jobs(threshold = 1.minute)
    SolidQueue::Job.where(
      "finished_at - scheduled_at > ?", threshold
    ).includes(:arguments)
  end
end

# 管理画面での表示
def admin_dashboard
  @queue_stats = JobMonitor.queue_stats
  @slow_jobs = JobMonitor.slow_jobs
end

3. カスタムワーカー設定

ruby
# config/solid_queue.yml
production:
  dispatchers:
    - polling_interval: 1
      batch_size: 500
      concurrency_maintenance_interval: 600
  
  workers:
    # 高優先度キュー専用ワーカー
    - queues: "critical"
      threads: 1
      processes: 3
      
    # 通常処理用ワーカー
    - queues: "default,low_priority"
      threads: 3
      processes: 2
      
    # バッチ処理専用ワーカー
    - queues: "batch_processing"
      threads: 1
      processes: 1

パフォーマンスの最適化

1. データベースインデックスの追加

ruby
# db/migrate/add_solid_queue_indexes.rb
class AddSolidQueueIndexes < ActiveRecord::Migration[8.0]
  def change
    add_index :solid_queue_jobs, [:queue_name, :finished_at]
    add_index :solid_queue_jobs, [:scheduled_at], where: "finished_at IS NULL"
    add_index :solid_queue_jobs, [:created_at]
  end
end

2. 設定の調整

ruby
# config/environments/production.rb
config.solid_queue.silence_polling = true
config.solid_queue.supervisor_pidfile = Rails.root.join("tmp", "pids", "solid_queue_supervisor.pid")

# ワーカープロセスの最適化
config.solid_queue.default_concurrency = Rails.env.production? ? 3 : 1

3. メモリ使用量の監視

ruby
class MemoryMonitorJob < ApplicationJob
  def perform
    memory_usage = `ps -o rss= -p #{Process.pid}`.to_i
    
    if memory_usage > 500_000  # 500MB
      Rails.logger.warn "High memory usage detected: #{memory_usage}KB"
      
      # 必要に応じてワーカーを再起動
      if memory_usage > 1_000_000  # 1GB
        Process.kill("TERM", Process.pid)
      end
    end
  end
end

# 定期実行
MemoryMonitorJob.set(wait: 5.minutes).perform_later

運用のベストプラクティス

1. ログ設定

ruby
# config/environments/production.rb
config.solid_queue.logger = Logger.new(Rails.root.join("log", "solid_queue.log"))
config.solid_queue.logger.level = Logger::INFO

2. 健全性チェック

ruby
# app/controllers/health_controller.rb
class HealthController < ApplicationController
  def check
    queue_health = check_queue_health
    
    if queue_health[:healthy]
      render json: { status: "healthy", details: queue_health }
    else
      render json: { status: "unhealthy", details: queue_health }, status: :service_unavailable
    end
  end
  
  private
  
  def check_queue_health
    pending_jobs = SolidQueue::Job.pending.count
    old_jobs = SolidQueue::Job.where("created_at < ?", 1.hour.ago).pending.count
    
    {
      healthy: old_jobs < 100,
      pending_jobs: pending_jobs,
      old_pending_jobs: old_jobs,
      workers_count: SolidQueue::Process.where(kind: "Worker").count
    }
  end
end

まとめ

Solid Queueは、Rails 8の大きな革新の一つです。外部依存を削減し、データベースの一貫性を保ちながら、スケーラブルなバックグラウンドジョブ処理を実現します。

主な利点:

  • インフラの簡素化
  • トランザクション整合性の保証
  • 高性能なジョブ処理
  • 豊富な監視・管理機能

従来のRedisベースのソリューションからの移行を検討している場合は、段階的に導入し、パフォーマンスとリソース使用量を監視しながら最適化を進めることをお勧めします。

AI が自動生成した技術記事をまとめたテックブログ