最近はちょくちょくdigdagのdigファイルを書くことがあり、毎回ドキュメントを見ながら思い出している"繰り返し処理"について自分でまとめておきたいと思います。
はじめに
公式ドキュメントは以下です。繰り返し処理を行うloopなどはWorkfrlow control Operatorsというカテゴリの中に記載されています。
digdag Docs » Operators » Workflow control operators
現在、繰り返し処理を行えるOperatorは以下の3つが用意されています。
- loop - 指定回数の処理を繰り返す
- for_each - 与えた1つ以上のkey,valueの組み合わせの個数の処理を繰り返す
- for_range - 範囲(開始値、終わり値)と増加値の3項目を指定し、範囲内で開始値に増加値を加算し、確保できる範囲の個数の処理を繰り返す
- for_rangeは一言で説明が難しいのでサンプル結果を見てください
loop operator
loopは引数として整数を渡すことで、指定回数の処理を繰り返すことができるOperatorです。
暗黙的に変数 ${i}
が0から開始され、処理を繰り返すたびに1ずつ増加されます。
# loop.dig +check_loop: loop>: 3 _do: +step1: echo>: i is ${i}
結果は以下の様になります。
echo>: i is 0
, echo>: i is 1
のようにiが増加していることが確認できます。
なんとなくarr変数で${arr[$i]}
みたいに書きたくなりそうですが、digdagの構文としてはエラーになるのでできません。
# digdag run loop.dig --rerun 2019-01-13 20:02:59 +0900: Digdag v0.9.27 2019-01-13 20:03:00 +0900 [WARN] (main): Reusing the last session time 2019-01-13T00:00:00+00:00. 2019-01-13 20:03:00 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000. 2019-01-13 20:03:00 +0900 [INFO] (main): Starting a new session project id=1 workflow name=loop session_time=2019-01-13T00:00:00+00:00 2019-01-13 20:03:01 +0900 [INFO] (0017@[0:default]+loop+check_loop): loop>: 3 2019-01-13 20:03:02 +0900 [INFO] (0017@[0:default]+loop+check_loop^sub+loop-0+step1): echo>: i is 0 i is 0 2019-01-13 20:03:02 +0900 [INFO] (0017@[0:default]+loop+check_loop^sub+loop-1+step1): echo>: i is 1 i is 1 2019-01-13 20:03:02 +0900 [INFO] (0017@[0:default]+loop+check_loop^sub+loop-2+step1): echo>: i is 2 i is 2 Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.
for_each operator
for_eachはKey,Valueの変数を使用した繰り返し処理を行えます。
# for_each.dig check_for_each: for_each>: key: [one, two] _do: echo>: ${key}
結果は以下の様になります。
# digdag run for_each.dig --rerun 2019-01-13 20:18:49 +0900: Digdag v0.9.27 2019-01-13 20:18:51 +0900 [WARN] (main): Reusing the last session time 2019-01-13T00:00:00+00:00. 2019-01-13 20:18:51 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000. 2019-01-13 20:18:51 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_each session_time=2019-01-13T00:00:00+00:00 2019-01-13 20:18:51 +0900 [INFO] (0017@[0:default]+for_each+check_for_each): for_each>: {key=[one, two]} 2019-01-13 20:18:52 +0900 [INFO] (0017@[0:default]+for_each+check_for_each^sub+for-0=key=0=one): echo>: one one 2019-01-13 20:18:52 +0900 [INFO] (0017@[0:default]+for_each+check_for_each^sub+for-0=key=1=two): echo>: two two Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.
私の場合、事前にvalueを変数に入れておいて回すことが多いです。以下は上のfor_each.digと同じ結果になりますが、values
変数にfor_eachで回す値を先に入れています。
# for_each2.dig _export: values: [one , two] +check_for_each: for_each>: key: ${values} _do: echo>: ${key}
for_eachは複数のkey,valueを受け付けることができます。以下のように二つのkey,valueを与えた場合、全ての組み合わせの処理が実行されます。
# for_each3.dig +check_for_each: for_each>: key: [one , two] key2: [aaa , bbb] _do: echo>: ${key} ${key2}
結果は以下の様になります。2の2乗=4回の処理が行われています。
# digdag run for_each3.dig --rerun 2019-01-13 20:28:55 +0900: Digdag v0.9.27 2019-01-13 20:28:56 +0900 [WARN] (main): Using a new session time 2019-01-13T00:00:00+00:00. 2019-01-13 20:28:56 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000. 2019-01-13 20:28:56 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_each3 session_time=2019-01-13T00:00:00+00:00 2019-01-13 20:28:57 +0900 [INFO] (0017@[0:default]+for_each3+check_for_each): for_each>: {key=[one, two], key2=[aaa, bbb]} 2019-01-13 20:28:57 +0900 [INFO] (0017@[0:default]+for_each3+check_for_each^sub+for-0=key=0=one&1=key2=0=aaa): echo>: one aaa one aaa 2019-01-13 20:28:57 +0900 [INFO] (0017@[0:default]+for_each3+check_for_each^sub+for-0=key=0=one&1=key2=1=bbb): echo>: one bbb one bbb 2019-01-13 20:28:57 +0900 [INFO] (0017@[0:default]+for_each3+check_for_each^sub+for-0=key=1=two&1=key2=0=aaa): echo>: two aaa two aaa 2019-01-13 20:28:58 +0900 [INFO] (0017@[0:default]+for_each3+check_for_each^sub+for-0=key=1=two&1=key2=1=bbb): echo>: two bbb two bbb Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.
そして以下のように3つのkey,valueにした場合も同様に全件組み合わせた結果が実行されます。
# for_each4.dig +check_for_each: for_each>: key: [one , two] key2: [aaa , bbb] key3: [AAA , BBB] _do: echo>: ${key} ${key2} ${key3}
以下が結果です。2の3乗=8回の処理が行われています。
# digdag run for_each4.dig --rerun 2019-01-13 20:32:03 +0900: Digdag v0.9.27 2019-01-13 20:32:04 +0900 [WARN] (main): Using a new session time 2019-01-13T00:00:00+00:00. 2019-01-13 20:32:04 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000. 2019-01-13 20:32:04 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_each4 session_time=2019-01-13T00:00:00+00:00 2019-01-13 20:32:05 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each): for_each>: {key=[one, two], key2=[aaa, bbb], key3=[AAA, BBB]} 2019-01-13 20:32:06 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=0=one&1=key2=0=aaa&2=key3=0=AAA): echo>: one aaa AAA one aaa AAA 2019-01-13 20:32:06 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=0=one&1=key2=0=aaa&2=key3=1=BBB): echo>: one aaa BBB one aaa BBB 2019-01-13 20:32:06 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=0=one&1=key2=1=bbb&2=key3=0=AAA): echo>: one bbb AAA one bbb AAA 2019-01-13 20:32:06 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=0=one&1=key2=1=bbb&2=key3=1=BBB): echo>: one bbb BBB one bbb BBB 2019-01-13 20:32:06 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=1=two&1=key2=0=aaa&2=key3=0=AAA): echo>: two aaa AAA two aaa AAA 2019-01-13 20:32:07 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=1=two&1=key2=0=aaa&2=key3=1=BBB): echo>: two aaa BBB two aaa BBB 2019-01-13 20:32:07 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=1=two&1=key2=1=bbb&2=key3=0=AAA): echo>: two bbb AAA two bbb AAA 2019-01-13 20:32:07 +0900 [INFO] (0017@[0:default]+for_each4+check_for_each^sub+for-0=key=1=two&1=key2=1=bbb&2=key3=1=BBB): echo>: two bbb BBB two bbb BBB Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.
for_eachの処理数の最大値は1000(default)
そしてこのfor_each、どこまでkey,valueを与えられるか軽く試したところ、繰り返し処理数が1000を超える場合に以下のToo many for_each subtasks.
エラーとなることが分かりました。そうそうあるものではありませんがお気をつけください。またメッセージにLimit: 1000 (config)
とあるのでコンフィグで変更できそうです。
# digdag run for_each5.dig --rerun 2019-01-13 20:41:45 +0900: Digdag v0.9.27 2019-01-13 20:41:46 +0900 [WARN] (main): Reusing the last session time 2019-01-13T00:00:00+00:00. 2019-01-13 20:41:46 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000. 2019-01-13 20:41:46 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_each5 session_time=2019-01-13T00:00:00+00:00 2019-01-13 20:41:47 +0900 [INFO] (0017@[0:default]+for_each5+check_for_each): for_each>: {key=[one, two], key2=[aaa, bbb], key3=[AAA, BBB], key4=[AAA, BBB], key5=[AAA, BBB], key6=[AAA, BBB], key7=[AAA, BBB], key8=[AAA, BBB], key9=[AAA, BBB], key10=[AAA, BBB]} 2019-01-13 20:41:47 +0900 [ERROR] (0017@[0:default]+for_each5+check_for_each): Configuration error at task +for_each5+check_for_each: Too many for_each subtasks. Limit: 1000 (config) 2019-01-13 20:41:47 +0900 [INFO] (0017@[0:default]+for_each5^failure-alert): type: notify error: * +for_each5+check_for_each: Too many for_each subtasks. Limit: 1000 (config) Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.
参考までにToo many for_each subtasks.
エラーとなったサンプルです。2の10乗=1024のために1000を超えてしまいました。
# for_each5.dig +check_for_each: for_each>: key: [one , two] key2: [aaa , bbb] key3: [AAA , BBB] key4: [AAA , BBB] key5: [AAA , BBB] key6: [AAA , BBB] key7: [AAA , BBB] key8: [AAA , BBB] key9: [AAA , BBB] key10: [AAA , BBB] _do: echo>: ${key} ${key2} ${key3}
for_eachのパラメーター箇条書き
for_eachは便利なのですが、与えるパラメタを1行で書くため、valueの数が増えると横に長くなってしまい可読性が悪くなる悩みがありました。調べたところGitHubのissueに"値を箇条書きするサンプルの記載があり、問題なく利用できているので良く使っています。(なぜか公式ドキュメントには記載されていませんが)
GitHub: treasure-data/digdag/issues/for_each parameterization capability is limited
# for_each6.dig _export: values: &VALUES - one - two +parameterized_for_each: for_each>: key: *VALUES _do: echo>: ${key}
結果は以下の様になります。
# digdag run for_each6.dig --rerun 2019-01-13 20:53:35 +0900: Digdag v0.9.27 2019-01-13 20:53:36 +0900 [WARN] (main): Reusing the last session time 2019-01-13T00:00:00+00:00. 2019-01-13 20:53:36 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000. 2019-01-13 20:53:37 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_each6 session_time=2019-01-13T00:00:00+00:00 2019-01-13 20:53:37 +0900 [INFO] (0017@[0:default]+for_each6+parameterized_for_each): for_each>: {key=[one, two]} 2019-01-13 20:53:38 +0900 [INFO] (0017@[0:default]+for_each6+parameterized_for_each^sub+for-0=key=0=one): echo>: one one 2019-01-13 20:53:38 +0900 [INFO] (0017@[0:default]+for_each6+parameterized_for_each^sub+for-0=key=1=two): echo>: two two Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.
for_range operator
for_rangeは以下の3つを整数を指定して処理を繰り返すことができます。
開始値(from)
終わり値(to)
増加値(step)
処理の中では以下の3つの変数を利用できます。range.indexはloopの${i}の様に0から1ずつ増加します。
- ${range.from}
- ${range.to}
- ${range.index}
以下のサンプルでは、開始値に100を指定して終わり値500まで100ずつ増加する=4回処理が行われます。5回ではない事に注意してください。100から500の間には100は4つしか入りません。
# for_range.dig +check_for_range: for_range>: from: 100 to: 500 step: 100 _do: echo>: range.from=${range.from} , range.to=${range.to} , range.index=${range.index}
以下のような結果になります。
igdag run for_range.dig --rerun 2019-01-13 21:03:18 +0900: Digdag v0.9.27 2019-01-13 21:03:19 +0900 [WARN] (main): Reusing the last session time 2019-01-13T00:00:00+00:00. 2019-01-13 21:03:19 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000. 2019-01-13 21:03:19 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_range session_time=2019-01-13T00:00:00+00:00 2019-01-13 21:03:19 +0900 [INFO] (0017@[0:default]+for_range+repeat): for_range>: {from=100, to=500, step=100} 2019-01-13 21:03:20 +0900 [INFO] (0017@[0:default]+for_range+repeat^sub+range-from=100&to=200): echo>: range.from=100 , range.to=200 , range.index=0 range.from=100 , range.to=200 , range.index=0 2019-01-13 21:03:20 +0900 [INFO] (0017@[0:default]+for_range+repeat^sub+range-from=200&to=300): echo>: range.from=200 , range.to=300 , range.index=1 range.from=200 , range.to=300 , range.index=1 2019-01-13 21:03:20 +0900 [INFO] (0017@[0:default]+for_range+repeat^sub+range-from=300&to=400): echo>: range.from=300 , range.to=400 , range.index=2 range.from=300 , range.to=400 , range.index=2 2019-01-13 21:03:20 +0900 [INFO] (0017@[0:default]+for_range+repeat^sub+range-from=400&to=500): echo>: range.from=400 , range.to=500 , range.index=3 range.from=400 , range.to=500 , range.index=3 Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory. * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time. * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
先ほどのサンプルの終わり値を500 -> 501
に変更して実行した場合は、500から501という範囲があるため5回処理が実行されます。
# for_range2.dig +check_for_range: for_range>: from: 100 to: 501 step: 100 _do: echo>: range.from=${range.from} , range.to=${range.to} , range.index=${range.index}
結果は以下の様になります。for_range2+check_for_range^sub+range-from=500&to=501
が追加されて5回実行されていますね。
digdag run for_range2.dig --rerun 2019-01-13 21:09:20 +0900: Digdag v0.9.27 2019-01-13 21:09:22 +0900 [WARN] (main): Using a new session time 2019-01-13T00:00:00+00:00. 2019-01-13 21:09:22 +0900 [INFO] (main): Using session /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000. 2019-01-13 21:09:22 +0900 [INFO] (main): Starting a new session project id=1 workflow name=for_range2 session_time=2019-01-13T00:00:00+00:00 2019-01-13 21:09:23 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range): for_range>: {from=100, to=501, step=100} 2019-01-13 21:09:24 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range^sub+range-from=100&to=200): echo>: range.from=100 , range.to=200 , range.index=0 range.from=100 , range.to=200 , range.index=0 2019-01-13 21:09:24 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range^sub+range-from=200&to=300): echo>: range.from=200 , range.to=300 , range.index=1 range.from=200 , range.to=300 , range.index=1 2019-01-13 21:09:24 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range^sub+range-from=300&to=400): echo>: range.from=300 , range.to=400 , range.index=2 range.from=300 , range.to=400 , range.index=2 2019-01-13 21:09:24 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range^sub+range-from=400&to=500): echo>: range.from=400 , range.to=500 , range.index=3 range.from=400 , range.to=500 , range.index=3 2019-01-13 21:09:25 +0900 [INFO] (0017@[0:default]+for_range2+check_for_range^sub+range-from=500&to=501): echo>: range.from=500 , range.to=501 , range.index=4 range.from=500 , range.to=501 , range.index=4 Success. Task state is saved at /Users/morihaya/work/digdag/.digdag/status/20190113T000000+0000 directory.
最後に一言
digdagの繰り返し処理を見直しました。for_eachの最大処理数が1000だったり、for_rangeの範囲の考え方(from:100,to:500だと5回ではなく4回の処理)など新しい発見がありました。自分がいかに雰囲気でdigdagを利用していることが良く分かりました><
最後に、このdigdagという素晴らしいソフトウェアを開発して頂いている皆様に心から感謝します。 そして利用者である皆様の良いdigdagライフを祈ります!