delete some old file

This commit is contained in:
2026-02-12 11:58:41 +01:00
parent 5ae2747835
commit 7f54f37978
7 changed files with 55 additions and 337 deletions

View File

@@ -1,109 +0,0 @@
# Microsoft Graph API
import asyncio
import json
from azure.identity.aio import ClientSecretCredential
from msgraph import GraphServiceClient
async def main():
# Helyettesítsd a saját adataiddal
tenant_id = "tenantID"
client_id = "clientID"
client_secret = "clientSecret"
# Credential létrehozása
credential = ClientSecretCredential(tenant_id, client_id, client_secret)
# Graph client létrehozása
graph_client = GraphServiceClient(credential)
try:
# Bejelentkezett felhasználó adatainak lekérése
print("Bejelentkezett felhasználó adatai:")
user = await graph_client.me.get()
print(f" Név: {user.display_name}")
print(f" Email: {user.mail or user.user_principal_name}")
print(f" ID: {user.id}")
# Jogosultságok (permission scopes) lekérése
print("\nJogosultságok ellenőrzése...")
# Service principal lekérése
sp = await graph_client.serviceprincipals.get(request_configuration={
"query_parameters": {"filter": f"appId eq '{client_id}'"}
})
if sp.value:
app_sp = sp.value[0]
print(f"\nAlkalmazás neve: {app_sp.display_name}")
print(f"App ID: {app_sp.app_id}")
# App role assignments lekérése
try:
app_role_assignments = await graph_client.serviceprincipals[app_sp.id].app_role_assignments.get()
print("\nEngedélyezett app role jogosultságok:")
if app_role_assignments.value:
for assignment in app_role_assignments.value:
print(f" - Resource ID: {assignment.resource_display_name or assignment.resource_id}")
print(f" App Role ID: {assignment.app_role_id}")
print(f" Principal Type: {assignment.principal_type}")
print(f" Granted To: {assignment.principal_display_name}")
print()
else:
print(" Nincsenek app role jogosultságok beállítva")
except Exception as e:
print(f" App role lekérési hiba: {e}")
# OAuth2 permission scopes lekérése
try:
oauth2_permission_grants = await graph_client.serviceprincipals[app_sp.id].oauth2_permission_grants.get()
print("\nEngedélyezett OAuth2 permission scopes:")
if oauth2_permission_grants.value:
for grant in oauth2_permission_grants.value:
print(f" - Client ID: {grant.client_id}")
print(f" Consent Type: {grant.consent_type}")
print(f" Scopes: {grant.scope}")
print()
else:
print(" Nincsenek OAuth2 permission scope-ok beállítva")
except Exception as e:
print(f" OAuth2 permission lekérési hiba: {e}")
# Current user permissions lekérése
try:
print("\nJelenlegi felhasználó jogosultságai:")
me = await graph_client.me.get()
# User memberof check
memberof = await graph_client.me.member_of.get()
if memberof.value:
print(" Csoporttagságok:")
for group in memberof.value:
print(f" - {group.display_name}")
# User app role assignments
user_app_roles = await graph_client.me.app_role_assignments.get()
if user_app_roles.value:
print(" Felhasználó app role jogosultságai:")
for role in user_app_roles.value:
print(f" - {role.resource_display_name}: {role.app_role_id}")
except Exception as e:
print(f" Felhasználói jogosultságok lekérési hiba: {e}")
# Alapvető API hívás teszt
print("\nAPI hívás teszt:")
users = await graph_client.users.get(request_configuration={
"query_parameters": {"top": 5}
})
print(f" Felhasználók száma: {len(users.value) if users.value else 0}")
except Exception as e:
print(f"Hiba történt: {e}")
finally:
await credential.close()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,42 +0,0 @@
import requests
import json
import os
TENANT_ID = os.environ.get("AZURE_TENANT_ID")
CLIENT_ID = os.environ.get("AZURE_CLIENT_ID")
CLIENT_SECRET = os.environ.get("AZURE_CLIENT_SECRET")
SCOPE = "https://graph.microsoft.com/.default"
TOKEN_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
def get_access_token():
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"scope": SCOPE
}
try:
response = requests.post(TOKEN_URL, headers=headers, data=data)
response.raise_for_status()
token_data = response.json()
access_token = token_data.get("access_token")
if access_token:
print(access_token)
return access_token
else:
print(f"Error: 'access_token' not found in response. Response: {token_data}", file=os.sys.stderr)
return None
except requests.exceptions.RequestException as e:
print(f"Error making request to Azure AD: {e}", file=os.sys.stderr)
if hasattr(response, 'status_code'):
print(f"Response status code: {response.status_code}", file=os.sys.stderr)
print(f"Response body: {response.text}", file=os.sys.stderr)
return None
if __name__ == "__main__":
get_access_token()

View File

@@ -1,102 +0,0 @@
import os
import requests
import msal
# ==============================================================================
# KONFIGURATION
# ==============================================================================
# Daten aus der Azure App Registration
TENANT_ID = "caee3499-03f8-4175-9fa8-a935248d0ece"
CLIENT_ID = "3a08b279-1fc3-419f-a77e-31f12a0f65f7"
CLIENT_SECRET = "Rk-8Q~nJ.sZ-xUiNxtEDdzVgoFFosODLVHX~jdrh"
# Überwachtes Postfach
USER_EMAIL = "i.meszely@aps-hh.de"
# Microsoft Graph API Endpunkte
GRAPH_API_ENDPOINT = "https://graph.microsoft.com/v1.0"
AUTHORITY_URL = f"https://login.microsoftonline.com/{TENANT_ID}"
SCOPES = ["https://graph.microsoft.com/.default"]
def get_graph_api_token():
"""Ruft das Zugriffstoken für die Microsoft Graph API ab."""
app = msal.ConfidentialClientApplication(
client_id=CLIENT_ID,
authority=AUTHORITY_URL,
client_credential=CLIENT_SECRET
)
result = app.acquire_token_silent(scopes=SCOPES, account=None)
if not result:
result = app.acquire_token_for_client(scopes=SCOPES)
if "access_token" in result:
return result["access_token"]
else:
print("Fehler beim Abrufen des Tokens!")
print(result.get("error"))
print(result.get("error_description"))
return None
def list_inbox_emails(access_token):
"""Listet ungelesene E-Mails aus dem INBOX."""
headers = {"Authorization": f"Bearer {access_token}"}
# Nur notwendige Felder aus Effizienzgründen abrufen
messages_url = (
f"{GRAPH_API_ENDPOINT}/users/{USER_EMAIL}/mailFolders/inbox/messages?"
f"$filter=isRead eq false&"
f"$select=from,subject,receivedDateTime&"
f"$orderby=receivedDateTime desc"
)
response = requests.get(messages_url, headers=headers)
response.raise_for_status()
messages = response.json().get("value", [])
if not messages:
print("Keine neuen ungelesenen E-Mails im INBOX-Ordner.")
return
print(f"\n{len(messages)} ungelesene E-Mails gefunden:")
print("=" * 60)
for i, message in enumerate(messages, 1):
from_email = message.get("from", {}).get("emailAddress", {})
sender = from_email.get("address", "Unbekannt")
subject = message.get("subject", "Kein Betreff")
received = message.get("receivedDateTime", "")
# Datum formatieren
if received:
try:
from datetime import datetime
dt = datetime.fromisoformat(received.replace('Z', '+00:00'))
formatted_date = dt.strftime("%Y-%m-%d %H:%M")
except:
formatted_date = received[:19] # Einfache Formatierung bei Fehler
else:
formatted_date = "Unbekannt"
print(f"\n{i}. {sender}")
print(f" Betreff: {subject}")
print(f" Zeit: {formatted_date}")
def main():
"""Hauptfunktion."""
print("Graph API Token wird abgerufen...")
access_token = get_graph_api_token()
if not access_token:
return
try:
list_inbox_emails(access_token)
except requests.HTTPError as e:
print(f"Fehler beim Abrufen der E-Mails: {e}")
if "403" in str(e):
print("403 Fehler: Wahrscheinlich fehlende Mail.Read Berechtigung.")
print("Überprüfen Sie die API-Berechtigungen in der Azure App Registration!")
except Exception as e:
print(f"Unerwarteter Fehler: {e}")
if __name__ == "__main__":
main()

View File

@@ -1,44 +0,0 @@
import webbrowser
from datetime import datetime
import json
import os
import msal
GRAPH_API_ENDPOINT = 'https://graph.microsoft.com/v1.0'
def generate_access_token(app_id, scopes):
# Save Session Token as a token file
access_token_cache = msal.SerializableTokenCache()
# read the token file
if os.path.exists('ms_graph_api_token.json'):
access_token_cache.deserialize(open("ms_graph_api_token.json", "r").read())
token_detail = json.load(open('ms_graph_api_token.json',))
token_detail_key = list(token_detail['AccessToken'].keys())[0]
token_expiration = datetime.fromtimestamp(int(token_detail['AccessToken'][token_detail_key]['expires_on']))
if datetime.now() > token_expiration:
os.remove('ms_graph_api_token.json')
access_token_cache = msal.SerializableTokenCache()
# assign a SerializableTokenCache object to the client instance
client = msal.PublicClientApplication(client_id=app_id, token_cache=access_token_cache)
accounts = client.get_accounts()
if accounts:
# load the session
token_response = client.acquire_token_silent(scopes, accounts[0])
else:
# authetnicate your accoutn as usual
flow = client.initiate_device_flow(scopes=scopes)
print('user_code: ' + flow['user_code'])
webbrowser.open('https://microsoft.com/devicelogin')
token_response = client.acquire_token_by_device_flow(flow)
with open('ms_graph_api_token.json', 'w') as _f:
_f.write(access_token_cache.serialize())
return token_response
if __name__ == '__main__':
...

View File

@@ -0,0 +1,54 @@
import mysql.connector
from datetime import datetime, timedelta
# MySQL connection details
MYSQL_HOST = "10.102.1.65"
MYSQL_USER = "svc.emailtopdf"
MYSQL_PASSWORD = "zZUHrps62skLKfr9yQwQ"
MYSQL_DATABASE = "emailtopdf"
MYSQL_TABLE = "emailtopdf"
DATE_COLUMN = "timestamp"
def delete_old_records():
"""
Deletes records older than one week from the specified MySQL table.
"""
try:
# Connect to the MySQL database
conn = mysql.connector.connect(
host=MYSQL_HOST,
user=MYSQL_USER,
password=MYSQL_PASSWORD,
database=MYSQL_DATABASE
)
cursor = conn.cursor()
# Calculate the date one week ago
one_week_ago = datetime.now() - timedelta(days=7)
formatted_date = one_week_ago.strftime('%Y-%m-%d %H:%M:%S')
# SQL query to delete old records
query = f"DELETE FROM {MYSQL_TABLE} WHERE {DATE_COLUMN} < %s"
# Execute the query
cursor.execute(query, (formatted_date,))
# Get the number of deleted rows
deleted_rows = cursor.rowcount
# Commit the transaction
conn.commit()
print(f"Successfully deleted {deleted_rows} records older than one week.")
except mysql.connector.Error as err:
print(f"Error: {err}")
finally:
# Close the connection
if 'conn' in locals() and conn.is_connected():
cursor.close()
conn.close()
print("MySQL connection is closed.")
if __name__ == "__main__":
delete_old_records()

View File

@@ -0,0 +1 @@
mysql-connector-python

View File

@@ -1,40 +0,0 @@
import imaplib
import os
import ssl
IMAP_SERVER = "outlook.office365.com"
IMAP_PORT = 993
O365_IMAP_USERNAME="i.meszely@aps-hh.de"
O365_IMAP_PASSWORD="virgI6774#"
def test_imap_connection(username, password):
try:
# Create a default SSL context
ssl_context = ssl.create_default_context()
# Connect to the IMAP server over SSL
print(f"Connecting to IMAP server: {IMAP_SERVER}:{IMAP_PORT}...")
with imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT, ssl_context=ssl_context) as mail:
print("Connection established. Attempting to log in...")
mail.login(username, password)
print(f"Successfully logged in as {username}.")
mail.logout()
print("Logged out.")
return True
except imaplib.IMAP4.error as e:
print(f"IMAP login failed for {username}: {e}")
return False
except Exception as e:
print(f"An unexpected error occurred: {e}")
return False
if __name__ == "__main__":
# WARNING: Hardcoding credentials is not recommended for security reasons.
# Consider using environment variables or a secure configuration method.
username = O365_IMAP_USERNAME
password = O365_IMAP_PASSWORD
print(f"Testing IMAP connection for user: {username}")
if test_imap_connection(username, password):
print("\nIMAP connection test completed successfully!")
else:
print("\nIMAP connection test failed.")