Pythonで終了処理を実行させる
一つの問題です。
よくあるRaspberry PiのGPIOを制御するプログラムです。
以下のようなソースコードがあって
import RPi.GPIO as GPIO import time GPIO.setmode(GPIO.BOARD) GPIO.setup(13, GPIO.OUT) while True: GPIO.output(13, True) time.sleep(1) GPIO.output(13, False) time.sleep(1) GPIO.cleanup()
これを実行中に、Ctrl-Cやkillなどで停止させた場合、GPIO.cleanup()は実行されるでしょうか。
正解は、実行されません。
簡単な話で、while True:で無限ループしている途中で強制終了されるので、当然それ以下の処理は実行されません。
RPi.GPIOの場合、以前のプロセスがcleanupメソッドを実行せずにsetupメソッドを実行した場合、以下のエラーを発生させるようになっています。
RuntimeWarning:This channel is already in use,continuing anyway.Use GPIO.setwarning'(Flase) to disable warnings.
このメッセージを読むと、回避する方法として先頭に
GPIO.setwarning(Flase)
を追記する方法が推奨されていますが、この方法を使うと他のエラーメッセージも非表示となってしまうため、個人的にあまりおススメできません。
そもそも本来実行すべきcleanupメソッドが実行されない問題を先送りするべきではないですね。
RPi.GPIOのcleanup問題くらいだと、それほど重大な問題とはなりにくいですが、理想としては終了処理を実行させることです。
では、どうするのか
以下のクラスを用意しました。
import signal import sys class ProcessDestructor(): def __init__(self, callback): self.callback = callback signal.signal(signal.SIGINT, self.__CatcheSignal) signal.signal(signal.SIGTERM, self.__CatcheSignal) def __CatcheSignal(self, signum, frame): self.SignalIgnoreDestruction() self.callback() self.SignalDefaultDestruction() sys.exit() @classmethod def SignalIgnoreTERM(self): signal.signal(signal.SIGTERM, signal.SIG_IGN) @classmethod def SignalIgnoreINT(self): signal.signal(signal.SIGINT, signal.SIG_IGN) @classmethod def SignalIgnoreDestruction(self): self.SignalIgnoreTERM() self.SignalIgnoreINT() @classmethod def SignalDefaultTERM(self): signal.signal(signal.SIGTERM, signal.SIG_DFL) @classmethod def SignalDefaultINT(self): signal.signal(signal.SIGINT, signal.SIG_DFL) @classmethod def SignalDefaultDestruction(self): self.SignalDefaultTERM() self.SignalDefaultINT()
このクラスは、Ctrl-CやKillコマンドなどのシグナルをプロセスが受け取ったとき、指定されたコールバック関数を実行します。
簡単な使い方を紹介します。
import RPi.GPIO as GPIO import time import TsProcessDestructor import sys def destructor(): print('call destructor') GPIO.cleanup() def main(): ProcessDestructor = TsProcessDestructor.ProcessDestructor(destructor) GPIO.setmode(GPIO.BOARD) GPIO.setup(13, GPIO.OUT) while True: GPIO.output(13, True) time.sleep(1) GPIO.output(13, False) time.sleep(1) if __name__ == "__main__": sys.exit(main())
mainでProcessDestructorクラスのインスタンスを生成します。生成時にはコールバック関数であるdestructorを引数にしています。
これで、終了シグナル検知時にdestructorが呼び出されるようになります。
実際に実行してCtrl-Cをすると、コンソールに「call destructor」が表示されるので、実行されていることがわかります。
これを利用すれば、main関数でループ処理などをしていても、終了時にきちんと終了処理をすることができるので、より安全にアプリを停止させることができます。
TsProcessDestrucor内のクラスメソッド群は、シグナルを実行するか無視するかを切り替えられるものです。
必要に応じて使いましょう。
2021/04/27 追記 multiprocessingでの挙動
このクラスですが、multiprocessingを使っている場合、すべてのプロセスに対してシグナルが発せられるのでプロセスの分だけコールバックが呼び出されてしまいます。
2回目のコールバックではAttribute Errorなどが発生する可能性もあるので、1度だけの発生にしたいところ。
理想はProcessDestrucotrインスタンスの生成したプロセスに対してのみコールバックを呼び出すことなので、少々変更を入れます。
import signal import sys from TsMessages import PrintDebugMess, DebugStatus import os class ProcessDestructor(): def __init__(self, callback): self.callback = callback self.__pid = os.getpid() signal.signal(signal.SIGINT, self.__CatcheSignal) signal.signal(signal.SIGTERM, self.__CatcheSignal) def __CatcheSignal(self, signum, frame): if self.__pid == os.getpid(): self.SignalIgnoreDestruction() self.callback() self.SignalDefaultDestruction() sys.exit() @classmethod def SignalIgnoreTERM(self): signal.signal(signal.SIGTERM, signal.SIG_IGN) @classmethod def SignalIgnoreINT(self): signal.signal(signal.SIGINT, signal.SIG_IGN) @classmethod def SignalIgnoreDestruction(self): self.SignalIgnoreTERM() self.SignalIgnoreINT() @classmethod def SignalDefaultTERM(self): signal.signal(signal.SIGTERM, signal.SIG_DFL) @classmethod def SignalDefaultINT(self): signal.signal(signal.SIGINT, signal.SIG_DFL) @classmethod def SignalDefaultDestruction(self): self.SignalDefaultTERM() self.SignalDefaultINT()
クラスのコンストラクタで生成元のプロセスID(PID)を取得します。
プロセスの分だけ__CatcheSignalが呼び出されてしまいますが、
生成元のPIDと、シグナルによって呼ばれたプロセスのPIDを比較して一致した場合のみコールバックを呼ぶという処理に変更しています。
これでマルチプロセスにも対応できます。
ディスカッション
コメント一覧
まだ、コメントがありません