Embulk MySQLデータ転送を試してみた

Embulkを使ってMySQLのデータ転送をやってみよう。

MySQLをインストールするとき、サンプルデータベース(sakila)をインストールしたので、sakilaにあるactorテーブルのデータを、別のテーブルに移してみよう。

と、その前にまずは Embulk のMySQLプラグイン(embulk-input-mysql,embulk-output-mysql)をインストールする。

Windows環境なので、コマンドプロンプトを起動しEmbulkコマンドを実行。

embulk gem install embulk-input-mysql embulk-output-mysql

とプラグインのインストール成功

Linuxなら

$java -jar embulk.jar gem install embulk-input-mysql embulk-output-mysql

次に、

MySQL用のテストYMLを作成(エディタで以下のYMLファイルを作成)

mysql_test_ins_actor_test.yml

-------------------------------------------

in:

   type: mysql

   host: localhost

   user: user

   password: user_pass

   database: sakila

   table: actor

   select: "*"

out:

   type: mysql

   host: localhost

   user: user

   password: user_pass

   database: sakila

   table: actor_test

   mode: insert

-------------------------------------------

out側のtable:actor_test は存在しないテーブルしかし、「MySQL output plugins for EmbulkREADME.md に、If the target table doesn't exist, it is created automatically.とあり、ない場合は処理時に作成してくれるそうだ。

とりあえず、作成したmysql_test_ins_actor.ymlをプレビューコマンド(preview)で試してみる。

embulk preview mysql_test_ins_actor.yml

in側の処理は少なくともうまくいっていることが確認できたの

で実行(run)してみる。

embulk run  mysql_test_ins_actor.yml

処理が正常に終了し、actor_testテーブルが作成され、データが転送されているが確認できました。

今回は、同一のデータベース(sakila)内で行いましたが、同MySQLサーバ上ですが別のデータベースに対しても同様に処理された事を確認。

この程度の簡単なお試し処理で意味がありませんが、並列処理されていることが実行情報より見てとれます。

[INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4

出力(out:)をCSVファイルにすると、タスク数(プロセッサ数 :お試し用PCは4つのロジカルプロセッサ)分のCSVファイルが作成されます。

タスク数やスレッド数はオプションで設定できリーソースに合わせて処理の並列化が可能です。

次に、embulk-output-mysqlmergeモードを試してみるInsert/Updateで差分更新ができる様子。

差分更新用にMySQLにテーブルを作成

use test;

CREATE TABLE `actor_test` (

   `actor_id` smallint(5) unsigned NOT NULL,

   `first_name` varchar(45) NOT NULL,

   `last_name` varchar(45) NOT NULL,

   `last_update` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

   PRIMARY KEY (`actor_id`),

   KEY `idx_actor_last_name` (`last_name`)

) ENGINE=InnoDB AUTO_INCREMENT=201 DEFAULT CHARSET=utf8;


差分更新(merge)用の処理をエディタにて作成

mysql_actor_test_sync.yml

-------------------------------------------

in:

  type: mysql

   host: localhost

   user: kano

   password: tkano

   database: sakila

   table: actor_test

   select: "*"

out:

   type: mysql

   host: localhost

   user: kano

   password: tkano

   database: test

   table: actor_test

   mode: merge

-------------------------------------------

事前にsakila.actor_testテーブルのデータをtes.actor_testテーブルに転送

 MAX(sakila.actor_test.actor_id) = 200

 MAX(test.actor_test.actor_id) = 200

差分(とりあえずactor_idで)がないことを確認。


差分更新用に、sakila.actor_test(in側)のデータを更新

insert into `sakila`.`actor_test` value(201,'test1','test1',now());

insert into `sakila`.`actor_test` value(202,'test2','test2',now());

insert into `sakila`.`actor_test` value(203,'test3','test3',now());

update `sakila`.`actor_test` first_name = 'HEART' where actor_id = 199 and first_name = 'JULIA';

delete from `sakila`.`actor_test` where actor_id = 200;

更新を確認したら差分更新処理を実行します。

embulk run mysql_actor_test_sync.yml

Insert、Updateについて差分更新が行われたことが確認できました。

delete(物理削除)は反映されずレコードが残っていることが確認できました。


Han The World

お出かけ、食、読書、システム構築やプログラミングに関する徒然なるブログ

0コメント

  • 1000 / 1000