Files
raid-monitor/raid_monitor.py
2026-02-14 12:33:43 +00:00

172 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import subprocess
import requests
import re
# === SETTINGS ===
RAID_DEVICE = "/dev/md0" # RAID array device
MOUNT_POINT = "/mnt/storage" # Mount point whose disk usage is reported
# Bot token; send_to_telegram interpolates it into the Bot API URL. Empty here.
TELEGRAM_TOKEN = ""
# Destination chat; sent as "chat_id" in the sendMessage payload. Empty here.
CHAT_ID = ""
def run_cmd(cmd):
    """Run an external command and return its trimmed standard output.

    Failures are reported in-band instead of being raised: the caller gets
    a string that starts with "Ошибка: ".

    Args:
        cmd: Argument list handed to ``subprocess.run`` (no shell is used).

    Returns:
        The command's stdout with surrounding whitespace stripped, or
        "Ошибка: <details>" when the command exits with a non-zero status
        or cannot be started at all.
    """
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        return f"Ошибка: {e.stderr.strip()}"
    except OSError as e:
        # The process never started (binary missing, not executable, ...),
        # so there is no stderr to show; report the OS error in the same
        # in-band format instead of crashing the whole monitor.
        return f"Ошибка: {e}"
    return result.stdout.strip()
def get_raid_status(raid_device):
    """Return the text of ``sudo mdadm --detail`` for *raid_device*.

    The result is whatever ``run_cmd`` returns for that command, including
    its in-band error string when the command fails.
    """
    mdadm_cmd = ["sudo", "mdadm", "--detail", raid_device]
    return run_cmd(mdadm_cmd)
def get_raid_disks(raid_device):
    """List the disks named in ``mdadm --detail`` output for *raid_device*.

    Each output line contributes at most one device: the first
    ``/dev/sdX``, ``/dev/nvmeXnY`` or ``/dev/mmcblkX`` prefix found on it
    (partition suffixes are not part of the match).

    Returns:
        Device paths in first-seen order, without duplicates.
    """
    device_re = re.compile(r"/dev/(sd[a-z]+|nvme\d+n\d+|mmcblk\d+)")
    detail_text = run_cmd(["sudo", "mdadm", "--detail", raid_device])
    hits = (device_re.search(row) for row in detail_text.splitlines())
    # dict keys keep insertion order, which gives an ordered de-duplication.
    return list(dict.fromkeys(hit.group(0) for hit in hits if hit))
def get_raid_disks_from_mdstat(raid_device, mdstat_path="/proc/mdstat"):
    """Return the member disks of *raid_device* as listed in /proc/mdstat.

    mdstat identifies arrays by bare name (``md0 : active raid1 ...``), so
    the directory part of *raid_device* is dropped and the remaining name
    is compared exactly with the text before the colon. An exact match
    keeps ``md1`` from matching the ``md10`` line.

    Args:
        raid_device: Array device, e.g. "/dev/md0" or just "md0".
        mdstat_path: File to read; defaults to the kernel's /proc/mdstat.

    Returns:
        Whole-disk paths such as "/dev/sda" in the order they appear,
        without duplicates. An empty list is returned when the array is
        not found or the file cannot be read (the error is printed).
    """
    array_name = raid_device.rstrip("/").rsplit("/", 1)[-1]
    if not array_name:
        return []

    try:
        with open(mdstat_path, "r") as f:
            content = f.read()
    except (OSError, UnicodeError) as e:
        print(f"Ошибка чтения {mdstat_path}: {e}")
        return []

    for line in content.splitlines():
        head, sep, members = line.partition(":")
        if not sep or head.strip() != array_name or "raid" not in members:
            continue
        names = re.findall(r"(sd[a-z]+|nvme\d+n\d+|mmcblk\d+)", members)
        # As before, only sd*/nvme* names are returned; mmcblk matches are
        # recognised by the pattern but dropped.
        wanted = [name for name in names if name.startswith(("sd", "nvme"))]
        # Two partitions of one disk collapse into a single entry.
        return [f"/dev/{name}" for name in dict.fromkeys(wanted)]
    return []
# Summary lines of `mdadm --detail`, tried in this order: (pattern, result key).
_RAID_SUMMARY_FIELDS = (
    (re.compile(r"Raid Level\s*:\s*(.+)"), "Level"),
    (re.compile(r"Array Size\s*:\s*(.+)"), "Size"),
    (re.compile(r"State\s*:\s*(.+)"), "State"),
    (re.compile(r"Active Devices\s*:\s*(.+)"), "Active"),
    (re.compile(r"Failed Devices\s*:\s*(.+)"), "Failed"),
)

# One row of the device table:
#   Number  Major  Minor  RaidDevice  State (one or more words)  /dev/...
_RAID_DEVICE_ROW = re.compile(
    r"^\s*(\d+|-)\s+(\d+|-)\s+(\d+|-)\s+(\d+|-)\s+(\S.*?)\s+(/dev/\S+)\s*$"
)

# A row is reported when its state text contains one of these words.
_RAID_MEMBER_STATES = ("active", "faulty", "spare")


def parse_raid_details(output):
    """Extract summary fields and the member list from ``mdadm --detail`` text.

    Args:
        output: Text printed by ``mdadm --detail <array>``.

    Returns:
        A dict with the keys "Level", "Size", "State", "Active" and
        "Failed" for each summary line that was present, plus "Devices":
        one "<device> [<number>] <state>" line per member, or
        "нет данных" when no member rows were recognised.
    """
    info = {}
    devices = []

    for line in output.splitlines():
        for pattern, key in _RAID_SUMMARY_FIELDS:
            if match := pattern.search(line):
                info[key] = match.group(1).strip()
                break
        else:
            row = _RAID_DEVICE_ROW.match(line)
            if row is None:
                continue
            number, _major, _minor, _raid_slot, state, device = row.groups()
            # The state column may hold several words ("active sync",
            # "spare rebuilding"), so the whole column is kept rather than
            # only the token next to the device path.
            state = " ".join(state.split())
            if any(word in state for word in _RAID_MEMBER_STATES):
                raid_num = f"[{number}] " if number.isdigit() else ""
                devices.append(f"{device} {raid_num}{state}")

    info["Devices"] = "\n".join(devices) if devices else "нет данных"
    return info
def _smart_raw_value(report, attribute):
    """Return the leading integer of RAW_VALUE for *attribute*, or 0 if absent."""
    for line in report.splitlines():
        fields = line.split()
        # Attribute rows have ten columns:
        # ID# NAME FLAG VALUE WORST THRESH TYPE UPDATED WHEN_FAILED RAW_VALUE
        if len(fields) >= 10 and fields[1] == attribute:
            digits = re.match(r"\d+", fields[9])
            return int(digits.group()) if digits else 0
    return 0


def get_smart_info(devices, smartctl_cmd=("sudo", "smartctl")):
    """Build a text report of key SMART sector counters for each device.

    smartctl's exit status is a bitmask. Only bits 0 and 1 (bad command
    line, device could not be opened) mean that no report was produced.
    Any other non-zero status still comes with a usable report, so the
    report is parsed and the device is flagged as WARNING.

    Args:
        devices: Iterable of device paths, e.g. ["/dev/sda"].
        smartctl_cmd: Command prefix used to invoke smartctl; "-a" and the
            device path are appended to it.

    Returns:
        One block per device, joined with newlines: a status header
        followed by the reallocated, pending and uncorrectable counts (0
        when the attribute is missing), or a single
        "<dev>: ошибка SMART: ..." line when smartctl could not be run.
    """
    info_lines = []
    for dev in devices:
        cmd = [*smartctl_cmd, "-a", dev]
        try:
            proc = subprocess.run(cmd, stdout=subprocess.PIPE, text=True)
        except OSError as e:
            # sudo/smartctl missing or not executable.
            info_lines.append(f"{dev}: ошибка SMART: {e}")
            continue

        if proc.returncode < 0 or proc.returncode & 0b11:
            # Killed by a signal, or smartctl could not even query the device.
            err = subprocess.CalledProcessError(proc.returncode, cmd)
            info_lines.append(f"{dev}: ошибка SMART: {err}")
            continue

        reallocated = _smart_raw_value(proc.stdout, "Reallocated_Sector_Ct")
        pending = _smart_raw_value(proc.stdout, "Current_Pending_Sector")
        uncorrectable = _smart_raw_value(proc.stdout, "Offline_Uncorrectable")

        healthy = (
            reallocated == 0
            and pending == 0
            and uncorrectable == 0
            and proc.returncode == 0
        )
        status = "OK ✅" if healthy else "WARNING ⚠️"

        report = (
            f"{dev}: {status}\n"
            f" Reallocated sectors: {reallocated}\n"
            f" Pending sectors: {pending}\n"
            f" Uncorrectable sectors: {uncorrectable}\n"
        )
        if proc.returncode:
            # Keep the raw bitmask visible so the reason for WARNING is traceable.
            report += f" smartctl exit status: {proc.returncode}\n"
        info_lines.append(report)
    return "\n".join(info_lines)
def get_disk_usage(mount_point):
    """Summarise disk usage of *mount_point* from ``df`` output.

    ``-P`` requests the POSIX layout, which keeps each filesystem on a
    single line even when the device name is long.

    Args:
        mount_point: Path passed to ``df``.

    Returns:
        "Использование: <used> из <size> (<percent>) свободно <avail>",
        or "Не удалось определить использование места" when the second
        output line does not contain the expected six columns.
    """
    output = run_cmd(["df", "-h", "-P", mount_point])
    lines = output.splitlines()
    if len(lines) >= 2:
        parts = lines[1].split()
        # Columns: filesystem, size, used, avail, percent, mount point.
        # A shorter row (e.g. an error text) falls through to the fallback
        # message instead of raising on unpacking.
        if len(parts) >= 6:
            size, used, avail, percent = parts[1:5]
            return f"Использование: {used} из {size} ({percent}) свободно {avail}"
    return "Не удалось определить использование места"
def send_to_telegram(message):
    """Send *message* to the configured Telegram chat as HTML.

    Uses the module-level TELEGRAM_TOKEN and CHAT_ID. Delivery is
    best-effort: network errors and API rejections are printed, never
    raised.

    Args:
        message: Message text; it is sent with ``parse_mode`` "HTML".
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    payload = {
        "chat_id": CHAT_ID,
        "text": message,
        "parse_mode": "HTML",
    }
    try:
        response = requests.post(url, json=payload, timeout=10)
    except requests.RequestException as e:
        print(f"Ошибка отправки Telegram: {e}")
        return
    if not response.ok:
        # The API answers with an error status for a bad token, unknown
        # chat or malformed HTML. Print status and body only; the request
        # URL contains the bot token and must not be printed.
        print(f"Ошибка отправки Telegram: HTTP {response.status_code}: {response.text}")
def main():
    """Collect RAID, SMART and disk-usage data and send one Telegram report."""
    # Local import: only this function needs it.
    from html import escape

    raid_output = get_raid_status(RAID_DEVICE)
    raid_info = parse_raid_details(raid_output)
    usage_info = get_disk_usage(MOUNT_POINT)

    # Detect the array's member disks automatically.
    raid_disks = get_raid_disks_from_mdstat(RAID_DEVICE)
    if not raid_disks:
        # Fallback: take them from the mdadm output instead.
        raid_disks = get_raid_disks(RAID_DEVICE)
    smart_info = get_smart_info(raid_disks)

    def text(value):
        # The message is sent with parse_mode=HTML, so "&", "<" and ">" in
        # command output must be escaped to keep the markup valid.
        return escape(str(value), quote=False)

    def field(key):
        return text(raid_info.get(key, "неизвестно"))

    # One disk per line rather than the repr of a Python list.
    disks_text = "\n".join(raid_disks) if raid_disks else "нет данных"

    msg = (
        f"<b>RAID-массив {text(RAID_DEVICE)}</b>\n"
        f"Уровень: {field('Level')}\n"
        f"Размер: {field('Size')}\n"
        f"Состояние: {field('State')}\n"
        f"Активных дисков: {field('Active')}\n"
        f"Сбойных дисков: {field('Failed')}\n\n"
        f"<b>Диски:</b>\n{text(disks_text)}\n\n"
        f"<b>SMART-состояние дисков ({len(raid_disks)} шт.):</b>\n{text(smart_info)}\n"
        f"<b>Место:</b>\n{text(usage_info)}"
    )
    send_to_telegram(msg)
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()