MAGAZINE

ルーターマガジン

インフラ/運用

fluentdを用いたMySQLからPostgreSQLへのデータ移行方法

2024.08.30
Pocket

こちらの記事でpgloaderというツールを利用した、MySQLからPostgreSQLにデータを移行させる方法をご紹介しましたが、新しくfluentdというツールでもMySQLからPostgreSQLへのデータ移行ができたのでご紹介いたします。

大きな違いとしては、pgloaderでは、ある時点でのMySQLデータを、1回でPostgreSQLへデータを移行するのに対し、fluentdは、デーモンとして動かすことで、新しくインサートされたデータも、同期することができ、MySQLからPostgreSQLへ簡易的なレプリケーションを構築することができます。

fluentd自体もrubyのライブラリですが、fluentd用のライブラリも有志により数多く作成されており、その中でも今回はfluent-plugin-sqlというライブラリを利用しています。

fluent-plugin-sql

環境構築

MySQLとPostgreSQLは構築済み前提です。(docker内でも外でもどちらでも大丈夫です。)

こちらの記事のテストデータを使って検証するため、必要な方は、MySQLのテーブル作成、CSVデータ読み込みまで、PostgreSQLはテーブルの作成まで終わらせてください。

fluentdはrubyのgemなので、rubyさえ入っていれば、ライブラリの追加だけで環境構築が終わるため、非常に簡単です。

ググるとdocker環境を用意する方法もでますが、docker内でfluentd環境を構築するのも面倒で、docker内のfluentdからMySQL、PostgreSQLへの接続もdockerになれていないとかなり苦労すると思います。

よってここでおすすめするのは、docker上ではなく、直接ローカルマシン環境にrubyをインストールしてfluentdをインストールする方法です。

rubyのインストール方法は省略しますが、おすすめはrbenvを使ってインストールする方法です。 gemの管理もbundlerで行います、こちらも省略します。 検証環境は以下のバージョンで行いました。

$ ruby -v
ruby 3.2.2 (2023-03-30 revision e51014f9c0) [x86_64-linux]

$ gem list |grep bundler
bundler (2.5.6, default: 2.4.10)

同じ環境であれば以下のGemfileを使うことですぐにbundle installできるはずです。

$ cat Gemfile
# frozen_string_literal: true

source 'https://rubygems.org'

gem 'fluentd'
gem 'pg'
gem 'activerecord'
gem 'activerecord-import'
gem 'mysql2', '>= 0.3.12b4'
gem 'fluent-plugin-sql'

bundle installまで完了すればもう環境構築は完了です。

MySQLからPostgreSQLデータ移行

今回は以下のようなconfファイルを用意しました。

$ cat fluent.conf
<source>
  @type sql

  # host host.docker.internal
  host localhost
  port 3306
  database enterprises
  adapter mysql2
  username root

  tag_prefix my.rdb  # optional, but recommended

  select_interval 1s  # optional
  select_limit 5000     # optional

  # state_file /var/run/fluentd/sql_state
  state_file /home/toyama/fluentd/sql_state

  <table>
    table enterprises
    tag table1  # optional
    update_column sequenceNumber
    # time_column time_col2  # optional
  </table>

  # <table>
  #   table table2
  #   tag table2  # optional
  #   update_column updated_at
  #   time_column updated_at  # optional
  #   time_format %Y-%m-%d %H:%M:%S.%6N # optional
  # </table>

  # detects all tables instead of <table> sections
  # all_tables
</source>

# <filter **>
#   @type stdout
# </filter>

<match my.rdb.*>
  @type sql
  host 127.0.0.1
  port 5432
  database enterprises
  adapter postgresql
  username postgres
  # password mypassword
  # socket path_to_socket
  # remove_tag_prefix my.rdb # optional, dual of tag_prefix in in_sql
  <table>
    table enterprises
    column_mapping 'sequenceNumber:sequenceNumber,corporateNumber:corporateNumber,process:process,correct:correct,updateDate:updateDate,changeDate:changeDate,name:name,nameImageId:nameImageId,kind:kind,prefectureName:prefectureName,cityName:cityName,streetNumber:streetNumber,addressImageId:addressImageId,prefectureCode:prefectureCode,cityCode:cityCode,postCode:postCode,addressOutside:addressOutside,addressOutsideImageId:addressOutsideImageId,closeDate:closeDate,closeCause:closeCause,successorCorporateNumber:successorCorporateNumber,changeCause:changeCause,assignmentDate:assignmentDate,latest:latest,enName:enName,enPrefectureName:enPrefectureName,enCityName:enCityName,enAddressOutside:enAddressOutside,furigana:furigana,hihyoji:hihyoji'
    # This is the default table because it has no "pattern" argument in <table>
    # The logic is such that if all non-default <table> blocks
    # do not match, the default one is chosen.
    # The default table is required.
  </table>
</match>

MySQLとPostgreSQLの接続設定は、適宜環境に合わせてください。

select_intervalは、何秒間隔で監視するか、select_limitは1回に何レコードずつ同期するかを指定するパラメータです、 間隔が短く、レコード数が多い程、負荷は高まるので、環境に合わせて設定してください。

MySQL側のtable項目内のupdate_columnでレコードが増えているかを判断するカラムを指定します。

githubにも書いてあるとおり、このカラムをwhere文で指定するため、レコード数が多く、インデックスが貼っていない場合、クエリがとても重くなるので、必ずインデックスを貼ってください。 今回は、primary keyであるsequenceNumberを指定します。

state_fileは、どこまで同期したかを管理するファイルです、初期設定だと存在しないフォルダになってたので、適宜環境に合わせた場所を指定してください。

PostgreSQL側のテーブル設定のcolumn_mappingは、移行元のカラム名:移行先のカラム名で定義します。自由に設定できるので、移行元から必要なカラムのデータだけ持ってくることも可能です。 今回は、すべてのカラムを追加しています。

confファイルが作成できたら、以下のコマンドでfluentdを動かします。

bundle exec fluentd -c fluent.conf

fluentd運用上の注意点

しばらくしてsql_stateファイルが作成され、中にlast_recordsの情報が入ります。

Ctrl + Cで同期処理を停止できますが、当方の環境では、MySQLのselectに対して、PostgreSQLのインサートが間に合っていないようで、完全停止するまでに時間がかかっていました。select_interval, select_limitは環境に合わせないと失敗に繋がりそうです。

完全に停止したあとにPostgreSQLのテーブルを確認すると、sql_stateのlast_recordsまで同期がされていました。

fluentdを利用した簡易レプリケーションではupdate_columnで指定したカラムで、sql_stateファイル内のレコードより新しいレコードがMySQL側で検知された場合にデータが同期されます。そのため、updateやdeleteによる更新検知はできないので、insertのみの簡易レプリケーションであることはご注意下さい。

Pocket

CONTACT

お問い合わせ・ご依頼はこちらから