DIYガイガーカウンタ製作日記その2の続き。
事あるごとに線量計で家の周りを測るの、メンドくさいんですよね。
勝手に計測してくれて、リアルタイムに自動的にグラフ化してくれて、線量が上限の閾値を超えたら警告してくれる線量計があると、楽に安心して生活できるなぁ…。
リアルタイムに可視化した空間線量データ
そんな動機で作り始めて、リアルタイムに可視化するとこまで完成。
Safecastにデータ提供するだけだと、他の大量の観測データに埋もれてしまうのと、特定地点の線量の増減がよく分からないので、お馴染みXivelyを使います。データ量によらず無料なのが良い。
KeenIOのライブラリを使うと簡単に綺麗なグラフが描けるんですが、月当たりの送信可能データ量上限が決まっているので、今回は泣く泣く諦めることに。
コードの解説
Xivelyにデータ送信する部分のコードは以下です。実際は自前のevent trigger/handlerを間に挟んでいますが、送信部のみ抜粋しました。
class XivelyEventHandler(IEventHandler):
""" The instance should be registered to event trigger for data update.
This uploads any data to xively cloud service.
Args:
api_key: API key ID provided by xively.
feed_key: Feed key ID provided by xively.
q_max: internal queue size to be used from another thread.
Returns:
Instance of this class.
"""
def __init__(self, api_key, feed_key, q_max=5):
IEventHandler.__init__(self, q_max=q_max)
self._api_key = api_key
self._feed_key = feed_key
以前はXivelyAPIClientインスタンスをinit()で生成して保持していたんですが、あまり長い間接続しているとセッション切られることがあるようで、データをアップするたびに接続/切断するようにしました。
def _run(self, data):
""" Procedure to run when data received from trigger thread.
Args:
data: Pass to the registered event handlers.
"""
def update(at):
api = xively.XivelyAPIClient(self._api_key)
feed = api.feeds.get(self._feed_key)
datastreams = []
for key, val in data["data"].items():
datastreams.append(
xively.Datastream(
id="".join(key.split()),
current_value=val["value"],
at=at
)
)
feed.datastreams = datastreams
feed.update()
api.client.close()
for _ in range(3):
try:
update(data["at"])
logger.info("{} sent data to xively at {}".format(
type(self).__name__, data["at"]))
break
except Exception as e:
logger.error("{} failed to send data to xively at {} by {}".format(
type(self).__name__, data["at"], type(e).__name__))
time.sleep(1)
XivelyAPIClientインスタンスをinit()で生成していた時は度々ConnectionErrorが発生した名残で、例外発生時は3回リトライするようにしています。多分、必要なくなったはずなんですけどね…。
線量が上限の閾値を超えたら警告してくれる
あとはこれを実装すれば、一通り完成かな。
あ、外装をどうにかするのを忘れてた。いい感じの防水ケースを入手せねば。