対象バージョン: BOSE ver0.8 SPL版
作成: 2006/12/18
システムの複雑さの要因は、かならずしも業務内容によるものではありません。 むしろ、システムを構築する際に使用する開発言語や、実行環境の持つ制約から生じるものが大きいと考えられます。
boseは、従来の開発言語や実行環境が整備できていない次の点に対し、機能を体系的に提供することにより、システムの複雑さを解消し、開発者がより高次なサービスを容易に構築できるような環境を実現します。
これらのことは、従来の開発手法では、次のようなものを組み合わせてその場しのぎ的に開発を行ってきました。
トランザクションファイル、中間テーブル、時刻起動バッチJob、 プログラム内部に散逸的に記述されたステートチャートロジック、IF文のかたまり等々。
業務フロー自体を、言語として表現し直接実行可能とする環境を提供します。
この業務フロー言語・実行環境では、
を記述し実行することができます。
そして、この業務フロー言語は、ワークフロープロセスの実行環境(プログラムカウンタ、スタック、変数領域)に関してPersistence性(永続性)を持ちます。
・boseは、ワークフロープロセス実行エンジンを実現するためのSPL言語拡張モジュールとして実装されています。
SPL言語については、下記を参照してください。
SPL - The SPL Programming Language
http://www.clifford.at/spl/
・時刻起動イベント機能を実現する部分はPerlで実現されており、
特に Date::Manip パッケージを用いた高度なカレンダー機能を利用しています。
営業日や休日、営業時間帯の概念までをもサポートします。
Date::Manip - date manipulation routines
http://search.cpan.org/search?query=Date%3A%3AManip&mode=module
例) DEFINE_SUBPROCESS SubProcess_A() { var que_time = flow_get_current_time(); var resume_time = '\$start_time = UnixDate(DateCalc("+ 3 business days", "${que_time}"), "%Y/%m/%d %T")'; flow_sleep_until(resume_time, "", "after 3 business days"); } DEFINE_SUBPROCESS SubProcess_B() { flow_call_with_return("/usr/local/bose/scripts/flow1_callback.sh", "", "user data entry"); eval(flow_get_resume_code()); } DEFINE_SUBPROCESS main() { var id = "${flow_get_script_name()}:${flow_get_process_id()}"; eval(flow_get_resume_code()); flow_fork_block("fork1"); flow_fork_subprocess("SubProcess_A();", "", "ID=A"); flow_fork_subprocess("SubProcess_B();", "", "ID=B"); flow_join_block("join1"); flow_join_subprocess_all(); flow_unlock(flow_get_lock_id()); }
下記の処理は、処理が行われたフロー部分のトレースを、ログに記録するために使用します。 ログは、誰にでも理解しやすいように、グラフ形式で出力されます。 各サブプロセスの開始、終了、処理間の矢印に関しては、自動的に記録されます。
flow_actionstate_block(label)
ログには、処理が行われたフロー部分がアクションステートとして記録されます。
label: | アクションステートを識別するための任意指定可能な文字列を与えます。 |
flow_passage_point_label(label)
ログには、処理が行われたフロー部分が通過点として記録されます。
label: | 通過点を識別するための任意指定可能な文字列を与えます。 |
flow_decision_block(label)
ログには、処理が行われたフロー部分が判断処理として記録されます。
label: | 判断処理を識別するための任意指定可能な文字列を与えます。 |
flow_condition_label(label)
ログには、処理が行われたフロー部分が判断処理結果の分岐として記録されます。
label: | アクションステートを識別するための任意指定可能な文字列を与えます。 |
flow_fork_block(label)
ログには、処理が行われたフロー部分がFORKとして記録されます。
label: | どのFORK処理かを識別するための任意指定可能な文字列を与えます。 |
flow_join_block(label)
ログには、処理が行われたフロー部分がJOINとして記録されます。
label: | どのJOIN処理かを識別するための任意指定可能な文字列を与えます。 |
flow_fork_subprocess(code, subprocess_id, label)
指定したサブプロセス(DEFINE_SUBPROCESSで定義済みの)をFORKします。
code: | サブプロセスを実行するためのSPL言語による命令コードを文字列として与えます。 |
subprocess_id: | サブプロセスIDをプログラムにより指定する場合に使用します。1文字以上の長さの文字列を指定してください。そうでない場合は、自動的にサブプロセスIDが生成されます。 |
label: | ログに出力するための、どの内容のサブプロセスを起動したかを識別するための任意指定可能な文字列を与えます。 |
返り値: | サブプロセスID |
例) DEFINE_SUBPROCESS import_item(item_id) { ... } DEFINE_SUBPROCESS main() { ... flow_fork_subprocess("import_item('ABC001')", "import_item__ABC001", "ID:ABC001") ... }
flow_kill_subprocess(subprocess_id)
指定したサブプロセスをKILLします。
subprocess_id: | サブプロセスID |
flow_join_subprocess(subprocess_id_list)
複数のサブプロセスをJOINします。
subprocess_id_list: | JOINする対象のサブプロセスのIDが含まれた配列を渡します。 |
flow_join_subprocess_all()
自サブプロセスがFORKした子サブプロセスをすべてJOINします。
flow_sleep()
自サブプロセスをSLEEP状態にします。
flow_wakeup_subprocess(subprocess_id)
指定したサブプロセスをSLEEP状態から実行状態に戻します。
subprocess_id: | サブプロセスID |
flow_get_current_time()
現在時刻を文字列で取得します。
返り値: | 現在時刻を文字列"YYYY/MM/DD HH:MM:SS"の形式で返します。 |
flow_sleep_until(time_cond, lock_id, label)
自サブプロセスをtime_condで指定した時刻まで、SLEEP状態にし、その後継続処理を行います。
time_cond: | 起動時刻を設定する処理を、Perl言語による命令コードにより表現した文字列です。 |
lock_id: |
1文字以上の長さの文字列を与えた場合、サブプロセスのWakeup時にLOCK_IDによる排他チェックを行います。
LOCK_IDが、すでに排他DBに登録されていたら、そのLOCK_IDが排他DBから削除されるまで、継続のための再起動が遅延します。 |
label: | 本コマンドの実行はログに出力されます。時刻再起動処理を識別するための任意指定可能な文字列を与えます。 |
Perlで記述されたワークフロープロセスディスパッチャ上で time_cond 文字列がevalコマンドにより評価されます。time_cond文字列の評価の結果、Perl変数としての$flow_start_timeが生成される必要があります。$flow_start_timeであらわされた時刻に、サブプロセスがWakeupされます。
ワークフロープロセスディスパッチャは、Time::manipパッケージを利用しており、DateCalc等の日付・時刻処理関数が利用可能です。
※誤動作を避けるために、time_cond中の文字列中では、$flow_start_time以外の単語は、先頭に"flow_"をつけないでください。
例) var time_now = flow_get_current_time(); var time_cond = '\$flow_start_time = UnixDate(DateCalc("+ 3 business days", "${time_now}"), "%Y/%m/%d %T")';
flow_set_subprocess_var(subprocess_id, var_name, value)
サブプロセスIDと変数名により識別可能な変数領域に値を設定します。 この変数領域は、単純変数、配列、オブジェクトの格納が可能です。
subprocess_id: | サブプロセスID |
var_name: | 変数名 |
value: | 変数値 |
flow_get_subprocess_var(subprocess_id, var_name)
サブプロセスIDと変数名により識別可能な変数領域から値を取得します。
subprocess_id: | サブプロセスID |
var_name: | 変数名 |
返り値: | 変数領域の保持する変数値 |
flow_get_subprocess_id()
自サブプロセスIDを取得します。
返り値: | 自サブプロセスIDを返します。 |
flow_get_subprocess_status(subprocess_id)
指定したサブプロセスの現在の状態を取得します。
subprocess_id: | サブプロセスID |
戻り値: |
状態を文字列で返します。
状態は、次のいづれかの文字列であらわされます。 start, killed, joined, exit |
flow_get_subprocess_id_list()
自サブプロセスからForkした子サブプロセスのIDのリストを取得します。
戻り値: | 子サブプロセスIDのリストを配列で返します。 |
flow_call_with_return(cmd, cmd_args, label)
外部プログラムを起動します。 自サブタスクは、外部プログラムから、サブプロセス継続処理がかかるまで、SLEEP状態になります。
cmd: | 起動を行うプログラムを指定します。 |
cmd_args: | プログラムに渡す引数を指定します。 |
label: | 本コマンドの実行はログに出力されます。外部システム起動処理を識別するための任意指定可能な文字列を与えます。 |
外部プログラムの起動は、Shellを通して起動されます。 外部プログラムには、cmd_args文字列だけではなく、プロセスID、サブプロセスID情報を渡します。 下記と等価です。
sh -c cmd文字列 'プロセスID' 'サブプロセスID' cmd_args文字列 &
cmd_argsが複数の引数からなる場合、各引数を空白文字で区切ってください。一つの引数が空白文字を含む場合、引数を「'」もしくは「"」で括ってください。
flow_call(cmd)
外部プログラムを起動します。起動した後、サブプロセスは処理を続けます。
外部プログラムの起動は、Shellを通して起動されます。
下記と等価です。
sh -c cmd文字列 &
cmd: | 外部プログラム起動文字列 |
flow_get_resume_code()
ワークフロープロセスの起動時や、サブプロセスの継続処理時に、外部プログラムからワークフロープロセス実行エンジンにより実行可能なコードを渡すことができます。 flow_get_resume_code()は、その実行可能なコートを取得するもので、通常は取得したコードをeval関数により実行します。主として、外部プログラムからのアプリケーション変数の引渡しに使用します。
返り値: | SPL言語で記述された命令コードを文字列で返します。 |
flow_unlock(lock_id)
LOCK_IDで指定したロックを、排他DBから削除します。 ロックにより実行が遅延していた他のワークフロープロセスが、動作可能状態となります。
lock_id: | 削除を行うLOCK_IDを指定します。 |
flow_create_lock(lock_id)
LOCK_IDで指定したロックを、排他DBに登録します。
lock_id: | 登録を行うLOCK_IDを指定します。 |
flow_get_lock_id()
プロセス起動コネクタ、もしくはサブプロセス継続コネクタにより自ワークフロープロセスのために登録されたロックのIDを取得します。
返り値: | 自ワークフロープロセスを起動した際に指定したLOCK_IDを返します。 |
flow_get_request_work_lock_id()
処理依頼コネクタより自ワークフロープロセスのために登録されたロックのIDを取得します。
返り値: | 自ワークフロープロセスを起動した際に指定したLOCK_IDを返します。 |
flow_get_process_id()
自ワークフロープロセスIDを取得します。
返り値: | 自ワークフロープロセスIDを返します。 |
flow_set_max_execution_time(max_time)
ワークフロープロセスの実行制限時間を再設定します。
ワークフロープロセスの実行時間制限のデフォルト値は、600秒です。
max_time: | 実行制限時間を指定します。 |
途中プロセスがSuspend状態を挟まずに、ワークフローが連続して実行時間制限を超えて動作しようとすると、実行エンジンが無限ループ状態と判断して、プロセスの実行を中止します。 ワークフロープロセス状態DBのERROR_DUMP項目には、タイムアウトが記されます。
flow_get_script_name()
自ワークフロープロセスのスクリプト名を取得します。
返り値: | 自ワークフロープロセスのスクリプト名を返します。 |
flow_get_subprocess_delay_time()
プロセスの起動もしくは、サブプロセスの継続を行った際の予定起動時刻からの遅延時間(秒)を取得します。
返り値: | 予定起動時刻からの遅延時間(秒)を返します。 |
次のような要因で大幅な遅延が生じる可能性があります。
flow_get_request_work_delay_time()
現在の処理依頼の予定起動時刻からの遅延時間(秒)を取得します。
返り値: | 予定起動時刻からの遅延時間(秒)を返します。 |
$ /usr/local/bose/bin/compile_script.sh スクリプトファイル名
コンパイルエラーがある場合、エラーメッセージが出力されます。
プロセス起動コネクタの実行方法:
shellにて次のコマンドを実行してください。
イベントキューにプロセス起動トリガーレコードがインサートされます。
/usr/local/bose/bin/process_start 'スクリプトファイル名' 'プロセスID' 'LOCK_ID' 'プロセス起動時実行コード'
プロセスID: | プロセスIDをプログラムにより指定する場合に使用します。1文字以上の長さの文字列を指定してください。そうでない場合は、自動的にプロセスID値が生成されます。 |
プロセス起動時実行コード: | SPL言語による命令コードを文字列として記述してください。これは外部プログラムからのアプリケーション変数の受け渡しの為などに使用します。 |
LOCK_ID: | 1文字以上の長さの文字列を与えた場合、Dispatcharがイベントキューからトリガーレコードを取り出す時に、既に他でLOCK_IDによる排他が行われていないかチェックを行います。
既に排他が行われている場合には、そのLOCK_IDが排他DBから削除されるまで、プロセス起動が遅延します。 |
ワークフロースクリプトの"main"サブプロセス定義において、下記のようにプロセス起動時実行コードを記述することで、実行コードの文字列を取得し、さらにeval()関数によりコードを実行することができます。
/usr/local/bose/bin/process_start create_order_record.spl 'odr_1' '' 'item_id=123' DEFINE_SUBPROCESS main() { var item_id; eval(flow_get_resume_code()); write(item_id); } => 123 が出力されます。
SPL言語の文法に則っていれば良いので、配列渡しや関数コールも記述できます。
shellにて次のコマンドを実行してください。
イベントキューにサブプロセス継続トリガーレコードがインサートされます。
/usr/local/bose/bin/subprocess_continue 'プロセスID' 'サブプロセスID' 'LOCK_ID' 'プロセス継続再起動時実行コード'
プロセスID: | 継続を行うサブプロセスを含むワークフロープロセスのID |
サブプロセスID: | 継続を行うサブプロセスのID |
プロセス継続再起動時実行コード: | SPL言語による命令コードを文字列として記述してください。これは外部プログラムからのアプリケーション変数の受け渡しの為などに使用します。 |
LOCK_ID: | 1文字以上の長さの文字列を与えた場合、Dispatcharがイベントキューからトリガーレコードを取り出す時に、既に他でLOCK_IDによる排他が行われていないかチェックを行います。
既に排他が行われている場合には、そのLOCK_IDが排他DBから削除されるまで、継続のための再起動が遅延します。 |
ワークフロースクリプトの該当サブプロセス定義内部において、下記のようにプロセス継続再起動時実行コードを記述することで、実行コードの文字列を取得し、さらにeval()関数によりコードを実行することができます。
/usr/local/bose/bin/subprocess_continue mod_order_record.spl 'odr_1' 'mod_item__123' '' 'quantity=3' DEFINE_SUBPROCESS mod_item() { flow_call_with_return("mod_order_wait", "123", "mod order"); var quantity; eval(flow_get_resume_code()); write(quantity); } => 3 が出力されます。
shellにて次のコマンドを実行してください。
イベントキューに処理依頼トリガーレコードがインサートされます。
/usr/local/bose/bin/request_work 'プロセスID' 'LOCK_ID' '処理依頼実行コード'
処理依頼実行コード: | SPL言語による命令コードを文字列として記述してください。 |
LOCK_ID: | 1文字以上の長さの文字列を与えた場合、Dispatcharがイベントキューからトリガーレコードを取り出す時に、既に他でLOCK_IDによる排他が行われていないかチェックを行います。
既に排他が行われている場合には、そのLOCK_IDが排他DBから削除されるまで、処理依頼が遅延します。 |
ワークフロー実行エンジンは、処理依頼実行コードを自動的に実行(eval())します。
ワークフロースクリプト内にeval()を書く必要はありません。
SPLには、httpプロトコルによる通信処理の為の、CURL機能拡張モジュールが用意されています。
もし、httpにより外部プロセスを起動した後、自サブプロセスをSleepをさせ、外部プロセスからの継続トリガーの待ちを行うのであれば、下記の様な手順で行ってください。
task_system(); var resp = curl("http://....."); (resp内容のチェック); flow_sleep();
※task_system()は、自タスクがflow_sleep等によりSleepの状態になるまで、平行処理を行っている他のサブプロセスに切り替わらない保障をします。これは、外部システムが継続トリガーを返す前に自タスクをSleepにする必要があるからです。
sendmailを使用してメールを送信する方法を示します。
function send_mail() { ... }
1) インストールファイルの用意
SPLをダウンロードして下さい。
http://www.clifford.at/spl/releases/spl-1.0pre3.tar.gz
下記のモジュールがインストールされていなければ、それぞれインストール作業を行ってください。
DBD-mysql-3.0008.tar.gz (http://search.cpan.org/dist/DBD-mysql/)
DateManip-5.44.tar.gz (http://search.cpan.org/~sbeck/DateManip-5.44/)
pcre (http://www.pcre.org/)
curl (http://curl.haxx.se/)
MySQLサーバ
MySQL-client
MySQL-devel
MySQL-shared
2) コンパイルの準備
適当なディレクトリでspl-1.0pre3.tar.gzを展開してください。
作成されたspl-1.0pre3にbose...tgzを展開してください。
tar xvfz spl-1.0pre3.tar.gz cd spl-1.0pre3/ tar xvfz bose...tgz
3) コンパイル, インストール
次のコマンドを入力して、コンパイルおよびインストールを行ってください。
cp bose/GNUmakefile . make make -f bose/Makefile.bose make -f bose/Makefile.bose install /usr/local/bose下に必要なファイルがインストールされます。
4) DB作成
次のコマンドを入力して、ワークフロー管理DBを作成してください。
mysqladmin create bose mysql bose < /usr/local/bose/env/create_tbl.sql
1) DBホスト等を変更する場合
ワークフローエンジンは、下記のファイルを参照してDBの接続を行います。
DBホストを別マシンに変更する場合、適宜修正してください。
/usr/local/bose/env/db.ini
2) 複数ホストにワークフローディスパッチャを配置して、処理性能を上げる場合
ワークフロープロセスのスナップショットをDBに格納することにより、複数ホストに分散してのワークフローの実行が可能になります。格納方式の切り替えは、下記の部分で行います。
/usr/local/bose/bin/dispatcher.pl の下記の部分 ## SNAPSHOT格納方式 (file / db) $_flow_snapshot_mode = "db"; ワークフローディスパッチャが、単一のホストにのみ設置されている場合は、"file"としてください。
1) ディスパッチャの起動
/usr/local/bose/bin/dispatcher.pl をバックグラウンドで起動してください。
2) ワークフロースクリプトのコンパイル・DBへの登録
/usr/local/bose/bin/compile_script.sh にてワークフロースクリプトをコンパイル・DBへ登録してください。
3) ワークフロープロセスの起動
/usr/local/bose/bin/process_start にてワークフロープロセスを生成し、起動トリガーをDBへ登録してください。
下記のようにコマンドを打つと、プロセスIDで指定したワークフロープロセスの最新状態を表す、ログ情報ファイルが作成されます。
/usr/local/bose/bin/write_log 'プロセスID'
ログ情報ファイルは、画像形式です。
ログファイルの生成先: /tmp/log.プロセスID.png
ワークフロー実行中にエラーが発生すると、イベントキューDB上の該当レコードのACTIVATE_FGに'E'が設定され、またワークフロープロセス状態DBのERROR_DUMP項目にはエラー内容が記録されます。
エラーの要因は、次のようなものがあります。
下記のコマンドを実行すると、ワークフロープロセス状態DBのうち、プロセス終了時刻が60日を過ぎたレコードが削除されます。
/usr/local/bose/bin/delete_log
boseは、GPLにてご提供いたします。
株式会社PM9 野田邦昌 kuni@pm9.com
boseの名称は、Bose-Einstein condensateおよびSuperfluidityからヒントを得ました。
追補: | 当初は、passworksという名前も検討しました。passworksは、サッカーのパスワークから来ています。たしかに現在のboseの緩慢な動作は、まさにpassworksがぴったりなのですが、私たちの理想とするものは、もう少し先にあり、極端に巨大で整合性のあるシステムの実現であるため、その名前は使用しないことにしました。 |