もりはやメモφ(・ω・ )

インフラなエンジニアからSREへ

Embulkでinputするzipファイルをデコードする2つの方法

Embulkではpluginにより様々な形式のファイルをimputとして扱うことができます。今回はzipファイルを解凍(decorde)する二つの方法を紹介します。

圧縮されたファイルを解凍するプラグインは List of Plugins by Categoryのfile-decoderで紹介されています。

その中で、zipファイルに対応しているのは以下の2つです。

利用の仕方

それぞれの利用方法を紹介します。

embulk-decoder-commons-compres

"Commons Compress decoder plugin for Embulk" はその名前の通り、一般的な圧縮形式の解凍が可能なプラグインです。具体的には以下に対応しています(READMEに記載されています)

  • Archive format:
    • ar, cpio, jar, tar, zip
  • compress format:
    • bzip2, deflate, gzip
  • solid compression format: (format句でどの形式化の設定も必要)
    • tgz, tar.gz
    • tbz, tbz2, tb2, tar.bz2
    • taz, tz, tar.Z

実際に私が使用したコードを記載します。前提条件は以下になります。

  • sftpでリモートのファイルを取得
  • zipファイル
  • csv形式
in:
  type: sftp
  host: {{ env.SFTP_HOST }}
  user: {{ env.SFTP_USER }}
  password: {{ env.SFTP_PASSWORD }}
  user_directory_is_root: {{ env.SFTP_USER_DIRECTORY_IS_ROOT }}
  timeout: {{ env.SFTP_TIMEOUT }}
  path_prefix: {{ env.SFTP_PATH_PREFIX }}
  path_match_pattern: {{ env.SFTP_PATH_MATCH_PATTERN }}
  decoders:
    - type: commons-compress
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: Name, type: string}
    .....

当初問題なく稼働していたのですが、後述する理由で embulk-decoder-command プラグインに切り替えを行いました。

embulk-decoder-command

"embulk-decoder-command plugin for Embulk" はその名前の通り、embulkが実行されるホスト上のコマンドを利用して、あらゆる圧縮形式の解凍が可能なプラグインです。ホスト側さえ設定すれば、なんだって対応できるというのは魅力的です。

READMEに分かりやすいイメージが記載されているので引用します。 以下の例はターミナルのシェルコマンドでパイプラインでデータを繋ぐように、lzop*1という圧縮ツールを呼び出して、embulkがimputしたデータを解凍していくイメージを示しています。

$ embulk-input-plugin | lzop -dc | embulk-filter-plugin | ...
                        ^^^^^^^^
                         (here)

実際のコードを記載します。前提条件は先ほどと変わらず以下です。

  • sftpでリモートのファイルを取得
  • zipファイル
  • csv形式

ここでのポイントとして、zipファイルの解凍を行うのに gunzip コマンドを利用している点です。私もこの設定を行うまで知りませんでしたが、 gunzip.gzip だけではなく .zip も解凍できるのです。

逆にzipファイルの解凍を行う一般的なコマンド unzip では、上手く動作しませんでした。 これはpluginの仕様で、stdinからの該当のコマンドにデータが渡されるが、stdinからのデータ入力に unzip が対応していないためだと想定しています。

in:
  type: sftp
  host: {{ env.SFTP_HOST }}
  user: {{ env.SFTP_USER }}
  password: {{ env.SFTP_PASSWORD }}
  user_directory_is_root: {{ env.SFTP_USER_DIRECTORY_IS_ROOT }}
  timeout: {{ env.SFTP_TIMEOUT }}
  path_prefix: {{ env.SFTP_PATH_PREFIX }}
  path_match_pattern: {{ env.SFTP_PATH_MATCH_PATTERN }}
  decoders:
    - type: command
      command: gunzip
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: Journey_Name, type: string}
    .....

プラグインを切り替えた訳

当初embulk-decoder-commons-compresを利用していましたが、問題が起きたためembulk-decoder-commandに変更した理由を書きます。

理由はツイートの通りで、

  • zip圧縮されたcsvファイルをsftp経由で取得
  • zipの中のcsvは1つのみである

という条件においても、以下の問題が発生するようになりました。

  • なぜかembulkで扱うと2ファイルに分割されてしまう

これは取得したcsvをpythonで処理する際に問題となりまして、なんとかする必要が出てきました。*2

そのため上記のTweetを行ったところ、embulk-input-sftp pluginの開発者である @oreradio 氏やdigdag/embulk界隈で著名な @hiroysato 氏が反応してくださり、embulk-decoder-commons-compresが影響しているとの助言を頂きました。

結果としてembulk-decoder-commons-compresからembulk-decoder-commandにdecoder処理を変更したことで、問題は解決されました。

根本原因については確定できていませんが、対象のzip圧縮されたcsvファイルの文字コードが悪さをしていたと想定しています。*3

ご支援いただいたお二人に感謝します。

*1:lzop is one of the fastest compressor and decompressor around. - https://www.lzop.org

*2:なお該当の処理については会社ブログに書いています

*3:実装から6ヶ月は問題なく、変更点としてそれくらいしか無いため