Pythonで終了処理を実行させる

Programing,Python

一つの問題です。
よくあるRaspberry PiのGPIOを制御するプログラムです。
以下のようなソースコードがあって

import RPi.GPIO as GPIO
import time
GPIO.setmode(GPIO.BOARD)
GPIO.setup(13, GPIO.OUT)
while
True:
GPIO.output(13, True)
time.sleep(1)
GPIO.output(13, False)
time.sleep(1)
GPIO.cleanup()

これを実行中に、Ctrl-Cやkillなどで停止させた場合、GPIO.cleanup()は実行されるでしょうか。

正解は、実行されません。
簡単な話で、while True:で無限ループしている途中で強制終了されるので、当然それ以下の処理は実行されません。

RPi.GPIOの場合、以前のプロセスがcleanupメソッドを実行せずにsetupメソッドを実行した場合、以下のエラーを発生させるようになっています。

RuntimeWarning:This channel is already in use,continuing anyway.Use GPIO.setwarning'(Flase) to disable warnings.

このメッセージを読むと、回避する方法として先頭に

GPIO.setwarning(Flase)

を追記する方法が推奨されていますが、この方法を使うと他のエラーメッセージも非表示となってしまうため、個人的にあまりおススメできません。

そもそも本来実行すべきcleanupメソッドが実行されない問題を先送りするべきではないですね。
RPi.GPIOのcleanup問題くらいだと、それほど重大な問題とはなりにくいですが、理想としては終了処理を実行させることです。

では、どうするのか
以下のクラスを用意しました。

import signal
import sys
class
ProcessDestructor():
def
__init__(self, callback):
self.callback	= callback
signal.signal(signal.SIGINT, self.__CatcheSignal)
signal.signal(signal.SIGTERM, self.__CatcheSignal)
def
__CatcheSignal(self, signum, frame):
self.SignalIgnoreDestruction()
self.callback()
self.SignalDefaultDestruction()
sys.exit()
@classmethod
def
SignalIgnoreTERM(self):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
@classmethod
def
SignalIgnoreINT(self):
signal.signal(signal.SIGINT, signal.SIG_IGN)
@classmethod
def
SignalIgnoreDestruction(self):
self.SignalIgnoreTERM()
self.SignalIgnoreINT()
@classmethod
def
SignalDefaultTERM(self):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
@classmethod
def
SignalDefaultINT(self):
signal.signal(signal.SIGINT, signal.SIG_DFL)
@classmethod
def
SignalDefaultDestruction(self):
self.SignalDefaultTERM()
self.SignalDefaultINT()

このクラスは、Ctrl-CやKillコマンドなどのシグナルをプロセスが受け取ったとき、指定されたコールバック関数を実行します。

簡単な使い方を紹介します。

import RPi.GPIO as GPIO
import time
import TsProcessDestructor
import sys
def
destructor():
print('call destructor')
GPIO.cleanup()
def
main():
ProcessDestructor   = TsProcessDestructor.ProcessDestructor(destructor)
GPIO.setmode(GPIO.BOARD)
GPIO.setup(13, GPIO.OUT)
while
True:
GPIO.output(13, True)
time.sleep(1)
GPIO.output(13, False)
time.sleep(1)
if __name__ == "__main__":
sys.exit(main())

mainでProcessDestructorクラスのインスタンスを生成します。生成時にはコールバック関数であるdestructorを引数にしています。
これで、終了シグナル検知時にdestructorが呼び出されるようになります。

実際に実行してCtrl-Cをすると、コンソールに「call destructor」が表示されるので、実行されていることがわかります。

これを利用すれば、main関数でループ処理などをしていても、終了時にきちんと終了処理をすることができるので、より安全にアプリを停止させることができます。

TsProcessDestrucor内のクラスメソッド群は、シグナルを実行するか無視するかを切り替えられるものです。
必要に応じて使いましょう。

2021/04/27 追記 multiprocessingでの挙動

このクラスですが、multiprocessingを使っている場合、すべてのプロセスに対してシグナルが発せられるのでプロセスの分だけコールバックが呼び出されてしまいます。
2回目のコールバックではAttribute Errorなどが発生する可能性もあるので、1度だけの発生にしたいところ。
理想はProcessDestrucotrインスタンスの生成したプロセスに対してのみコールバックを呼び出すことなので、少々変更を入れます。

import signal
import sys
from TsMessages import PrintDebugMess, DebugStatus
import os
class
ProcessDestructor():
def
__init__(self, callback):
self.callback	= callback
self.__pid		= os.getpid()
signal.signal(signal.SIGINT, self.__CatcheSignal)
signal.signal(signal.SIGTERM, self.__CatcheSignal)
def
__CatcheSignal(self, signum, frame):
if self.__pid == os.getpid():
self.SignalIgnoreDestruction()
self.callback()
self.SignalDefaultDestruction()
sys.exit()
@classmethod
def
SignalIgnoreTERM(self):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
@classmethod
def
SignalIgnoreINT(self):
signal.signal(signal.SIGINT, signal.SIG_IGN)
@classmethod
def
SignalIgnoreDestruction(self):
self.SignalIgnoreTERM()
self.SignalIgnoreINT()
@classmethod
def
SignalDefaultTERM(self):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
@classmethod
def
SignalDefaultINT(self):
signal.signal(signal.SIGINT, signal.SIG_DFL)
@classmethod
def
SignalDefaultDestruction(self):
self.SignalDefaultTERM()
self.SignalDefaultINT()

クラスのコンストラクタで生成元のプロセスID(PID)を取得します。
プロセスの分だけ__CatcheSignalが呼び出されてしまいますが、
生成元のPIDと、シグナルによって呼ばれたプロセスのPIDを比較して一致した場合のみコールバックを呼ぶという処理に変更しています。
これでマルチプロセスにも対応できます。

Programing,Python

Posted by tsubakurame