Embulkではpluginにより様々な形式のファイルをimputとして扱うことができます。今回はzipファイルを解凍(decorde)する二つの方法を紹介します。
圧縮されたファイルを解凍するプラグインは List of Plugins by Categoryのfile-decoderで紹介されています。
その中で、zipファイルに対応しているのは以下の2つです。
利用の仕方
それぞれの利用方法を紹介します。
embulk-decoder-commons-compres
"Commons Compress decoder plugin for Embulk" はその名前の通り、一般的な圧縮形式の解凍が可能なプラグインです。具体的には以下に対応しています(READMEに記載されています)
- Archive format:
- ar, cpio, jar, tar, zip
- compress format:
- bzip2, deflate, gzip
- solid compression format: (format句でどの形式化の設定も必要)
- tgz, tar.gz
- tbz, tbz2, tb2, tar.bz2
- taz, tz, tar.Z
実際に私が使用したコードを記載します。前提条件は以下になります。
- sftpでリモートのファイルを取得
- zipファイル
- csv形式
in: type: sftp host: {{ env.SFTP_HOST }} user: {{ env.SFTP_USER }} password: {{ env.SFTP_PASSWORD }} user_directory_is_root: {{ env.SFTP_USER_DIRECTORY_IS_ROOT }} timeout: {{ env.SFTP_TIMEOUT }} path_prefix: {{ env.SFTP_PATH_PREFIX }} path_match_pattern: {{ env.SFTP_PATH_MATCH_PATTERN }} decoders: - type: commons-compress parser: charset: UTF-8 newline: CRLF type: csv delimiter: ',' quote: '"' escape: '"' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: Name, type: string} .....
当初問題なく稼働していたのですが、後述する理由で embulk-decoder-command プラグインに切り替えを行いました。
embulk-decoder-command
"embulk-decoder-command plugin for Embulk" はその名前の通り、embulkが実行されるホスト上のコマンドを利用して、あらゆる圧縮形式の解凍が可能なプラグインです。ホスト側さえ設定すれば、なんだって対応できるというのは魅力的です。
READMEに分かりやすいイメージが記載されているので引用します。 以下の例はターミナルのシェルコマンドでパイプラインでデータを繋ぐように、lzop*1という圧縮ツールを呼び出して、embulkがimputしたデータを解凍していくイメージを示しています。
$ embulk-input-plugin | lzop -dc | embulk-filter-plugin | ... ^^^^^^^^ (here)
実際のコードを記載します。前提条件は先ほどと変わらず以下です。
- sftpでリモートのファイルを取得
- zipファイル
- csv形式
ここでのポイントとして、zipファイルの解凍を行うのに gunzip
コマンドを利用している点です。私もこの設定を行うまで知りませんでしたが、 gunzip
は .gzip
だけではなく .zip
も解凍できるのです。
逆にzipファイルの解凍を行う一般的なコマンド unzip
では、上手く動作しませんでした。
これはpluginの仕様で、stdinからの該当のコマンドにデータが渡されるが、stdinからのデータ入力に unzip
が対応していないためだと想定しています。
in: type: sftp host: {{ env.SFTP_HOST }} user: {{ env.SFTP_USER }} password: {{ env.SFTP_PASSWORD }} user_directory_is_root: {{ env.SFTP_USER_DIRECTORY_IS_ROOT }} timeout: {{ env.SFTP_TIMEOUT }} path_prefix: {{ env.SFTP_PATH_PREFIX }} path_match_pattern: {{ env.SFTP_PATH_MATCH_PATTERN }} decoders: - type: command command: gunzip parser: charset: UTF-8 newline: CRLF type: csv delimiter: ',' quote: '"' escape: '"' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: Journey_Name, type: string} .....
プラグインを切り替えた訳
当初embulk-decoder-commons-compresを利用していましたが、問題が起きたためembulk-decoder-commandに変更した理由を書きます。
理由はツイートの通りで、
- zip圧縮されたcsvファイルをsftp経由で取得
- zipの中のcsvは1つのみである
という条件においても、以下の問題が発生するようになりました。
- なぜかembulkで扱うと2ファイルに分割されてしまう
Embulkで
— もりはや (@morihaya55) July 9, 2019
```
exec:
max_threads: 1
min_output_tasks: 1
```
を設定していても
```
Using local thread executor with max_threads=1 / tasks=2
```
となるのは何故なんだろうか?
tasks=2 ….
これは取得したcsvをpythonで処理する際に問題となりまして、なんとかする必要が出てきました。*2
そのため上記のTweetを行ったところ、embulk-input-sftp pluginの開発者である @oreradio 氏やdigdag/embulk界隈で著名な @hiroysato 氏が反応してくださり、embulk-decoder-commons-compresが影響しているとの助言を頂きました。
embulk-input-sftp書いたの自分ですけど、そうはならないと思いますよ。Decoderにcommon-compress使ってるなら、https://t.co/n6gcFpraBi()でFileInputをnewしてる辺りが怪しい気がします。decoder外して再現しないならdecoderが原因だと思います。 https://t.co/fknNujm9oW
— Satoshi Akama (@oreradio) July 10, 2019
結果としてembulk-decoder-commons-compresからembulk-decoder-commandにdecoder処理を変更したことで、問題は解決されました。
根本原因については確定できていませんが、対象のzip圧縮されたcsvファイルの文字コードが悪さをしていたと想定しています。*3
ご支援いただいたお二人に感謝します。
*1:lzop is one of the fastest compressor and decompressor around. - https://www.lzop.org
*3:実装から6ヶ月は問題なく、変更点としてそれくらいしか無いため