#!/usr/bin/env python3 import subprocess import requests import re # === НАСТРОЙКИ === RAID_DEVICE = "/dev/md0" # RAID-массив MOUNT_POINT = "/mnt/storage" # Точка монтирования TELEGRAM_TOKEN = "" CHAT_ID = "" def run_cmd(cmd): try: result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) return result.stdout.strip() except subprocess.CalledProcessError as e: return f"Ошибка: {e.stderr.strip()}" def get_raid_status(raid_device): return run_cmd(["sudo", "mdadm", "--detail", raid_device]) def get_raid_disks(raid_device): """Получает список дисков, входящих в RAID массив""" output = run_cmd(["sudo", "mdadm", "--detail", raid_device]) disks = [] for line in output.splitlines(): # Ищем строки с устройствами в формате: номер, major, minor, raiddevice, состояние, устройство if match := re.search(r"/dev/(sd[a-z]+|nvme\d+n\d+|mmcblk\d+)", line): disk = match.group(0) if disk not in disks: disks.append(disk) return disks def get_raid_disks_from_mdstat(raid_device): """Получает диски массива из /proc/mdstat""" try: with open('/proc/mdstat', 'r') as f: content = f.read() # Ищем строку с нашим массивом lines = content.split('\n') for line in lines: if raid_device in line and 'raid' in line: # Ищем диски в строке disks = re.findall(r'(sd[a-z]+|nvme\d+n\d+|mmcblk\d+)', line) return [f"/dev/{disk}" for disk in disks if 'sd' in disk or 'nvme' in disk] except Exception as e: print(f"Ошибка чтения /proc/mdstat: {e}") return [] def parse_raid_details(output): info = {} devices = [] for line in output.splitlines(): # Общие параметры if match := re.search(r"Raid Level\s*:\s*(.+)", line): info["Level"] = match.group(1).strip() elif match := re.search(r"Array Size\s*:\s*(.+)", line): info["Size"] = match.group(1).strip() elif match := re.search(r"State\s*:\s*(.+)", line): info["State"] = match.group(1).strip() elif match := re.search(r"Active Devices\s*:\s*(.+)", line): info["Active"] = match.group(1).strip() elif match := re.search(r"Failed Devices\s*:\s*(.+)", line): info["Failed"] = match.group(1).strip() # Линии с устройствами - более гибкое регулярное выражение elif match := re.search(r"(\S+)\s+(/dev/\S+)", line): state, device = match.groups() if "active" in state or "faulty" in state or "spare" in state: # Извлекаем номер raid устройства если есть raid_num_match = re.search(r"\s+(\d+)\s+", line) raid_num = f"[{raid_num_match.group(1)}] " if raid_num_match else "" devices.append(f"{device} {raid_num}— {state}") info["Devices"] = "\n".join(devices) if devices else "нет данных" return info def get_smart_info(devices): info_lines = [] for dev in devices: try: output = subprocess.check_output(["sudo", "smartctl", "-a", dev], text=True) # Ищем ключевые атрибуты reallocated_match = re.search(r"Reallocated_Sector_Ct\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+(\d+)", output) pending_match = re.search(r"Current_Pending_Sector\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+(\d+)", output) uncorrectable_match = re.search(r"Offline_Uncorrectable\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+(\d+)", output) # Получаем значения, если не нашли — ставим 0 reallocated = int(reallocated_match.group(1)) if reallocated_match else 0 pending = int(pending_match.group(1)) if pending_match else 0 uncorrectable = int(uncorrectable_match.group(1)) if uncorrectable_match else 0 # Определяем статус if reallocated == 0 and pending == 0 and uncorrectable == 0: status = "OK ✅" else: status = "WARNING ⚠️" info_lines.append( f"{dev} — {status}\n" f" Reallocated sectors: {reallocated}\n" f" Pending sectors: {pending}\n" f" Uncorrectable sectors: {uncorrectable}\n" ) except subprocess.CalledProcessError as e: info_lines.append(f"{dev}: ошибка SMART: {e}") return "\n".join(info_lines) def get_disk_usage(mount_point): output = run_cmd(["df", "-h", mount_point]) lines = output.splitlines() if len(lines) >= 2: parts = re.split(r"\s+", lines[1]) filesystem, size, used, avail, percent, mount = parts[:6] return f"Использование: {used} из {size} ({percent}) свободно {avail}" return "Не удалось определить использование места" def send_to_telegram(message): url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage" payload = { "chat_id": CHAT_ID, "text": message, "parse_mode": "HTML" } try: requests.post(url, json=payload, timeout=10) except requests.RequestException as e: print(f"Ошибка отправки Telegram: {e}") def main(): raid_output = get_raid_status(RAID_DEVICE) raid_info = parse_raid_details(raid_output) usage_info = get_disk_usage(MOUNT_POINT) # Автоматически определяем диски массива raid_disks = get_raid_disks_from_mdstat(RAID_DEVICE) if not raid_disks: # Fallback: пытаемся получить из вывода mdadm raid_disks = get_raid_disks(RAID_DEVICE) smart_info = get_smart_info(raid_disks) # Остальной код... msg = ( f"RAID-массив {RAID_DEVICE}\n" f"Уровень: {raid_info.get('Level', 'неизвестно')}\n" f"Размер: {raid_info.get('Size', 'неизвестно')}\n" f"Состояние: {raid_info.get('State', 'неизвестно')}\n" f"Активных дисков: {raid_info.get('Active', 'неизвестно')}\n" f"Сбойных дисков: {raid_info.get('Failed', 'неизвестно')}\n\n" f"Диски:\n{raid_disks}\n\n" f"SMART-состояние дисков ({len(raid_disks)} шт.):\n{smart_info}\n" f"Место:\n{usage_info}" ) send_to_telegram(msg) if __name__ == "__main__": main()