98 lines
3.4 KiB
Python
98 lines
3.4 KiB
Python
import smbclient
|
|
import smbclient.path
|
|
from datetime import datetime, timezone, timedelta
|
|
import requests
|
|
import sys
|
|
|
|
# --- KONFIGURÁCIÓ ---
|
|
# Töltsd ki a saját adataiddal!
|
|
SERVER = "10.102.1.56" # Windows szerver IP
|
|
SHARE = "D$" # Megosztás neve (C$, D$, stb.)
|
|
FOLDER_PATH = "Schnittstellen/Bremen" # Útvonal a megosztáson belül (NE tegyél elé perjelet!)
|
|
USERNAME = "user" # Windows felhasználónév
|
|
PASSWORD = "password" # Windows jelszó
|
|
|
|
# Küszöbértékek
|
|
MAX_FILES = 2
|
|
MAX_AGE_MINUTES = 45
|
|
|
|
# n8n integráció
|
|
N8N_WEBHOOK_URL = "https://n8n.domain.hu/webhook/folder-alert"
|
|
TEST_MODE = True # True: csak kiírja az eredményt, nem küld n8n üzenetet
|
|
|
|
def check_folder():
|
|
print(f"--- Ellenőrzés indítása: {datetime.now()} ---")
|
|
print(f"Szerver: {SERVER}, Mappa: {SHARE}\\{FOLDER_PATH}")
|
|
|
|
try:
|
|
# Kapcsolat felépítése
|
|
smbclient.register_session(SERVER, username=USERNAME, password=PASSWORD)
|
|
|
|
# Windows-stílusú elérési út összeállítása
|
|
full_path = f"\\\\{SERVER}\\{SHARE}\\{FOLDER_PATH.replace('/', '\\')}"
|
|
|
|
if not smbclient.path.exists(full_path):
|
|
print(f"HIBA: A megadott elérési út nem létezik: {full_path}")
|
|
return
|
|
|
|
now = datetime.now(timezone.utc)
|
|
old_files = []
|
|
|
|
# Fájlok listázása és ellenőrzése
|
|
print("Fájlok vizsgálata...")
|
|
for file_entry in smbclient.scandir(full_path):
|
|
if file_entry.is_file():
|
|
# Utolsó módosítás lekérése (st_mtime)
|
|
mtime = datetime.fromtimestamp(file_entry.stat().st_mtime, timezone.utc)
|
|
age = now - mtime
|
|
age_minutes = int(age.total_seconds() / 60)
|
|
|
|
print(f" - {file_entry.name}: {age_minutes} perce módosítva")
|
|
|
|
if age > timedelta(minutes=MAX_AGE_MINUTES):
|
|
old_files.append({
|
|
"name": file_entry.name,
|
|
"age_minutes": age_minutes
|
|
})
|
|
|
|
# Logika: Ha több mint MAX_FILES régi fájl van
|
|
if len(old_files) > MAX_FILES:
|
|
print(f"\n[!] FIGYELEM: {len(old_files)} db régi fájl található!")
|
|
if not TEST_MODE:
|
|
send_n8n_alert(len(old_files), old_files)
|
|
else:
|
|
print("[Teszt mód] n8n hívás kihagyva.")
|
|
else:
|
|
print(f"\n[OK] Minden rendben. ({len(old_files)} régi fájl)")
|
|
|
|
except Exception as e:
|
|
print(f"\n[!] Váratlan hiba történt: {e}")
|
|
finally:
|
|
# Kapcsolat lezárása
|
|
try:
|
|
smbclient.reset_connection(SERVER)
|
|
except:
|
|
pass
|
|
print("--- Ellenőrzés vége ---\n")
|
|
|
|
def send_n8n_alert(count, files):
|
|
payload = {
|
|
"status": "warning",
|
|
"server": SERVER,
|
|
"folder": FOLDER_PATH,
|
|
"old_files_count": count,
|
|
"files": files,
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
try:
|
|
response = requests.post(N8N_WEBHOOK_URL, json=payload, timeout=10)
|
|
if response.status_code == 200:
|
|
print("n8n értesítés sikeresen elküldve.")
|
|
else:
|
|
print(f"n8n hiba: HTTP {response.status_code}")
|
|
except Exception as e:
|
|
print(f"Nem sikerült elérni az n8n-t: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
check_folder()
|