MAGAZINE
ルーターマガジン
fluentdを用いたMySQLからPostgreSQLへのデータ移行方法
こちらの記事でpgloaderというツールを利用した、MySQLからPostgreSQLにデータを移行させる方法をご紹介しましたが、新しくfluentdというツールでもMySQLからPostgreSQLへのデータ移行ができたのでご紹介いたします。
大きな違いとしては、pgloaderでは、ある時点でのMySQLデータを、1回でPostgreSQLへデータを移行するのに対し、fluentdは、デーモンとして動かすことで、新しくインサートされたデータも、同期することができ、MySQLからPostgreSQLへ簡易的なレプリケーションを構築することができます。
fluentd自体もrubyのライブラリですが、fluentd用のライブラリも有志により数多く作成されており、その中でも今回はfluent-plugin-sqlというライブラリを利用しています。
環境構築
MySQLとPostgreSQLは構築済み前提です。(docker内でも外でもどちらでも大丈夫です。)
こちらの記事のテストデータを使って検証するため、必要な方は、MySQLのテーブル作成、CSVデータ読み込みまで、PostgreSQLはテーブルの作成まで終わらせてください。
fluentdはrubyのgemなので、rubyさえ入っていれば、ライブラリの追加だけで環境構築が終わるため、非常に簡単です。
ググるとdocker環境を用意する方法もでますが、docker内でfluentd環境を構築するのも面倒で、docker内のfluentdからMySQL、PostgreSQLへの接続もdockerになれていないとかなり苦労すると思います。
よってここでおすすめするのは、docker上ではなく、直接ローカルマシン環境にrubyをインストールしてfluentdをインストールする方法です。
rubyのインストール方法は省略しますが、おすすめはrbenvを使ってインストールする方法です。 gemの管理もbundlerで行います、こちらも省略します。 検証環境は以下のバージョンで行いました。
$ ruby -v
ruby 3.2.2 (2023-03-30 revision e51014f9c0) [x86_64-linux]
$ gem list |grep bundler
bundler (2.5.6, default: 2.4.10)
同じ環境であれば以下のGemfileを使うことですぐにbundle installできるはずです。
$ cat Gemfile
# frozen_string_literal: true
source 'https://rubygems.org'
gem 'fluentd'
gem 'pg'
gem 'activerecord'
gem 'activerecord-import'
gem 'mysql2', '>= 0.3.12b4'
gem 'fluent-plugin-sql'
bundle installまで完了すればもう環境構築は完了です。
MySQLからPostgreSQLデータ移行
今回は以下のようなconfファイルを用意しました。
$ cat fluent.conf
<source>
@type sql
# host host.docker.internal
host localhost
port 3306
database enterprises
adapter mysql2
username root
tag_prefix my.rdb # optional, but recommended
select_interval 1s # optional
select_limit 5000 # optional
# state_file /var/run/fluentd/sql_state
state_file /home/toyama/fluentd/sql_state
<table>
table enterprises
tag table1 # optional
update_column sequenceNumber
# time_column time_col2 # optional
</table>
# <table>
# table table2
# tag table2 # optional
# update_column updated_at
# time_column updated_at # optional
# time_format %Y-%m-%d %H:%M:%S.%6N # optional
# </table>
# detects all tables instead of <table> sections
# all_tables
</source>
# <filter **>
# @type stdout
# </filter>
<match my.rdb.*>
@type sql
host 127.0.0.1
port 5432
database enterprises
adapter postgresql
username postgres
# password mypassword
# socket path_to_socket
# remove_tag_prefix my.rdb # optional, dual of tag_prefix in in_sql
<table>
table enterprises
column_mapping 'sequenceNumber:sequenceNumber,corporateNumber:corporateNumber,process:process,correct:correct,updateDate:updateDate,changeDate:changeDate,name:name,nameImageId:nameImageId,kind:kind,prefectureName:prefectureName,cityName:cityName,streetNumber:streetNumber,addressImageId:addressImageId,prefectureCode:prefectureCode,cityCode:cityCode,postCode:postCode,addressOutside:addressOutside,addressOutsideImageId:addressOutsideImageId,closeDate:closeDate,closeCause:closeCause,successorCorporateNumber:successorCorporateNumber,changeCause:changeCause,assignmentDate:assignmentDate,latest:latest,enName:enName,enPrefectureName:enPrefectureName,enCityName:enCityName,enAddressOutside:enAddressOutside,furigana:furigana,hihyoji:hihyoji'
# This is the default table because it has no "pattern" argument in <table>
# The logic is such that if all non-default <table> blocks
# do not match, the default one is chosen.
# The default table is required.
</table>
</match>
MySQLとPostgreSQLの接続設定は、適宜環境に合わせてください。
select_intervalは、何秒間隔で監視するか、select_limitは1回に何レコードずつ同期するかを指定するパラメータです、 間隔が短く、レコード数が多い程、負荷は高まるので、環境に合わせて設定してください。
MySQL側のtable項目内のupdate_columnでレコードが増えているかを判断するカラムを指定します。
githubにも書いてあるとおり、このカラムをwhere文で指定するため、レコード数が多く、インデックスが貼っていない場合、クエリがとても重くなるので、必ずインデックスを貼ってください。 今回は、primary keyであるsequenceNumberを指定します。
state_fileは、どこまで同期したかを管理するファイルです、初期設定だと存在しないフォルダになってたので、適宜環境に合わせた場所を指定してください。
PostgreSQL側のテーブル設定のcolumn_mappingは、移行元のカラム名:移行先のカラム名で定義します。自由に設定できるので、移行元から必要なカラムのデータだけ持ってくることも可能です。 今回は、すべてのカラムを追加しています。
confファイルが作成できたら、以下のコマンドでfluentdを動かします。
bundle exec fluentd -c fluent.conf
fluentd運用上の注意点
しばらくしてsql_stateファイルが作成され、中にlast_recordsの情報が入ります。
Ctrl + C
で同期処理を停止できますが、当方の環境では、MySQLのselectに対して、PostgreSQLのインサートが間に合っていないようで、完全停止するまでに時間がかかっていました。select_interval, select_limitは環境に合わせないと失敗に繋がりそうです。
完全に停止したあとにPostgreSQLのテーブルを確認すると、sql_stateのlast_recordsまで同期がされていました。
fluentdを利用した簡易レプリケーションではupdate_columnで指定したカラムで、sql_stateファイル内のレコードより新しいレコードがMySQL側で検知された場合にデータが同期されます。そのため、updateやdeleteによる更新検知はできないので、insertのみの簡易レプリケーションであることはご注意下さい。
CONTACT
お問い合わせ・ご依頼はこちらから