#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Author: Hendrik Schutter, mail@hendrikschutter.com Date of creation: 2023/02/26 Date of last modification: 2023/02/26 """ from subprocess import PIPE, Popen from mac_vendor_lookup import MacLookup import smtplib import email.utils from email.mime.text import MIMEText import time import config def send_alert_mail(mac_addr): server = smtplib.SMTP(config.mail_server_domain, config.mail_server_port) server.starttls() server.login(config.mail_from_address, config.mail_server_password) try: mac_vendor = MacLookup().lookup(mac_addr) except: mac_vendor = " Vendor not found" timeLong = time.strftime("%d.%m.%Y %H:%M:%S") body = "Hallo Admin,\n\nneue MAC-Adresse gefunden!\n\nDatum: "+ timeLong + "\nMAC: " + str(mac_addr) +"\nVendor: " + mac_vendor + "\n\nVersion: 1.0 - 26.02.2023" msg = MIMEText(body) msg['Subject'] = 'New MAC found: ' + str(mac_addr) + " - " + mac_vendor msg['To'] = email.utils.formataddr((config.mail_to_name, config.mail_to_address )) msg['From'] = email.utils.formataddr((config.mail_from_name, config.mail_from_address)) server.sendmail(config.mail_from_address, config.mail_to_address , msg.as_string()) server.quit() def query_mac_from_switch(): mac_addresses = list() command = "snmpwalk -v 2c -O vqe -c " + config.switch_snmp_community + " " + config.switch_ip_addr + " 1.3.6.1.2.1.17.4.3.1.1" with Popen(command, stdout=PIPE, stderr=None, shell=True) as process: output = process.communicate()[0].decode("utf-8") for mac in output.split("\n"): mac = mac.replace(" ", ":") mac = mac.replace('"', "") mac = mac[0:-1] if(len(mac) == 17): mac_addresses.append(mac) return mac_addresses def watch(): alerted_mac_addresses = list() while(True): mac_addresses = query_mac_from_switch() for mac_address in mac_addresses: if mac_address not in config.trusted_mac_addresses: if mac_address not in alerted_mac_addresses: alerted_mac_addresses.append(mac_address) send_alert_mail(mac_address) time.sleep(10) if __name__ == "__main__": print("updating MAC vendors ...") MacLookup().update_vendors() print("update done\n") try: watch() except: pass