Embulk MySQLデータ転送を試してみた
Embulkを使ってMySQLのデータ転送をやってみよう。
MySQLをインストールするとき、サンプルデータベース(sakila)をインストールしたので、sakilaにあるactorテーブルのデータを、別のテーブルに移してみよう。
と、その前にまずは Embulk のMySQLプラグイン(embulk-input-mysql,embulk-output-mysql)をインストールする。
Windows環境なので、コマンドプロンプトを起動しEmbulkコマンドを実行。
とプラグインのインストール成功
Linuxなら
$java -jar embulk.jar gem install embulk-input-mysql embulk-output-mysql
次に、
MySQL用のテストYMLを作成(エディタで以下のYMLファイルを作成)
mysql_test_ins_actor_test.yml
-------------------------------------------
in:
type: mysql
host: localhost
user: user
password: user_pass
database: sakila
table: actor
select: "*"
out:
type: mysql
host: localhost
user: user
password: user_pass
database: sakila
table: actor_test
mode: insert
-------------------------------------------
out側のtable:actor_test は存在しないテーブルしかし、「MySQL output plugins for Embulk」README.md に、If the target table doesn't exist, it is created automatically.とあり、ない場合は処理時に作成してくれるそうだ。
とりあえず、作成したmysql_test_ins_actor.ymlをプレビューコマンド(preview)で試してみる。
embulk preview mysql_test_ins_actor.yml
in側の処理は少なくともうまくいっていることが確認できたの
で実行(run)してみる。
embulk run mysql_test_ins_actor.yml
処理が正常に終了し、actor_testテーブルが作成され、データが転送されているが確認できました。
今回は、同一のデータベース(sakila)内で行いましたが、同MySQLサーバ上ですが別のデータベースに対しても同様に処理された事を確認。
この程度の簡単なお試し処理で意味がありませんが、並列処理されていることが実行情報より見てとれます。
[INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
出力(out:)をCSVファイルにすると、タスク数(プロセッサ数 :お試し用PCは4つのロジカルプロセッサ)分のCSVファイルが作成されます。
タスク数やスレッド数はオプションで設定できリーソースに合わせて処理の並列化が可能です。
次に、embulk-output-mysql のmergeモードを試してみるInsert/Updateで差分更新ができる様子。
差分更新用にMySQLにテーブルを作成
use test;
CREATE TABLE `actor_test` (
`actor_id` smallint(5) unsigned NOT NULL,
`first_name` varchar(45) NOT NULL,
`last_name` varchar(45) NOT NULL,
`last_update` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`actor_id`),
KEY `idx_actor_last_name` (`last_name`)
) ENGINE=InnoDB AUTO_INCREMENT=201 DEFAULT CHARSET=utf8;
差分更新(merge)用の処理をエディタにて作成
mysql_actor_test_sync.yml
-------------------------------------------
in:
type: mysql
host: localhost
user: kano
password: tkano
database: sakila
table: actor_test
select: "*"
out:
type: mysql
host: localhost
user: kano
password: tkano
database: test
table: actor_test
mode: merge
-------------------------------------------
事前にsakila.actor_testテーブルのデータをtes.actor_testテーブルに転送
MAX(sakila.actor_test.actor_id) = 200
MAX(test.actor_test.actor_id) = 200
差分(とりあえずactor_idで)がないことを確認。
差分更新用に、sakila.actor_test(in側)のデータを更新
insert into `sakila`.`actor_test` value(201,'test1','test1',now());
insert into `sakila`.`actor_test` value(202,'test2','test2',now());
insert into `sakila`.`actor_test` value(203,'test3','test3',now());
update `sakila`.`actor_test` first_name = 'HEART' where actor_id = 199 and first_name = 'JULIA';
delete from `sakila`.`actor_test` where actor_id = 200;
更新を確認したら差分更新処理を実行します。
embulk run mysql_actor_test_sync.yml
Insert、Updateについて差分更新が行われたことが確認できました。
delete(物理削除)は反映されずレコードが残っていることが確認できました。
0コメント