Files
WPanda/Dev/network_scann/arp_scanner.py
2025-12-15 09:59:42 +01:00

314 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Simple ARP Network Scanner
Discovers devices on local network using ARP protocol
"""
import subprocess
import re
import json
import time
from datetime import datetime
from pathlib import Path
class ARPScanner:
def __init__(self, config_file="config.json"):
self.config_file = Path(config_file)
self.config = self.load_config()
self.data_file = Path(self.config['database']['devices_file'])
self.known_devices = self.load_devices()
def load_config(self):
"""Load configuration from file"""
if self.config_file.exists():
try:
with open(self.config_file, 'r') as f:
return json.load(f)
except:
pass
# Default configuration
return {
"network": {
"subnet": "192.168.1.0/24",
"auto_detect": True
},
"scanning": {
"ping_timeout": 1,
"ping_delay": 0.01,
"max_threads": 10
},
"database": {
"devices_file": "known_devices.json"
},
"logging": {
"enabled": False,
"log_file": "network_scanner.log",
"log_level": "INFO"
}
}
def load_devices(self):
"""Load previously discovered devices"""
if self.data_file.exists():
try:
with open(self.data_file, 'r') as f:
return json.load(f)
except:
return {}
return {}
def save_devices(self):
"""Save devices to file"""
with open(self.data_file, 'w') as f:
json.dump(self.known_devices, f, indent=2)
def get_network_info(self):
"""Get network range from config or auto-detect"""
print(f"Auto-detect: {self.config['network']['auto_detect']}")
print(f"Config subnet: {self.config['network']['subnet']}")
if not self.config['network']['auto_detect']:
print(f"Using configured subnet: {self.config['network']['subnet']}")
return self.config['network']['subnet']
interface = None
try:
# Check if we're on Windows or Linux
import platform
system = platform.system().lower()
if system == 'windows':
# Windows network info
result = subprocess.run(['ipconfig'], capture_output=True,
text=True, encoding='utf-8', errors='ignore')
for line in result.stdout.split('\n'):
if 'IPv4' in line and 'Address' in line:
ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', line)
if ip_match:
ip = ip_match.group(1)
return self.calculate_network_range(ip, 24)
else:
# Linux network info
result = subprocess.run(['ip', 'route', 'show', 'default'],
capture_output=True, text=True, encoding='utf-8', errors='ignore')
if result.returncode == 0:
line = result.stdout.strip()
parts = line.split()
for i, part in enumerate(parts):
if part == 'dev':
interface = parts[i+1]
elif part == 'via':
gateway = parts[i+1]
# Get network range
if interface:
result = subprocess.run(['ip', 'addr', 'show', interface],
capture_output=True, text=True, encoding='utf-8', errors='ignore')
for line in result.stdout.split('\n'):
if 'inet ' in line:
ip_match = re.search(r'inet (\d+\.\d+\.\d+\.\d+)/(\d+)', line)
if ip_match:
ip = ip_match.group(1)
cidr = int(ip_match.group(2))
return self.calculate_network_range(ip, cidr)
except:
pass
return self.config['network']['subnet'] # fallback to config
def calculate_network_range(self, ip, cidr):
"""Calculate network range from IP and CIDR"""
parts = ip.split('.')
if cidr == 24:
return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"
elif cidr == 16:
return f"{parts[0]}.{parts[1]}.0.0/16"
else:
return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"
def scan_arp_table(self):
"""Read current ARP table"""
try:
import platform
system = platform.system().lower()
if system == 'windows':
result = subprocess.run(['arp', '-a'], capture_output=True,
text=True, encoding='utf-8', errors='ignore')
else:
result = subprocess.run(['arp', '-n'], capture_output=True,
text=True, encoding='utf-8', errors='ignore')
devices = {}
for line in result.stdout.split('\n'):
if re.match(r'\d+\.\d+\.\d+\.\d+', line):
parts = line.split()
if len(parts) >= 2:
# Windows format: " 192.168.1.1 00-11-22-33-44-55 dynamic"
# Linux format: "192.168.1.1 ether 00:11:22:33:44:55 C eth0"
ip = parts[0]
mac = parts[1] if len(parts) > 1 else ""
# Clean MAC address format
mac = mac.replace('-', ':').upper()
if mac and mac != '(incomplete)' and len(mac) == 17:
devices[ip] = {
'mac': mac,
'last_seen': datetime.now().isoformat()
}
return devices
except:
return {}
def ping_sweep(self, network_range):
"""Ping sweep to populate ARP table"""
print("Performing ping sweep...")
import platform
system = platform.system().lower()
# Extract network part
if '/24' in network_range:
base = network_range.replace('/24', '')
base_parts = base.split('.')
ping_timeout = self.config['scanning']['ping_timeout']
ping_delay = self.config['scanning']['ping_delay']
# Only ping first 50 IPs for faster testing
for i in range(1, 51):
ip = f"{base_parts[0]}.{base_parts[1]}.{base_parts[2]}.{i}"
try:
if system == 'windows':
subprocess.run(['ping', '-n', '1', '-w', str(ping_timeout * 1000), ip],
capture_output=True, timeout=1, encoding='utf-8', errors='ignore')
else:
subprocess.run(['ping', '-c', '1', '-W', str(ping_timeout), ip],
capture_output=True, timeout=1, encoding='utf-8', errors='ignore')
except:
pass
time.sleep(ping_delay) # Configurable delay
def filter_devices_by_network(self, devices, network_range):
"""Filter devices to only include those in the specified network"""
if '/24' in network_range:
base = network_range.replace('/24', '')
base_parts = base.split('.')
filtered_devices = {}
for ip, info in devices.items():
ip_parts = ip.split('.')
if (len(ip_parts) == 4 and
ip_parts[0] == base_parts[0] and
ip_parts[1] == base_parts[1] and
ip_parts[2] == base_parts[2]):
filtered_devices[ip] = info
return filtered_devices
return devices
def scan_network(self):
"""Full network scan"""
print("Starting ARP network scan...")
# Get network range
network_range = self.get_network_info()
print(f"Scanning network: {network_range}")
# First read existing ARP table
print("Reading existing ARP table...")
current_devices = self.scan_arp_table()
print(f"Found {len(current_devices)} devices in ARP table")
# Filter devices to only include those in our network range
if current_devices:
current_devices = self.filter_devices_by_network(current_devices, network_range)
print(f"Found {len(current_devices)} devices in target network")
# If no devices found, try ping sweep
if not current_devices:
print("No devices found in target network, trying ping sweep...")
self.ping_sweep(network_range)
current_devices = self.scan_arp_table()
current_devices = self.filter_devices_by_network(current_devices, network_range)
print(f"After ping sweep: {len(current_devices)} devices found")
return current_devices
def compare_devices(self, current_devices):
"""Compare current devices with known ones"""
new_devices = {}
returned_devices = {}
for ip, info in current_devices.items():
if ip not in self.known_devices:
new_devices[ip] = info
info['first_seen'] = datetime.now().isoformat()
info['status'] = 'NEW'
else:
if self.known_devices[ip].get('status') == 'offline':
returned_devices[ip] = info
info['status'] = 'returned'
else:
info['status'] = 'online'
# Check for offline devices
offline_devices = {}
for ip in self.known_devices:
if ip not in current_devices:
offline_devices[ip] = self.known_devices[ip]
offline_devices[ip]['status'] = 'offline'
return new_devices, returned_devices, offline_devices
def update_devices(self, current_devices):
"""Update known devices database"""
for ip, info in current_devices.items():
self.known_devices[ip] = info
def run(self):
"""Main scan function"""
print(f"Scan started at: {datetime.now()}")
current_devices = self.scan_network()
if current_devices:
new_devices, returned_devices, offline_devices = self.compare_devices(current_devices)
# Update database
self.update_devices(current_devices)
self.save_devices()
# Results
print(f"\n=== Scan Results ===")
print(f"Total devices found: {len(current_devices)}")
if new_devices:
print(f"\n🆕 NEW DEVICES ({len(new_devices)}):")
for ip, info in new_devices.items():
print(f" {ip} - {info['mac']}")
if returned_devices:
print(f"\n🔄 RETURNED DEVICES ({len(returned_devices)}):")
for ip, info in returned_devices.items():
print(f" {ip} - {info['mac']}")
if offline_devices:
print(f"\n⚫ OFFLINE DEVICES ({len(offline_devices)}):")
for ip, info in offline_devices.items():
print(f" {ip} - {info['mac']}")
if not new_devices and not returned_devices:
print("\n✅ No new devices detected")
else:
print("No devices found")
if __name__ == "__main__":
scanner = ARPScanner()
scanner.run()