もりはやメモφ(・ω・ )

ITとか読書感想文とか

digdagで繰り返し処理をする時に使うOperators(loop , for_each , for_range)

最近はちょくちょくdigdagのdigファイルを書くことがあり、毎回ドキュメントを見ながら思い出している"繰り返し処理"について自分でまとめておきたいと思います。

はじめに

公式ドキュメントは以下です。繰り返し処理を行うloopなどはWorkfrlow control Operatorsというカテゴリの中に記載されています。

digdag Docs » Operators » Workflow control operators

現在、繰り返し処理を行えるOperatorは以下の3つが用意されています。

  • loop - 指定回数の処理を繰り返す
  • for_each - 与えた1つ以上のkey,valueの組み合わせの個数の処理を繰り返す
  • for_range - 範囲(開始値、終わり値)と増加値の3項目を指定し、範囲内で開始値に増加値を加算し、確保できる範囲の個数の処理を繰り返す

loop operator

loopは引数として整数を渡すことで、指定回数の処理を繰り返すことができるOperatorです。 暗黙的に変数 ${i} が0から開始され、処理を繰り返すたびに1ずつ増加されます。

# loop.dig
+check_loop:
  loop>: 3
  _do:
    +step1:
      echo>: i is ${i}

結果は以下の様になります。

echo>: i is 0 , echo>: i is 1 のようにiが増加していることが確認できます。 なんとなくarr変数で${arr[$i]}みたいに書きたくなりそうですが、digdagの構文としてはエラーになるのでできません。

# digdag run loop.dig  --rerun
2019-01-13 20:02:59 +0900: Digdag v0.9.27
2019-01-13 20:03:00 +0900 [WARN] (main): Reusing the last session time 2019-01-13T00:00:00+00:00.
2019-01-13 20:03:00 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000.
2019-01-13 20:03:00 +0900 [INFO] (main): Starting a new session project id=1 workflow name=loop session_time=2019-01-13T00:00:00+00:00
2019-01-13 20:03:01 +0900 [INFO] (0017@[0:default]+loop+check_loop): loop>: 3
2019-01-13 20:03:02 +0900 [INFO] (0017@[0:default]+loop+check_loop^sub+loop-0+step1): echo>: i is 0
i is 0
2019-01-13 20:03:02 +0900 [INFO] (0017@[0:default]+loop+check_loop^sub+loop-1+step1): echo>: i is 1
i is 1
2019-01-13 20:03:02 +0900 [INFO] (0017@[0:default]+loop+check_loop^sub+loop-2+step1): echo>: i is 2
i is 2
Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.

for_each operator

for_eachはKey,Valueの変数を使用した繰り返し処理を行えます。

# for_each.dig
check_for_each:
  for_each>:
    key: [one, two]
  _do:
    echo>: ${key}

結果は以下の様になります。

# digdag run for_each.dig --rerun
2019-01-13 20:18:49 +0900: Digdag v0.9.27
2019-01-13 20:18:51 +0900 [WARN] (main): Reusing the last session time 2019-01-13T00:00:00+00:00.
2019-01-13 20:18:51 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000.
2019-01-13 20:18:51 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_each session_time=2019-01-13T00:00:00+00:00
2019-01-13 20:18:51 +0900 [INFO] (0017@[0:default]+for_each+check_for_each): for_each>: {key=[one, two]}
2019-01-13 20:18:52 +0900 [INFO] (0017@[0:default]+for_each+check_for_each^sub+for-0=key=0=one): echo>: one
one
2019-01-13 20:18:52 +0900 [INFO] (0017@[0:default]+for_each+check_for_each^sub+for-0=key=1=two): echo>: two
two
Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.

私の場合、事前にvalueを変数に入れておいて回すことが多いです。以下は上のfor_each.digと同じ結果になりますが、values変数にfor_eachで回す値を先に入れています。

# for_each2.dig
_export:
  values: [one , two]

+check_for_each:
  for_each>:
    key: ${values}
  _do:
    echo>: ${key}

for_eachは複数のkey,valueを受け付けることができます。以下のように二つのkey,valueを与えた場合、全ての組み合わせの処理が実行されます。

# for_each3.dig
+check_for_each:
  for_each>:
    key: [one , two]
    key2: [aaa , bbb]
  _do:
    echo>: ${key} ${key2}

結果は以下の様になります。2の2乗=4回の処理が行われています。

# digdag run for_each3.dig --rerun
2019-01-13 20:28:55 +0900: Digdag v0.9.27
2019-01-13 20:28:56 +0900 [WARN] (main): Using a new session time 2019-01-13T00:00:00+00:00.
2019-01-13 20:28:56 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000.
2019-01-13 20:28:56 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_each3 session_time=2019-01-13T00:00:00+00:00
2019-01-13 20:28:57 +0900 [INFO] (0017@[0:default]+for_each3+check_for_each): for_each>: {key=[one, two], key2=[aaa, bbb]}
2019-01-13 20:28:57 +0900 [INFO] (0017@[0:default]+for_each3+check_for_each^sub+for-0=key=0=one&1=key2=0=aaa): echo>: one aaa
one aaa
2019-01-13 20:28:57 +0900 [INFO] (0017@[0:default]+for_each3+check_for_each^sub+for-0=key=0=one&1=key2=1=bbb): echo>: one bbb
one bbb
2019-01-13 20:28:57 +0900 [INFO] (0017@[0:default]+for_each3+check_for_each^sub+for-0=key=1=two&1=key2=0=aaa): echo>: two aaa
two aaa
2019-01-13 20:28:58 +0900 [INFO] (0017@[0:default]+for_each3+check_for_each^sub+for-0=key=1=two&1=key2=1=bbb): echo>: two bbb
two bbb
Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.

そして以下のように3つのkey,valueにした場合も同様に全件組み合わせた結果が実行されます。

# for_each4.dig
+check_for_each:
  for_each>:
    key: [one , two]
    key2: [aaa , bbb]
    key3: [AAA , BBB]
  _do:
    echo>: ${key} ${key2} ${key3}

以下が結果です。2の3乗=8回の処理が行われています。

# digdag run for_each4.dig --rerun
2019-01-13 20:32:03 +0900: Digdag v0.9.27
2019-01-13 20:32:04 +0900 [WARN] (main): Using a new session time 2019-01-13T00:00:00+00:00.
2019-01-13 20:32:04 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000.
2019-01-13 20:32:04 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_each4 session_time=2019-01-13T00:00:00+00:00
2019-01-13 20:32:05 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each): for_each>: {key=[one, two], key2=[aaa, bbb], key3=[AAA, BBB]}
2019-01-13 20:32:06 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=0=one&1=key2=0=aaa&2=key3=0=AAA): echo>: one aaa AAA
one aaa AAA
2019-01-13 20:32:06 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=0=one&1=key2=0=aaa&2=key3=1=BBB): echo>: one aaa BBB
one aaa BBB
2019-01-13 20:32:06 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=0=one&1=key2=1=bbb&2=key3=0=AAA): echo>: one bbb AAA
one bbb AAA
2019-01-13 20:32:06 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=0=one&1=key2=1=bbb&2=key3=1=BBB): echo>: one bbb BBB
one bbb BBB
2019-01-13 20:32:06 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=1=two&1=key2=0=aaa&2=key3=0=AAA): echo>: two aaa AAA
two aaa AAA
2019-01-13 20:32:07 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=1=two&1=key2=0=aaa&2=key3=1=BBB): echo>: two aaa BBB
two aaa BBB
2019-01-13 20:32:07 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=1=two&1=key2=1=bbb&2=key3=0=AAA): echo>: two bbb AAA
two bbb AAA
2019-01-13 20:32:07 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=1=two&1=key2=1=bbb&2=key3=1=BBB): echo>: two bbb BBB
two bbb BBB
Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.

for_eachの処理数の最大値は1000(default)

そしてこのfor_each、どこまでkey,valueを与えられるか軽く試したところ、繰り返し処理数が1000を超える場合に以下のToo many for_each subtasks.エラーとなることが分かりました。そうそうあるものではありませんがお気をつけください。またメッセージにLimit: 1000 (config)とあるのでコンフィグで変更できそうです。

# digdag run for_each5.dig --rerun
2019-01-13 20:41:45 +0900: Digdag v0.9.27
2019-01-13 20:41:46 +0900 [WARN] (main): Reusing the last session time 2019-01-13T00:00:00+00:00.
2019-01-13 20:41:46 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000.
2019-01-13 20:41:46 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_each5 session_time=2019-01-13T00:00:00+00:00
2019-01-13 20:41:47 +0900 [INFO] (0017@[0:default]+for_each5+check_for_each): for_each>: {key=[one, two], key2=[aaa, bbb], key3=[AAA, BBB], key4=[AAA, BBB], key5=[AAA, BBB], key6=[AAA, BBB], key7=[AAA, BBB], key8=[AAA, BBB], key9=[AAA, BBB], key10=[AAA, BBB]}
2019-01-13 20:41:47 +0900 [ERROR] (0017@[0:default]+for_each5+check_for_each): Configuration error at task +for_each5+check_for_each: Too many for_each subtasks. Limit: 1000 (config)
2019-01-13 20:41:47 +0900 [INFO] (0017@[0:default]+for_each5^failure-alert): type: notify
error:
  * +for_each5+check_for_each:
    Too many for_each subtasks. Limit: 1000 (config)

Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.

参考までにToo many for_each subtasks.エラーとなったサンプルです。2の10乗=1024のために1000を超えてしまいました。

# for_each5.dig
+check_for_each:
  for_each>:
    key: [one , two]
    key2: [aaa , bbb]
    key3: [AAA , BBB]
    key4: [AAA , BBB]
    key5: [AAA , BBB]
    key6: [AAA , BBB]
    key7: [AAA , BBB]
    key8: [AAA , BBB]
    key9: [AAA , BBB]
    key10: [AAA , BBB]
  _do:
    echo>: ${key} ${key2} ${key3}

for_eachのパラメーター箇条書き

for_eachは便利なのですが、与えるパラメタを1行で書くため、valueの数が増えると横に長くなってしまい可読性が悪くなる悩みがありました。調べたところGitHubのissueに"値を箇条書きするサンプルの記載があり、問題なく利用できているので良く使っています。(なぜか公式ドキュメントには記載されていませんが)

GitHub: treasure-data/digdag/issues/for_each parameterization capability is limited

# for_each6.dig
_export:
  values: &VALUES
    - one
    - two

+parameterized_for_each:
  for_each>:
    key: *VALUES
  _do:
    echo>: ${key}

結果は以下の様になります。

# digdag run for_each6.dig --rerun
2019-01-13 20:53:35 +0900: Digdag v0.9.27
2019-01-13 20:53:36 +0900 [WARN] (main): Reusing the last session time 2019-01-13T00:00:00+00:00.
2019-01-13 20:53:36 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000.
2019-01-13 20:53:37 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_each6 session_time=2019-01-13T00:00:00+00:00
2019-01-13 20:53:37 +0900 [INFO] (0017@[0:default]+for_each6+parameterized_for_each): for_each>: {key=[one, two]}
2019-01-13 20:53:38 +0900 [INFO] (0017@[0:default]+for_each6+parameterized_for_each^sub+for-0=key=0=one): echo>: one
one
2019-01-13 20:53:38 +0900 [INFO] (0017@[0:default]+for_each6+parameterized_for_each^sub+for-0=key=1=two): echo>: two
two
Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.

for_range operator

for_rangeは以下の3つを整数を指定して処理を繰り返すことができます。

  • 開始値(from)
  • 終わり値(to)
  • 増加値(step)

処理の中では以下の3つの変数を利用できます。range.indexはloopの${i}の様に0から1ずつ増加します。

  • ${range.from}
  • ${range.to}
  • ${range.index}

以下のサンプルでは、開始値に100を指定して終わり値500まで100ずつ増加する=4回処理が行われます。5回ではない事に注意してください。100から500の間には100は4つしか入りません。

# for_range.dig
+check_for_range:
  for_range>:
    from: 100
    to: 500
    step: 100
  _do:
    echo>: range.from=${range.from} , range.to=${range.to} , range.index=${range.index}

以下のような結果になります。

igdag run for_range.dig --rerun
2019-01-13 21:03:18 +0900: Digdag v0.9.27
2019-01-13 21:03:19 +0900 [WARN] (main): Reusing the last session time 2019-01-13T00:00:00+00:00.
2019-01-13 21:03:19 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000.
2019-01-13 21:03:19 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_range session_time=2019-01-13T00:00:00+00:00
2019-01-13 21:03:19 +0900 [INFO] (0017@[0:default]+for_range+repeat): for_range>: {from=100, to=500, step=100}
2019-01-13 21:03:20 +0900 [INFO] (0017@[0:default]+for_range+repeat^sub+range-from=100&to=200): echo>: range.from=100 , range.to=200 , range.index=0
range.from=100 , range.to=200 , range.index=0
2019-01-13 21:03:20 +0900 [INFO] (0017@[0:default]+for_range+repeat^sub+range-from=200&to=300): echo>: range.from=200 , range.to=300 , range.index=1
range.from=200 , range.to=300 , range.index=1
2019-01-13 21:03:20 +0900 [INFO] (0017@[0:default]+for_range+repeat^sub+range-from=300&to=400): echo>: range.from=300 , range.to=400 , range.index=2
range.from=300 , range.to=400 , range.index=2
2019-01-13 21:03:20 +0900 [INFO] (0017@[0:default]+for_range+repeat^sub+range-from=400&to=500): echo>: range.from=400 , range.to=500 , range.index=3
range.from=400 , range.to=500 , range.index=3
Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

先ほどのサンプルの終わり値を500 -> 501に変更して実行した場合は、500から501という範囲があるため5回処理が実行されます。

# for_range2.dig
+check_for_range:
  for_range>:
    from: 100
    to: 501
    step: 100
  _do:
    echo>: range.from=${range.from} , range.to=${range.to} , range.index=${range.index}

結果は以下の様になります。for_range2+check_for_range^sub+range-from=500&to=501が追加されて5回実行されていますね。

digdag run for_range2.dig --rerun
2019-01-13 21:09:20 +0900: Digdag v0.9.27
2019-01-13 21:09:22 +0900 [WARN] (main): Using a new session time 2019-01-13T00:00:00+00:00.
2019-01-13 21:09:22 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000.
2019-01-13 21:09:22 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_range2 session_time=2019-01-13T00:00:00+00:00
2019-01-13 21:09:23 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range): for_range>: {from=100, to=501, step=100}
2019-01-13 21:09:24 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range^sub+range-from=100&to=200): echo>: range.from=100 , range.to=200 , range.index=0
range.from=100 , range.to=200 , range.index=0
2019-01-13 21:09:24 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range^sub+range-from=200&to=300): echo>: range.from=200 , range.to=300 , range.index=1
range.from=200 , range.to=300 , range.index=1
2019-01-13 21:09:24 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range^sub+range-from=300&to=400): echo>: range.from=300 , range.to=400 , range.index=2
range.from=300 , range.to=400 , range.index=2
2019-01-13 21:09:24 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range^sub+range-from=400&to=500): echo>: range.from=400 , range.to=500 , range.index=3
range.from=400 , range.to=500 , range.index=3
2019-01-13 21:09:25 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range^sub+range-from=500&to=501): echo>: range.from=500 , range.to=501 , range.index=4
range.from=500 , range.to=501 , range.index=4
Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.

最後に一言

digdagの繰り返し処理を見直しました。for_eachの最大処理数が1000だったり、for_rangeの範囲の考え方(from:100,to:500だと5回ではなく4回の処理)など新しい発見がありました。自分がいかに雰囲気でdigdagを利用していることが良く分かりました><

最後に、このdigdagという素晴らしいソフトウェアを開発して頂いている皆様に心から感謝します。 そして利用者である皆様の良いdigdagライフを祈ります!