最近、仕事でExcelを使っているとふと気づくことがあります。「あれ?これってPythonでやった方が早いんじゃない?」って。かといって、いちいちExcelのデータをCSVに出力してPythonで処理して…というのも面倒ですよね。
実は私も、この「Excel×Python問題」で頭を悩ませていました。データ分析の現場では、ExcelとPythonの両方のいいとこ取りができたら…という願望は誰もが持っているはず。
そんな中で朗報です。2024年9月16日、ついにMicrosoft Excelで直接Pythonが使える機能「Python in Excel」が正式リリースされました。実は、この機能は2023年8月からパブリックプレビュー版として一部のユーザーに提供されていたんです。
- 新機能は想像以上に便利
- 導入のハードル
- 私の正直な感想
- 開発中に遭遇した「謎の白紙Excel」問題
- なぜ2つのExcelウィンドウが必要なのか
- 現実的な解決策
- ツールの主な機能
- なぜExcelを使うのか?
- 必要な環境とセットアップ
- コードの実装
- コードの解説
- 使用する上での注意点
- カスタマイズのポイント
- その前に…開発環境を整理しよう
- なぜStreamlitを選んだのか?
- ネットワーク監視ツールの実行方法
- ファイルの準備
- 実行環境について
- 実行手順
- 実行後の確認
- トラブルシューティング
- ブラウザ表示用のコード(app.py)
- コードの全体構造
- 設定管理とログ機能
- 効率的なネットワークスキャン
- スマートなスキャン戦略
- リアルタイム監視の実装
- 直接設定を変更する
- 実際の運用方法
- よくある質問と対処法
- 応用アイデア
- 進化したネットワーク監視ツール:CSVバージョンの解説
- CSVバージョンの特徴
- コードの重要な変更点
- Streamlitアプリケーションの改良
- CSVデータの活用方法
- 使い方の例
- まとめ
新機能は想像以上に便利
この「Python in Excel」、最初は「また何か中途半端な機能が追加されたのかな」と正直思いました。でも、実際に使ってみると結構いい感じなんです。
特に気に入ったのは、Anacondaが標準で組み込まれている点。Pythonユーザーなら「あ、このライブラリ使いたいな」と思ったとき、わざわざインストール作業が必要ないんです。データの可視化やモデリングも、Excelのセル上で直接Pythonコードを書けば実行できます。
導入のハードル
ただし、いくつか注意点もあります。この機能を使うには、Microsoft 365のサブスクリプションが必要で、さらにInsiderプログラムへの参加が必要になります。Office 2019や2021などの買い切り版では使えないのが少し残念。
私の正直な感想
最近、「Pythonちゃんと勉強しておけばよかった…」と後悔することが増えていました。でも、この新機能のおかげで、Excelユーザーが自然にPythonの世界に足を踏み入れるきっかけになりそうです。
むしろ、これを機にPythonをしっかり学んでおこうと思います。なぜなら、これからのビジネスシーンでは、ExcelとPythonの両方を使いこなせる人材が重宝されるのは間違いないからです。
まだ発展途上の機能ではありますが、これからのアップデートで更に便利になっていくことを期待しています。みなさんも、この機能を使ってExcelとPythonの新しい可能性を探ってみませんか?
PythonでExcelを操作:ネットワーク監視ツールの開発で気づいたこと
実は私、Microsoft 365のサブスクリプションには5年以上も加入しているのですが、Python in Excelを使ったことは一度もありません。いつも通り、VSCodeでPythonスクリプトを書いてExcelを制御する方法を選びました。今回も同じように、ネットワークの状態をリアルタイムでExcelに表示して可視化するスクリプトを作成することにしたんです。
開発中に遭遇した「謎の白紙Excel」問題
プログラムを実行すると、なぜか一瞬だけ白紙のExcelウィンドウが表示されることに気づきました。「バグかな?」と最初は心配になりましたが、調べてみると実はこれには技術的な理由があったんです。
なぜ2つのExcelウィンドウが必要なのか
このプログラムでは、大きく分けて2つの処理を行っています:
- 設定の読み込みとテンプレートの作成
- ネットワーク状態のリアルタイム更新
これらの処理を1つのExcelウィンドウで行おうとすると、実は結構厄介な問題が発生します。というのも、ExcelのCOM(Component Object Model)という仕組みの特性上、異なるスレッドで同じExcelウィンドウを操作しようとすると、予期せぬエラーが発生する可能性が高いんです。
現実的な解決策
理論的には1つのウィンドウだけで全ての処理を行うことも可能なのですが、その場合:
- COMのマーシャリングエラーのリスク
- スレッドセーフ性の問題
- 実装の複雑化
といった問題が出てきます。
そこで今回は、以下のようなアプローチを取ることにしました:
- 設定読み込み用の一時的なExcelウィンドウを起動
- 必要な設定を読み込んだらすぐに閉じる
- メインの処理はリアルタイム更新用のウィンドウで行う
結果として、ユーザーの目に触れるのは実質的にメインウィンドウだけ。起動時に一瞬白紙のウィンドウが表示されるのは、プログラムの安定性を確保するための「必要な妥協点」というわけです。
ExcelとPythonで作るネットワーク監視ツール
先ほど説明したように、このツールはPythonスクリプトを使ってExcelでネットワークの状態を可視化します。コードを見る前に、このツールの特徴を簡単に説明させてください。
ツールの主な機能
- ネットワークの状態をリアルタイムで監視
- 結果をExcelシートにわかりやすく表示
- 異常があった場合は自動で色分け表示
- 履歴データの保存と参照が可能
なぜExcelを使うのか?
「なぜわざわざExcelを使うの?」という疑問を持つ方もいるかもしれません。理由は主に以下の3つです:
- データの可視性が高い:表形式で見やすく、色分けなども直感的
- データの再利用が容易:Excel形式なら他の分析にも使いやすい
- 非エンジニアでも扱いやすい:慣れ親しんだExcelインターフェースを使用
必要な環境とセットアップ
必要なもの
- Python 3.8以上
- pip(Pythonのパッケージマネージャー)
- Microsoft Excel(2019以降)
- 管理者権限(Windowsの場合)
パッケージのインストール
pip install pywin32 # Windows環境でExcel自動操作を行う際に使われる「pythoncom」などが含まれています。
pip install xlwings # Excelとの連携に使用されています。
その他のモジュール(socket、platform、subprocess、os、sys、time、threading、datetime、ipaddress、re、concurrent.futures)はPythonの標準ライブラリに含まれているので、通常は追加インストール不要です。
実行方法
- コマンドプロンプトを管理者権限で開く
- スクリプトのあるディレクトリに移動
- 以下のコマンドで実行
python excellan.py
以下のコードををexcellan.pyとして保存した場合、上記のコマンドを入力します。
よくあるエラーと対処法
- 「COMオブジェクトの初期化に失敗」 → Excel が起動していないことを確認
- 「権限エラー」 → 管理者権限で実行しているか確認
- 「モジュールが見つからない」 → 必要なパッケージが正しくインストールされているか確認
コードの実装
以下が実際のコードになります。コメントを詳しめに書いているので、Pythonに詳しくない方でも理解しやすいようになっています。詳しめに書いているので、Pythonに詳しくない方でも理解しやすいようになっています。
import socket
import platform
import subprocess
import os
import sys
import time
import threading
from datetime import datetime
import xlwings as xw
import ipaddress
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_shared_folders(ip_address: str) -> str:
"""
指定したIPアドレスの共有フォルダ一覧を取得する。
Windowsの場合、'net view' コマンドを利用する。
"""
if platform.system().lower() == "windows":
cmd = f'net view \\\\{ip_address}'
try:
output = subprocess.check_output(cmd, shell=True, universal_newlines=True, stderr=subprocess.DEVNULL)
shares = re.findall(r'\\\\\S+\s+(\S+)', output)
return ", ".join(shares) if shares else ""
except subprocess.CalledProcessError:
return ""
except Exception:
return ""
else:
return ""
class ImprovedNetworkMonitor:
def __init__(self, excel_file='improved_lan_monitor.xlsx'):
self.excel_file = excel_file
self.should_run = False
self.monitor_thread = None
self.refresh_interval = 30 # 通常更新間隔(秒)
self.full_scan_interval = 600 # 全スキャン間隔(秒)
self.known_devices = {} # IPアドレスをキーとした既知デバイス辞書
self.active_ips = set() # アクティブなIPのセット
self.priority_ips = set() # 優先的にスキャンするIP
self.check_shared_folders = False # 共有フォルダ検出の有無(設定で切替)
self.thread_app = None # バックグラウンドスレッド専用のExcelアプリ
self.monitor_wb = None # 常時オープンするWorkbook
self.close_excel_on_stop = True # 終了時にExcelも閉じるか(exit時 True、stop時 False)
# Excelファイルが存在しなければテンプレート作成、存在すれば設定読み込み
if not os.path.exists(self.excel_file):
self.create_excel_template()
else:
self.load_settings()
def create_excel_template(self):
"""モニタリング用のExcelテンプレートを作成(メインスレッドで実施)"""
try:
app = xw.App(visible=False)
wb = app.books.add()
try:
main_sheet = wb.sheets['ネットワークモニター']
except Exception:
main_sheet = wb.sheets.add(name='ネットワークモニター')
try:
status_sheet = wb.sheets['ステータス履歴']
except Exception:
status_sheet = wb.sheets.add(name='ステータス履歴')
try:
config_sheet = wb.sheets['設定']
except Exception:
config_sheet = wb.sheets.add(name='設定')
main_sheet.range('A1').value = [
'IP', 'ホスト名', 'デバイスタイプ', 'ステータス', '応答時間(ms)', '最終確認', '検出回数', '共有フォルダ'
]
main_sheet.range('A1:H1').api.Font.Bold = True
main_sheet.range('A1:H1').color = (200, 200, 250)
status_sheet.range('A1').value = ['タイムスタンプ', 'オンラインデバイス', 'オフラインデバイス', 'ネットワークステータス', 'スキャンモード']
status_sheet.range('A1:E1').api.Font.Bold = True
status_sheet.range('A1:E1').color = (200, 200, 250)
network_info = self.get_network_info()
config_sheet.range('A1').value = '通常更新間隔(秒)'
config_sheet.range('B1').value = 30
config_sheet.range('A2').value = '全スキャン間隔(秒)'
config_sheet.range('B2').value = 600
config_sheet.range('A3').value = 'ネットワーク範囲'
config_sheet.range('B3').value = network_info['network_range']
config_sheet.range('A4').value = 'DHCPレンジ開始'
config_sheet.range('B4').value = 100
config_sheet.range('A5').value = 'DHCPレンジ終了'
config_sheet.range('B5').value = 200
config_sheet.range('A6').value = '優先IPアドレス(カンマ区切り)'
config_sheet.range('B6').value = f"{network_info['gateway']},{network_info['local_ip']}"
config_sheet.range('A7').value = '詳細表示'
config_sheet.range('B7').value = False
config_sheet.range('A8').value = '共有フォルダ検出'
config_sheet.range('B8').value = False
config_sheet.range('C8').value = '注: 共有フォルダ情報を取得する場合はTrue'
config_sheet.range('C1').value = '注: 短すぎると負荷が高くなります(最小10秒)'
config_sheet.range('C2').value = '注: ネットワーク内の全デバイスを検出する間隔'
config_sheet.range('C4').value = '注: ルーターのDHCP設定範囲'
config_sheet.range('C6').value = '注: 常にスキャンするIPアドレス'
config_sheet.range('C7').value = '注: TrueにするとExcelにデバッグ情報を表示'
main_sheet.autofit()
status_sheet.autofit()
config_sheet.autofit()
for sheet in wb.sheets:
if sheet.name not in ['ネットワークモニター', 'ステータス履歴', '設定']:
sheet.delete()
wb.save(self.excel_file)
wb.close()
app.quit()
print(f"テンプレートを作成しました: {self.excel_file}")
self.load_settings()
except Exception as e:
print(f"テンプレート作成エラー: {e}")
def load_settings(self):
"""Excelファイルから設定を読み込む(メインスレッドで実施)"""
try:
app = xw.App(visible=False)
wb = app.books.open(self.excel_file)
config_sheet = wb.sheets['設定']
try:
interval = config_sheet.range('B1').value
if isinstance(interval, (int, float)) and interval >= 10:
self.refresh_interval = int(interval)
except:
pass
try:
full_interval = config_sheet.range('B2').value
if isinstance(full_interval, (int, float)) and full_interval >= 60:
self.full_scan_interval = int(full_interval)
except:
pass
try:
priority_ips_str = config_sheet.range('B6').value
if priority_ips_str and isinstance(priority_ips_str, str):
for ip in priority_ips_str.split(','):
ip = ip.strip()
if self.is_valid_ip(ip):
self.priority_ips.add(ip)
except:
pass
network_info = self.get_network_info()
self.priority_ips.add(network_info['local_ip'])
if network_info['gateway'] != "不明":
self.priority_ips.add(network_info['gateway'])
try:
shared_option = config_sheet.range('B8').value
self.check_shared_folders = bool(shared_option)
except:
pass
wb.close()
app.quit()
except Exception as e:
print(f"設定読み込みエラー: {e}")
def is_valid_ip(self, ip):
pattern = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')
if not pattern.match(ip):
return False
return all(0 <= int(octet) <= 255 for octet in ip.split('.'))
def get_network_info(self):
# ローカルホスト名は環境変数から取得(例: n4060)
hostname = os.environ.get("COMPUTERNAME", socket.gethostname())
try:
local_ip = socket.gethostbyname(hostname)
except:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 1))
local_ip = s.getsockname()[0]
except:
local_ip = '127.0.0.1'
finally:
s.close()
gateway = "不明"
try:
if platform.system() == "Windows":
output = subprocess.check_output("ipconfig", universal_newlines=True)
for line in output.split('\n'):
if "Default Gateway" in line or "デフォルト ゲートウェイ" in line:
gw = line.split(":")[-1].strip()
if gw:
gateway = gw
break
else:
try:
output = subprocess.check_output("ip route | grep default", shell=True, universal_newlines=True)
gateway = output.split()[2]
except:
output = subprocess.check_output("netstat -nr | grep default", shell=True, universal_newlines=True)
gateway = output.split()[1]
except:
pass
ip_parts = local_ip.split('.')
network_prefix = '.'.join(ip_parts[:3]) + '.'
network_range = f"{network_prefix}0/24"
return {
"hostname": hostname,
"local_ip": local_ip,
"gateway": gateway,
"network_range": network_range,
"network_prefix": network_prefix,
"scan_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
def ping_device(self, ip_address, count=1):
try:
if platform.system().lower() == "windows":
ping_cmd = f"ping -n {count} -w 500 {ip_address}"
else:
ping_cmd = f"ping -c {count} -W 0.5 {ip_address}"
start_time = time.time()
result = subprocess.run(ping_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines=True)
response_time_ms = round((time.time() - start_time) * 1000)
if result.returncode == 0:
return {"status": "オンライン", "response_time_ms": response_time_ms}
else:
return {"status": "オフライン", "response_time_ms": None}
except Exception as e:
print(f"Pingエラー({ip_address}): {e}")
return {"status": "エラー", "response_time_ms": None}
def get_hostname(self, ip_address):
try:
socket.setdefaulttimeout(1)
hostname = socket.getfqdn(ip_address)
if hostname == ip_address:
return "不明"
return hostname
except:
return "不明"
def get_device_type(self, ip_address, network_info):
if ip_address == network_info["local_ip"]:
return "このPC"
elif ip_address == network_info["gateway"]:
return "ゲートウェイ"
if ip_address in self.known_devices:
hostname = self.known_devices[ip_address].get("hostname_base", "").lower()
if "printer" in hostname or "print" in hostname:
return "プリンター"
elif "phone" in hostname or "mobile" in hostname or "android" in hostname or "iphone" in hostname:
return "モバイル"
elif "tv" in hostname or "television" in hostname or "smart-tv" in hostname:
return "テレビ/メディア"
return "その他"
def get_scan_targets(self, network_info, mode="regular"):
targets = set()
network_prefix = network_info["network_prefix"]
if mode == "full":
try:
network = ipaddress.IPv4Network(network_info["network_range"])
for ip in network.hosts():
targets.add(str(ip))
except Exception as e:
print(f"IPネットワーク解析エラー: {e}")
for i in range(1, 255):
targets.add(f"{network_prefix}{i}")
elif mode == "dhcp":
try:
config_sheet = self.monitor_wb.sheets['設定']
start_range = int(config_sheet.range('B4').value or 100)
end_range = int(config_sheet.range('B5').value or 200)
if start_range < 1:
start_range = 1
if end_range > 254:
end_range = 254
for i in range(start_range, end_range + 1):
targets.add(f"{network_prefix}{i}")
except Exception as e:
for i in range(100, 201):
targets.add(f"{network_prefix}{i}")
else: # regular
targets.update(self.active_ips)
targets.update(self.priority_ips)
targets.update(self.priority_ips)
return list(targets)
def scan_network(self, mode="regular"):
network_info = self.get_network_info()
targets = self.get_scan_targets(network_info, mode)
print(f"ネットワークスキャン中: {mode}モード ({len(targets)}個のIP)...")
devices = []
new_active_ips = set()
results = {}
# 並列処理でpingを実行
with ThreadPoolExecutor(max_workers=min(50, len(targets))) as executor:
future_to_ip = {executor.submit(self.ping_device, ip): ip for ip in targets}
for future in as_completed(future_to_ip):
ip = future_to_ip[future]
try:
results[ip] = future.result()
except Exception as e:
results[ip] = {"status": "エラー", "response_time_ms": None}
# 結果に基づいてデバイス情報を更新
for ip in targets:
ping_result = results.get(ip, {"status": "エラー", "response_time_ms": None})
if ping_result["status"] == "オンライン":
new_active_ips.add(ip)
if ip not in self.known_devices or self.known_devices[ip].get("status") != "オンライン":
hostname_base = self.get_hostname(ip)
else:
hostname_base = self.known_devices[ip].get("hostname_base", "不明")
device_type = self.get_device_type(ip, network_info)
device = {
"ip": ip,
"hostname": hostname_base,
"hostname_base": hostname_base,
"device_type": device_type,
"status": ping_result["status"],
"response_time_ms": ping_result["response_time_ms"],
"last_seen": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"detection_count": self.known_devices.get(ip, {}).get("detection_count", 0) + 1
}
self.known_devices[ip] = device
devices.append(device)
elif ip in self.known_devices:
offline_device = self.known_devices[ip].copy()
offline_device["status"] = "オフライン"
offline_device["response_time_ms"] = None
offline_device["last_seen"] = self.known_devices[ip].get("last_seen", "不明")
devices.append(offline_device)
self.active_ips = new_active_ips
return devices
def update_excel(self, devices, scan_mode):
max_retries = 5
while max_retries > 0:
try:
wb = self.monitor_wb
app = wb.app
app.screen_updating = False
try:
sheet = wb.sheets['ネットワークモニター']
except Exception:
sheet = wb.sheets.add(name='ネットワークモニター')
sheet.range("A2:H1000").clear()
unique_devices = {}
for device in devices:
ip = device.get('ip')
if ip not in unique_devices or device.get('detection_count', 0) > unique_devices[ip].get('detection_count', 0):
unique_devices[ip] = device
device_data = []
for device in unique_devices.values():
if self.check_shared_folders and device.get('status') == 'オンライン':
shared_folders = get_shared_folders(device.get('ip'))
else:
shared_folders = ""
device_data.append([
device.get('ip', '不明'),
device.get('hostname', '不明'),
device.get('device_type', 'その他'),
device.get('status', '不明'),
device.get('response_time_ms', '-'),
device.get('last_seen', datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
device.get('detection_count', 1),
shared_folders
])
if device_data:
device_data.sort(key=lambda x: [int(octet) for octet in x[0].split('.')])
sheet.range('A2').value = device_data
for i, row_data in enumerate(device_data, start=2):
status = row_data[3]
if status == 'オンライン':
sheet.range(f'D{i}').color = (198, 239, 206)
else:
sheet.range(f'D{i}').color = (255, 199, 206)
device_type = row_data[2]
if device_type == 'ゲートウェイ':
sheet.range(f'C{i}').color = (255, 230, 153)
elif device_type == 'このPC':
sheet.range(f'C{i}').color = (197, 217, 241)
elif device_type == 'プリンター':
sheet.range(f'C{i}').color = (216, 191, 216)
elif device_type == 'モバイル':
sheet.range(f'C{i}').color = (255, 204, 153)
try:
status_sheet = wb.sheets['ステータス履歴']
except Exception:
try:
status_sheet = wb.sheets.add(name='ステータス履歴')
status_sheet.range('A1').value = ['タイムスタンプ', 'オンラインデバイス', 'オフラインデバイス', 'ネットワークステータス', 'スキャンモード']
status_sheet.range('A1:E1').api.Font.Bold = True
status_sheet.range('A1:E1').color = (200, 200, 250)
except:
status_sheet = None
if status_sheet:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
online_count = sum(1 for d in unique_devices.values() if d['status'] == 'オンライン')
offline_count = sum(1 for d in unique_devices.values() if d['status'] == 'オフライン')
if online_count == 0:
network_status = "ネットワーク接続なし"
elif online_count < 2:
network_status = "問題あり - デバイスがほとんど見つかりません"
else:
network_status = "正常"
last_status_row = status_sheet.range('A' + str(status_sheet.cells.last_cell.row)).end('up').row
status_sheet.range(f'A{last_status_row+1}').value = [timestamp, online_count, offline_count, network_status, scan_mode]
if last_status_row > 30:
status_sheet.range(f'A2:E{last_status_row-29}').delete()
try:
config_sheet = wb.sheets['設定']
show_details = bool(config_sheet.range('B7').value)
except:
show_details = False
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
info_text = f"最終更新: {current_time} ({scan_mode}スキャン)"
if show_details:
info_text += f" | アクティブIP: {len(self.active_ips)} | 優先IP: {len(self.priority_ips)}"
sheet.range('A1').offset(-1, 0).value = info_text
except:
pass
wb.save()
app.screen_updating = True
print(f"Excelを更新しました: {current_time} ({scan_mode}モード)")
return True
except Exception as e:
if "0x800ac472" in str(e):
print("Excelがビジー状態です。再試行します。")
time.sleep(1)
max_retries -= 1
continue
else:
print(f"Excel更新エラー: {e}")
return False
print("Excel更新の再試行に失敗しました。")
return False
def monitor_loop(self):
import pythoncom
pythoncom.CoInitialize()
self.thread_app = xw.App(visible=True)
try:
self.monitor_wb = self.thread_app.books.open(self.excel_file)
last_scan_time = 0
last_full_scan_time = 0
while self.should_run:
try:
current_time = time.time()
scan_mode = "regular"
if current_time - last_full_scan_time >= self.full_scan_interval:
scan_mode = "full"
last_full_scan_time = current_time
elif len(self.active_ips) < 3 and current_time - last_full_scan_time >= 60:
scan_mode = "dhcp"
if (current_time - last_scan_time >= self.refresh_interval) or scan_mode != "regular":
try:
devices = self.scan_network(mode=scan_mode)
if devices:
self.update_excel(devices, scan_mode)
last_scan_time = current_time
except Exception as e:
print(f"モニタリングサイクルエラー: {e}")
time.sleep(5)
except Exception as e:
print(f"モニタリングループエラー: {e}")
time.sleep(10)
finally:
if self.close_excel_on_stop:
if self.monitor_wb:
self.monitor_wb.save()
self.monitor_wb.close()
self.monitor_wb = None
self.thread_app.quit()
self.thread_app = None
else:
if self.monitor_wb:
self.monitor_wb.save()
pythoncom.CoUninitialize()
def start_monitoring(self):
if self.monitor_thread and self.monitor_thread.is_alive():
print("すでにモニタリングは実行中です")
return False
self.load_settings()
self.close_excel_on_stop = True
self.should_run = True
self.monitor_thread = threading.Thread(target=self.monitor_loop)
self.monitor_thread.start()
print("ネットワークモニタリングを開始しました")
return True
def stop_monitoring(self, close_excel=False):
self.close_excel_on_stop = close_excel
self.should_run = False
if self.monitor_thread:
try:
self.monitor_thread.join(timeout=5)
except:
pass
print("ネットワークモニタリングを停止しました")
return True
def main():
run_now = False
excel_file = 'improved_lan_monitor.xlsx'
if len(sys.argv) > 1:
for arg in sys.argv[1:]:
if arg == '--run':
run_now = True
elif arg.startswith('--excel='):
excel_file = arg.split('=')[1]
monitor = ImprovedNetworkMonitor(excel_file=excel_file)
if run_now:
monitor.start_monitoring()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("終了中...")
monitor.stop_monitoring(close_excel=True)
sys.exit(0)
else:
print(f"改良版LAN監視ツール v1.0")
print(f"Excelファイル: {monitor.excel_file}")
print("コマンド: start - モニタリング開始, stop - モニタリング停止(Excelはそのまま), exit - 終了(Excelを閉じる)")
while True:
try:
cmd = input("> ").strip().lower()
if cmd == "start":
monitor.start_monitoring()
elif cmd == "stop":
monitor.stop_monitoring(close_excel=False)
elif cmd == "exit":
if monitor.should_run:
monitor.stop_monitoring(close_excel=True)
sys.exit(0)
else:
print("不明なコマンド。start, stop, exit のいずれかを入力してください。")
except EOFError:
break
except KeyboardInterrupt:
print("\n終了中...")
if monitor.should_run:
monitor.stop_monitoring(close_excel=True)
sys.exit(0)
if __name__ == "__main__":
main()

コードの解説
このコードの主なポイントは:
- COMオブジェクトの扱い
- 2つのExcelインスタンスを適切に管理
- スレッドセーフな実装を心がけた
- データの更新処理
- 非同期処理でパフォーマンスを確保
- エラー発生時の適切なハンドリング
- ユーザーインターフェース
- 直感的な色分け表示
- 状態変化の視認性を重視
使用する上での注意点
- Excel 2019以降のバージョンで動作確認済み
- Windows環境での使用を想定
- 管理者権限が必要な場合があります
カスタマイズのポイント
このコードは以下のような拡張が可能です:
- 監視項目の追加
- アラート条件の変更
- データ保存形式のカスタマイズ
今回は基本的な実装にとどめていますが、必要に応じて機能を追加することができます。
このスクリプトは自動でエクセルファイルを作成します。Excelで事前にデザインを作成しておくと、最終的なレイアウトやフォーマットを明確にしておく上で非常に有用です。どのようなデータがどの位置に配置され、どのような書式が適用されるかを視覚的に確認でき、調整が必要な箇所も一目瞭然になります。
一方で、PythonスクリプトでExcelファイルを自動生成する場合、コード内でレイアウトや書式設定を直接指定することができ、動的なデータ生成に柔軟に対応できます。これらのアプローチのハイブリッドな方法として、事前にテンプレートとなるExcelファイルをデザインし、そのテンプレートにPythonでデータを書き込む方法もあります。これなら、初期のデザインはExcelでしっかり作り込んだ上で、自動化のメリットも享受できるため、わかりやすさと柔軟性の両方を実現できます。
ネットワーク監視ツールをブラウザで使えるようにしてみた
上で作ったExcelベースのネットワーク監視ツール、使ってみると結構便利なんですが、「他のパソコンからも見れたらいいのに」という思いが徐々に出てきました。そこで今回は、ブラウザベースの監視ツールにアップグレードすることにしました。

その前に…開発環境を整理しよう
実は前回のコードを書いているときに、パソコンの中がちょっとぐちゃぐちゃになってきているのを感じていました。Pythonのパッケージをインストールしまくっているせいで、「このパッケージ、どのプロジェクトで使ってたっけ?」という状態に…。
これは良くないので、今回はちゃんと仮想環境を作って開発することにしました。
仮想環境を作ってみる
コマンドプロンプトを開いて、以下のコマンドを実行します:
python -m venv venv
たった一行のコマンドですが、これで新しい仮想環境が作成されます。venv
という名前のフォルダができているはずです。
次に、この仮想環境を使えるようにする必要があります:
venv/scripts/activate
これで仮想環境が有効になりました。コマンドプロンプトの先頭に(venv)
という表示が出ていれば成功です。
必要なパッケージをインストール
VSCodeでスクリプトを開くと、いくつかのモジュールに黄色の波線がついているのが見えます。これは「このパッケージが見つからないよ」というVSCodeからの優しい警告です。
まずは不足しているパッケージをインストールしましょう:
pip install openpyxl netifaces
次に、ブラウザベースのツールに必要なパッケージもインストールします:
pip install streamlit pandas streamlit_autorefresh
なぜStreamlitを選んだのか?
「ブラウザで見れるようにする」というと、普通はWebフレームワーク(DjangoやFlaskなど)を使うところです。でも今回は少し違うアプローチを取りました。
Streamlitは、データ分析や機械学習の結果を簡単にWeb表示できるフレームワークです。特徴は:
- Python一筋で書ける(HTMLやJavaScriptの知識不要)
- データの可視化が簡単
- リアルタイムの更新が容易
- セットアップが超簡単
要するに、「Pythonのコードを書くだけでサクッとWebアプリが作れる」という、私のような面倒くさがり屋には最高のツールなんです。さらに、Streamlitは、シンプルなコードでWebアプリケーションを作成できる理由の一つに、多くの機能が既に組み込まれている点があります。具体的には、以下のような機能があらかじめ用意されています。
- UIコンポーネント: ボタン、スライダー、テキスト入力などのウィジェットが用意されているため、複雑なHTMLやJavaScriptを書く必要がありません。
- レイアウト管理: 自動でレイアウトが整えられるため、ユーザーインターフェースの設計が簡単です。
- リアルタイム更新: データの変更に合わせてアプリが自動的に更新される仕組みがあり、手動での更新処理を記述する必要がありません。
こんな感じで、短いコードで機能的なアプリケーションを迅速に開発することが可能となっています。
ネットワーク監視ツールの実行方法
開発環境が整ったところで、実際にプログラムを動かしてみましょう。今回のツールは2つのプログラムを同時に動かす必要があるので、少し変わった実行方法になります。
ファイルの準備
まずは2つのPythonファイルを用意します:
realtime_lan_monitor.py
:ネットワーク監視のメインプログラムapp.py
:ブラウザで表示するためのStreamlitアプリ
これらのファイルはC:\lan
フォルダに保存します(別の場所でも構いませんが、以降の説明はC:\lan
を前提としています)。実際にテストしてみたい方は以下からダウンロードしてください。
実行環境について
通常、VSCodeで開発するときは1つのウィンドウで完結することが多いのですが、今回は2つのプログラムを同時に動かす必要があります。
VSCodeでも実はワークスペースやタブグループを駆使すれば同じフォルダのウィンドウを複数起動することは可能です。でも今回は単純に、2つ目のプログラムはPowerShellで実行する方法を採用します。VSCodeでは同一フォルダのウィンドウを複数起動できないため、今回はPowerShellを利用することにしました。ちなみに、VSCodeの設定を変えればこの制限は回避できますが、今回は手間を考えてPowerShellを起動することにしました。
実行手順
1. メインプログラムの実行(VSCodeで)
VSCodeのターミナルで以下のコマンドを実行します:
python realtime_lan_monitor.py
これでネットワークの監視が始まります。
2. ブラウザツールの起動(PowerShellで)
新しくPowerShellを開いて、以下のコマンドを順番に実行します:
cd\
cd lan
venv/scripts/activate
streamlit run app.py
注意:PowerShellは新しく開くので、必ず仮想環境を再度有効化する必要があります。これを忘れると「streamlitコマンドが見つからない」というエラーが出ます。
実行後の確認
正常に起動すると以下のようになります:
- ブラウザが自動的に開き、監視画面が表示される
lan_monitor.log
というログファイルが生成される- Excelファイルも生成されるが、これは一時的なもの
ブラウザの画面は自動的に更新されるので、ネットワークの状態をリアルタイムで確認することができます。
トラブルシューティング
よくありがちな問題と対処方法:
- 「streamlitコマンドが見つからない」
→ 仮想環境が有効になっていない可能性があります - 「ポートがすでに使用されている」
→ 以前の実行が残っている可能性があります。PCを再起動するか、タスクマネージャーでPythonのプロセスを終了してください - 「権限がない」というエラー
→ PowerShellを管理者権限で実行してみてください
以下にそれぞれのコードの中身について詳しく解説していきます。特に、Streamlitを使ってどのようにブラウザ表示を実現しているのかについて見ていきましょう。
ネットワーク監視ツールのコード解説
先ほどは環境構築と実行方法について説明しました。今回は、実際のコードを見ていきましょう。特に、どうやってブラウザでネットワークの状態を表示しているのか、その仕組みを解説します。
ブラウザ表示用のコード(app.py)
まずは短い方のapp.pyから見ていきましょう。このファイルは意外とシンプルです:
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import logging
import time
import pandas as pd
import streamlit as st
from streamlit_autorefresh import st_autorefresh
from realtime_lan_monitor_csv import ImprovedNetworkMonitorCSV
# --------------------------------
# ログ設定: INFOレベル以上を出力
# --------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# --------------------------------
# Streamlit のページ設定
# --------------------------------
st.set_page_config(
page_title="LAN Monitoring",
layout="wide"
)
# --------------------------------
# 自動リフレッシュ (5秒)
# --------------------------------
st_autorefresh(interval=5000, limit=100, key="fizzbuzz")
# --------------------------------
# セッションステートでモニタ管理
# --------------------------------
if 'monitor' not in st.session_state:
try:
monitor = ImprovedNetworkMonitorCSV()
monitor.start_monitoring()
st.session_state.monitor = monitor
except Exception as e:
st.error(f"ネットワークモニタの初期化エラー: {e}")
st.title("Real-Time Network Monitoring (CSV version)")
monitor = st.session_state.get('monitor', None)
if monitor is None:
st.error("ネットワークモニタが初期化されていません。")
else:
# 状態表示
if monitor.should_run:
st.success("Monitoring is currently: RUNNING")
else:
st.warning("Monitoring is currently: STOPPED")
# 停止ボタン
if st.button("Stop Monitoring"):
monitor.stop_monitoring()
st.warning("モニタリングを停止しました。必要であればアプリを終了してください。")
# 再開ボタン
if st.button("Restart Monitoring"):
if not monitor.should_run:
monitor.start_monitoring()
st.success("モニタリングを再開しました。")
else:
st.info("すでにモニタリング中です。")
# デバイス一覧表示
devices = monitor.last_devices if hasattr(monitor, "last_devices") else []
if devices:
df = pd.DataFrame(devices)
st.dataframe(df)
else:
st.info("まだデバイス情報は取得されていません。スキャンが行われるまでお待ちください。")
st.write(f"Last update: {time.strftime('%Y-%m-%d %H:%M:%S')}")
コードの特徴
このコードには、いくつかの巧妙な工夫が含まれています:
- 自動更新の実装
st_autorefresh(interval=5000, limit=100, key="fizzbuzz")
5秒(5000ミリ秒)ごとにページを自動更新します。これにより、リアルタイムな状態監視が可能になります。
- セッション状態の管理
if 'monitor' not in st.session_state:
monitor = ImprovedNetworkMonitor()
ページが更新されても監視状態を維持するために、Streamlitのセッション機能を利用しています。
- インタラクティブな操作
if st.button("Stop Monitoring"):
monitor.stop_monitoring()
監視の停止/再開をブラウザ上のボタンで制御できます。
- データのテーブル表示
df = pd.DataFrame(devices)
st.dataframe(df)
PandasのDataFrameを使って、デバイス情報を見やすい表形式で表示します。
Streamlitならではの特徴
このコードの素晴らしいところは、たった数行程度でこれだけの機能を実現できている点です。同じことを通常のWebフレームワークで実装しようとすると、かなりの量のコードが必要になるでしょう。
特に:
- HTMLを一切書く必要がない
- JavaScriptも不要
- データの自動更新も数行で実装
- テーブル表示も1行
これがStreamlitの強みです。データ分析やモニタリングツールのような、データの表示が主体のアプリケーションを作るのに最適なフレームワークと言えます。
ネットワーク監視の本体部分(realtime_lan_monitor.py)についても解説します。
ネットワーク監視の本体コードを解説
前回はブラウザ表示用のコードを見ました。今回は、実際のネットワーク監視を行うメインのコード(realtime_lan_monitor.py)を見ていきましょう。
コードの全体構造
このコードは以下のような構造になっています:
- 設定とログ機能
- ネットワーク情報の取得
- デバイス監視の実装
- Excel出力の処理
- メインループの制御
順番に重要な部分を見ていきましょう。
設定管理とログ機能
import logging
logging.basicConfig(
filename='lan_monitor.log',
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
encoding='utf-8'
)
CONFIG_FILE = "config.json"
def load_config():
config = {
"excel_file": "improved_lan_monitor.xlsx",
"refresh_interval": 30,
"full_scan_interval": 600,
"dhcp_range_start": 100,
"dhcp_range_end": 200,
"priority_ips": [],
"check_shared_folders": True
}
# 設定ファイルが存在する場合は読み込み
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
file_conf = json.load(f)
config.update(file_conf)
except Exception as e:
logging.error(f"設定ファイルの読み込みエラー: {e}", exc_info=True)
return config
この部分のポイント:
- ログをファイルとコンソールの両方に出力
- デフォルト設定を持ちつつ、JSON形式の設定ファイルで上書き可能
- エラーが発生してもデフォルト値で継続動作
効率的なネットワークスキャン
def scan_network(self, mode="regular"):
net_info = get_network_info()
targets = self.get_scan_targets(net_info, mode)
devices = []
new_active = set()
with ThreadPoolExecutor(max_workers=20) as executor:
futures = {executor.submit(ping_and_collect, ip, net_info,
self.known_devices, self.check_shared_folders): ip
for ip in targets}
for future in as_completed(futures):
ip = futures[future]
try:
_, status, device = future.result()
if status == "オンライン" and device:
new_active.add(ip)
self.known_devices[ip] = device
devices.append(device)
このコードの特徴:
- ThreadPoolExecutorを使用した並行処理
- 最大20スレッドで同時にPing実行
- 応答待ちによる遅延を最小化
スマートなスキャン戦略
スキャンには3つのモードがあります:
- 通常スキャン(regular)
- 既知のアクティブIPと優先IPのみをチェック
- 最も負荷が少なく高速
- フルスキャン(full)
- ネットワーク全体(1-254)をスキャン
- 10分(600秒)ごとに実行
- DHCPレンジスキャン(dhcp)
- DHCP範囲のみをスキャン(デフォルト:100-200)
- アクティブデバイスが少ない時に実行
def get_scan_targets(self, network_info, mode="regular"):
targets = set()
prefix = network_info["network_prefix"]
if mode == "full":
try:
net = ipaddress.IPv4Network(network_info["network_range"])
for ip in net.hosts():
targets.add(str(ip))
except Exception as e:
logging.error(f"fullスキャンエラー: {e}", exc_info=True)
for i in range(1, 255):
targets.add(f"{prefix}{i}")
リアルタイム監視の実装
監視ループは別スレッドで動作し、常に最新の状態を保持します:
def monitor_loop(self):
last_scan_time = 0
last_full_scan_time = 0
while self.should_run:
try:
now = time.time()
scan_mode = "regular"
# フルスキャンの判定
if now - last_full_scan_time >= self.full_scan_interval:
scan_mode = "full"
last_full_scan_time = now
# DHCPスキャンの判定
elif len(self.active_ips) < 3 and now - last_full_scan_time >= 60:
scan_mode = "dhcp"
なぜこの設計なのか?
- 省リソース設計
- 通常は既知のデバイスのみをチェック
- 必要な時だけフルスキャンを実行
- 適応的なスキャン
- デバイスが少ない時はDHCPレンジをチェック
- ネットワーク負荷を最小限に
- エラー耐性
- 各処理でtry-exceptを使用
- エラーが発生しても監視を継続
ネットワーク監視ツール:設定と応用
ここまでPythonとExcel、そしてStreamlitを使ったネットワーク監視ツールの作成と実行方法を見てきました。実際の使用方法や設定のカスタマイズ、そして応用例についてまとめてみましょう。
直接設定を変更する
添付したスクリプトでは、設定はコード内に直接記述されています。例えば、ImprovedNetworkMonitor
クラスの__init__
メソッド内で、次のようなデフォルト設定が定義されています:
def __init__(self):
self.config = load_config()
self.excel_file = self.config["excel_file"]
self.refresh_interval = self.config["refresh_interval"]
self.full_scan_interval = self.config["full_scan_interval"]
self.dhcp_range_start = self.config["dhcp_range_start"]
self.dhcp_range_end = self.config["dhcp_range_end"]
self.priority_ips = set(self.config["priority_ips"])
self.check_shared_folders = self.config["check_shared_folders"]
これらの設定を変更したい場合は、load_config
関数内のデフォルト値を直接編集します:
def load_config():
"""
設定を読み込みます。エラー時はデフォルト値を使用します。
"""
config = {
"excel_file": "improved_lan_monitor.xlsx", # ここを変更
"refresh_interval": 30, # ここを変更(秒単位)
"full_scan_interval": 600, # ここを変更(秒単位)
"dhcp_range_start": 100, # ここを変更
"dhcp_range_end": 200, # ここを変更
"priority_ips": [], # ここを変更(例: ["192.168.1.1", "192.168.1.5"])
"check_shared_folders": True # ここを変更(True/False)
}
# ...以下は同じ
実際の運用方法
1. 監視頻度のバランス
監視頻度を調整することで、ネットワークへの負荷とリアルタイム性のバランスを取れます:
refresh_interval
: 通常の更新間隔(秒)。頻繁すぎるとネットワーク負荷が高まります。full_scan_interval
: フルスキャンの間隔(秒)。長めに設定すると負荷が減ります。
例えば、小規模なホームネットワークなら:
"refresh_interval": 60, # 1分ごとに更新
"full_scan_interval": 3600, # 1時間ごとにフルスキャン
企業ネットワークなら:
"refresh_interval": 30, # 30秒ごとに更新
"full_scan_interval": 900, # 15分ごとにフルスキャン
2. 監視対象の最適化
DHCPの範囲やプライオリティIPを適切に設定することで、効率的な監視が可能になります:
"dhcp_range_start": 50,
"dhcp_range_end": 150,
"priority_ips": ["192.168.1.1", "192.168.1.10", "192.168.1.20"],
3. 自動起動の設定
Windowsのスタートアップに登録するバッチファイル(start_monitor.bat):
@echo off
cd C:\lan
call venv\Scripts\activate
start pythonw realtime_lan_monitor.py --run
timeout /t 5
start pythonw -m streamlit run app.py
このファイルをスタートアップフォルダ(Win+R
でshell:startup
と入力)に配置すると、Windows起動時に自動実行されます。
よくある質問と対処法
Q: 監視が遅い、または反応しないことがある
A: 以下を確認してください:
max_workers
の値を調整(スレッド数を増やす)- pingのタイムアウト値を短くする
- サブネットマスクが正しく取得できているか確認
Q: 特定のデバイスが検出されない
A: 考えられる原因:
- デバイスがpingに応答しない設定になっている
- ファイアウォールでICMPがブロックされている
- スリープ状態になっている
対処法:
- 優先IPリストに手動で追加
- 共有フォルダチェックをオフにしてみる
Q: エラーログが大量に出る
A: ログレベルを調整:
logging.basicConfig(
filename='lan_monitor.log',
level=logging.WARNING, # INFO → WARNING に変更
format='%(asctime)s [%(levelname)s] %(message)s',
encoding='utf-8'
)
応用アイデア
このツールを基にして、様々な拡張が考えられます:
- 監視結果の通知
def send_notification(device):
# Slackやメールで通知する処理
pass
- 長期的な統計収集
def save_statistics(devices):
# データベースに記録
conn = sqlite3.connect('network_stats.db')
# ...
- セキュリティ監視の強化
def detect_suspicious_devices(devices):
# 既知デバイスのリストと比較
# 未知のデバイスがあれば警告
pass
- インターネット接続の監視
def check_internet_connection():
# 外部サーバーへの接続をテスト
# 切断を検知して通知
pass
進化したネットワーク監視ツール:CSVバージョンの解説
前述までExcelを使ったネットワーク監視ツールとそのStreamlitによるブラウザ表示について解説してきました。次はより軽量で拡張性の高いCSVバージョンのスクリプトを紹介します。このバージョンでは、Excelファイルの代わりにCSVファイルを使用し、より効率的なログ記録と状態管理を実現しています。
CSVバージョンの特徴
Excelバージョンと比較して、CSVバージョンには以下のような特徴があります:
- 軽量な記録方式
- Excelの代わりにCSVファイルを使用
- イベントログとスナップショットを別々に記録
- 文字コードをUTF-8 with BOMに統一
- 拡張された機能
- 状態変化のイベント記録
- 定期的なネットワークスナップショット
- 改善されたエラーハンドリング
- 設定の集中管理
- すべての設定をCONFIGディクショナリに集約
- 将来的な設定ファイル化への準備
さらにいうと、ExcelとPythonの組み合わせだと、どうしてもファイルの書式設定や複雑な内部構造が原因でエラーが出やすいんです。一方で、CSV形式はテキストベースでシンプルなので、データの読み書きが容易になり、エラーの発生もかなり減ります。ただ、Excel独自の機能(セルの書式や数式など)は保存できなくなるので、その点だけは注意が必要です。

コードの重要な変更点
1. 設定の集中管理
CONFIG = {
"EVENT_LOG_FILE": "event_log.csv", # 状態変化の履歴
"SNAPSHOT_LOG_FILE": "snapshot_log.csv", # スナップショットの履歴
"CSV_ENCODING": "utf-8-sig",
"REFRESH_INTERVAL": 30, # 通常の更新間隔(秒)
"FULL_SCAN_INTERVAL": 600, # フルスキャン間隔(秒)
"SNAPSHOT_INTERVAL": 600, # スナップショット取得間隔(秒)
"DHCP_RANGE_START": 100,
"DHCP_RANGE_END": 200,
"PRIORITY_IPS": ["192.168.1.1"], # 例: ルーター等 常にスキャンするIP
"CHECK_SHARED_FOLDERS": True,
"EXCEL_FILE": "improved_lan_monitor.xlsx",
"PING_CONCURRENCY": 20, # Pingを並列実行するスレッド数
}
すべての設定が1箇所にまとめられているため、変更が容易になっています。
2. イベントログの実装
def append_event_log(ip, old_status, new_status, response_time, shared):
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
file_exists = os.path.exists(CONFIG["EVENT_LOG_FILE"])
try:
with open(CONFIG["EVENT_LOG_FILE"], "a", newline="", encoding=CONFIG["CSV_ENCODING"]) as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["timestamp", "ip", "old_status", "new_status", "response_time_ms", "shared"])
writer.writerow([now_str, ip, old_status, new_status, response_time, shared])
except Exception as e:
logging.error(f"Failed to append event log for IP {ip}: {e}")
デバイスの状態変化(オンライン→オフラインなど)が発生した時のみ記録することで、重要な変化を捉えつつ、ログファイルのサイズを抑えています。
3. スナップショットの実装
def append_snapshot(devices, scan_mode):
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
online = [d for d in devices if d["status"] == "オンライン"]
offline = [d for d in devices if d["status"] == "オフライン"]
total_online = len(online)
total_offline = len(offline)
online_ips_str = ",".join([d["ip"] for d in online])
file_exists = os.path.exists(CONFIG["SNAPSHOT_LOG_FILE"])
try:
with open(CONFIG["SNAPSHOT_LOG_FILE"], "a", newline="", encoding=CONFIG["CSV_ENCODING"]) as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["timestamp", "total_online", "total_offline", "online_ips", "scan_mode"])
writer.writerow([now_str, total_online, total_offline, online_ips_str, scan_mode])
except Exception as e:
logging.error(f"Failed to append snapshot log: {e}")
定期的なスナップショットでは、そのときのネットワーク全体の状態を記録します。これにより、時間ごとのデバイス数の変化や、どのIPがいつオンラインだったかを追跡することができます。
4. 効率的なネットワーク情報取得
def get_network_info():
hostname = socket.gethostname()
try:
local_ip = socket.gethostbyname(hostname)
except Exception as e:
logging.warning(f"Error getting local IP from hostname. Fallback method. Original error: {e}")
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 1))
local_ip = s.getsockname()[0]
except Exception as ex:
logging.error(f"Fallback error getting local IP: {ex}")
local_ip = '127.0.0.1'
finally:
s.close()
標準的な方法でIPアドレス取得に失敗した場合のフォールバック方法が追加されています。これにより、より幅広い環境で正しく動作するようになっています。
5. スレッドセーフなデータアクセス
def scan_network(self, mode="regular"):
# ...
with self.lock:
old_status = self.known_devices.get(ip, {}).get("status", "不明")
# ...
with self.lock:
self.known_devices[ip] = dev_info
# ...
複数のスレッドからのデータアクセスを制御するために、ロックが追加されています。これにより、データの整合性が保たれ、競合状態(race condition)を防いでいます。
Streamlitアプリケーションの改良
CSVバージョンに対応したStreamlitアプリケーションでは、以下のような改良が行われています:
# 状態表示
if monitor.should_run:
st.success("Monitoring is currently: RUNNING")
else:
st.warning("Monitoring is currently: STOPPED")
よりわかりやすい状態表示により、現在の監視状態が一目でわかるようになっています。
CSVデータの活用方法
CSVファイルに保存されたデータは、さまざまな形で活用できます:
- 長期的なネットワーク統計
- デバイス数の時間変化をグラフ化
- 特定デバイスのオンライン率を計算
- 異常検知
- 通常と異なるパターンを検出
- 新規デバイスの接続を監視
- レポート生成
- 日次/週次のネットワーク状況レポート
- 問題のあったデバイスの一覧表示
- 外部ツールとの連携
- Excelでの詳細分析
- Pythonデータ分析ライブラリ(pandas, matplotlib)との連携
使い方の例
1. 基本的なコンソールコマンド
プログラムを起動すると、以下のようなガイド表示が出ます:
-------------------------------------------------
改良版LAN監視ツール (CSV保存版)
-------------------------------------------------
コマンド一覧:
start : 監視を開始します。
stop : 監視を停止します。
exit : プログラムを終了します。
-------------------------------------------------
使い方:
1) 'start' と入力してEnterキーを押してください。監視が開始します。
2) 監視を停止するには 'stop' と入力してEnterキーを押してください。
3) 終了するには 'exit' と入力してEnterキーを押してください。
-------------------------------------------------
2. CSV出力の確認
監視が開始されると、以下の2つのCSVファイルが生成されます:
event_log.csv
: デバイスの状態変化を記録snapshot_log.csv
: ネットワークの定期的なスナップショットを記録
これらのファイルはExcelやテキストエディタで開いて確認できます。
まとめ
CSVバージョンのネットワーク監視ツールは、Excelバージョンの良い点を継承しつつ、より軽量で拡張性の高い実装になっています。特に以下の点で優れています:
- データ保存がシンプル(CSVファイル)
- イベントベースのログ記録
- 定期的なスナップショット機能
- エラーハンドリングの強化
- 設定の集中管理
このツールをベースに、さらに機能を追加することで、より高度なネットワーク監視システムを構築することができます。例えば、アラート機能の追加や、収集したデータを元にした可視化ダッシュボードの開発などが考えられます。
次のステップとしては、これらのCSVデータを使った分析方法や、さらなる機能拡張について探っていきたいと思います。