Files
gmail-forwarder/app.py
2025-11-01 13:39:52 +03:00

171 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import traceback
import re
import email
import logging
from io import BytesIO
from imapclient import IMAPClient
from telegram import Bot
from email.header import decode_header
from email.utils import collapse_rfc2231_value
from dotenv import load_dotenv
# === ЗАГРУЗКА КОНФИГА ===
load_dotenv()
GMAIL_USER = os.getenv("GMAIL_USER")
GMAIL_PASS = os.getenv("GMAIL_PASS")
TOKEN = os.getenv("TOKEN")
CHAT_ID = os.getenv("CHAT_ID")
ALLOWED_SENDERS = [
addr.strip().lower()
for addr in os.getenv("ALLOWED_SENDER", "").split(",")
if addr.strip()
]
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", 60))
# === НАСТРОЙКА ЛОГГЕРА ===
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(message)s",
handlers=[
logging.FileHandler("logs/app.log", encoding="utf-8"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# === TELEGRAM BOT ===
bot = Bot(token=TOKEN)
# === ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ===
def escape_markdown(text):
if not text:
return ""
escape_chars = r'_*[]()~`>#+-=|{}.!'
return re.sub(f'([{re.escape(escape_chars)}])', r'\\\1', text)
def decode_mime_words(s):
if not s:
return ""
decoded_fragments = decode_header(s)
result = ""
for fragment, encoding in decoded_fragments:
if isinstance(fragment, bytes):
encoding = encoding or "utf-8"
try:
fragment = fragment.decode(encoding, errors="ignore")
except Exception as e:
logger.debug(f"Ошибка декодирования MIME-фрагмента: {e}")
fragment = fragment.decode("utf-8", errors="ignore")
result += fragment
return result
def get_filename(part):
filename = part.get_filename()
if filename:
filename = collapse_rfc2231_value(filename)
filename = decode_mime_words(filename)
return filename
# === ОСНОВНАЯ ЛОГИКА ===
def fetch_new_emails():
try:
logger.info("Подключение к Gmail IMAP...")
with IMAPClient("imap.gmail.com", ssl=True) as server:
server.login(GMAIL_USER, GMAIL_PASS)
logger.info("Успешный вход в Gmail")
server.select_folder("INBOX")
messages = server.search(["UNSEEN"])
logger.info(f"Найдено новых писем: {len(messages)}")
for uid, msg_data in server.fetch(messages, ["RFC822"]).items():
try:
msg = email.message_from_bytes(msg_data[b"RFC822"])
from_raw = decode_mime_words(msg.get("from", ""))
subject = decode_mime_words(msg.get("subject", ""))
logger.info(f"Обработка письма: '{subject}' от {from_raw}")
# Фильтр по адресу
if ALLOWED_SENDERS:
if not any(allowed in from_raw.lower() for allowed in ALLOWED_SENDERS):
logger.info(f"Пропущено письмо от неразрешённого адреса: {from_raw}")
continue
text = ""
attachments = []
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
disposition = part.get("Content-Disposition", "")
if content_type == "text/plain" and "attachment" not in disposition:
try:
text += part.get_payload(decode=True).decode(errors="ignore")
except Exception as e:
logger.warning(f"Ошибка чтения текстовой части: {e}")
elif "attachment" in disposition:
filename = get_filename(part)
payload = part.get_payload(decode=True)
if filename and payload:
attachments.append((filename, payload))
else:
text = msg.get_payload(decode=True).decode(errors="ignore")
# Формируем сообщение
message_text = f"📧 *{escape_markdown(subject)}*\nОт: {escape_markdown(from_raw)}\n\n{escape_markdown(text[:3500])}"
# Отправка текста
try:
bot.send_message(chat_id=CHAT_ID, text=message_text, parse_mode="MarkdownV2")
logger.info("Сообщение успешно отправлено в Telegram")
except Exception as e:
logger.error(f"Ошибка при отправке текста письма в Telegram: {e}")
# Отправка вложений
for filename, payload in attachments:
try:
bio = BytesIO(payload)
bio.name = filename
bot.send_document(chat_id=CHAT_ID, document=bio)
logger.info(f"Вложение отправлено: {filename}")
except Exception as e:
logger.error(f"Не удалось отправить вложение {filename}: {e}")
# Отмечаем как прочитанное
server.add_flags(uid, [b'\\Seen'])
logger.info("Письмо отмечено как прочитанное")
except Exception as e:
logger.error(f"Ошибка при обработке письма UID {uid}: {e}")
traceback.print_exc()
server.logout()
logger.info("Отключено от Gmail")
except Exception as e:
logger.error(f"Ошибка при подключении к Gmail: {e}")
traceback.print_exc()
# === ТОЧКА ВХОДА ===
if __name__ == "__main__":
logger.info("🚀 Сервис пересылки писем запущен")
while True:
try:
fetch_new_emails()
except Exception as e:
logger.error(f"Необработанная ошибка: {e}")
traceback.print_exc()
logger.info(f"Ожидание {CHECK_INTERVAL} секунд до следующей проверки...")
time.sleep(CHECK_INTERVAL)