-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
61 lines (50 loc) · 2.17 KB
/
main.py
File metadata and controls
61 lines (50 loc) · 2.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#Importing Libraries
from scapy.all import sniff, IP, TCP, UDP
import csv
from datetime import datetime
import os
from util import build_filter
from analyzer import analyze_packet
# Setting up the log file.
# LOG_FILE is the CSV that process_packet appends one row to per captured IP packet.
LOG_FILE = "data/packet_log.csv"

# Create the directory that holds LOG_FILE. It is derived from the path itself so
# the two cannot drift apart; "." covers a LOG_FILE with no directory component.
os.makedirs(os.path.dirname(LOG_FILE) or ".", exist_ok=True)

# Write the header row when the log is missing *or* empty: a zero-byte file
# (e.g. created by hand or left by an interrupted run) would otherwise never
# receive a header, leaving the CSV without column names.
if not os.path.exists(LOG_FILE) or os.path.getsize(LOG_FILE) == 0:
    with open(LOG_FILE, mode="w", newline="") as f:
        csv.writer(f).writerow(
            ["Timestamp", "Source IP", "Destination IP", "Protocol", "Length"]
        )
# Packet processing callback
def process_packet(packet):
    """Record one captured packet: append it to the CSV log, analyze it, print it.

    Packets that carry no IP layer are skipped entirely. For IP packets the
    transport protocol is labelled "TCP", "UDP", or "Other", and a row of
    (timestamp, source IP, destination IP, protocol, length) is appended to
    LOG_FILE. The same fields are then handed to analyze_packet and echoed
    to stdout.

    Args:
        packet: A captured packet object supporting ``haslayer``, indexing
            by layer class, and ``len``.
    """
    # Guard clause: nothing to log for non-IP traffic.
    if not packet.haslayer(IP):
        return

    ip_header = packet[IP]

    # TCP is checked first, then UDP; everything else is lumped together.
    if packet.haslayer(TCP):
        protocol = "TCP"
    elif packet.haslayer(UDP):
        protocol = "UDP"
    else:
        protocol = "Other"

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    source = ip_header.src
    destination = ip_header.dst
    size = len(packet)

    # Append mode with a fresh handle per packet, so each row is written out
    # as soon as the handle closes.
    with open(LOG_FILE, "a", newline="") as log_handle:
        csv.writer(log_handle).writerow(
            [timestamp, source, destination, protocol, size]
        )

    # Analyze packet for suspicious activity.
    analyze_packet(source, destination, protocol, size)

    print(f"[{timestamp}] {source} -> {destination} ({protocol}, {size} bytes)")
def parse_port(raw):
    """Convert user-entered port text into an int, or None when left blank.

    Args:
        raw: The text typed by the user; surrounding whitespace is ignored.

    Returns:
        None for blank input, otherwise the port number as an int.

    Raises:
        ValueError: If the text is not a whole number or lies outside the
            valid port range 0-65535.
    """
    text = (raw or "").strip()
    if not text:
        return None
    try:
        port = int(text)
    except ValueError:
        raise ValueError(f"{text!r} is not a whole number") from None
    if not 0 <= port <= 65535:
        raise ValueError(f"{port} is outside the valid range 0-65535")
    return port


if __name__ == "__main__":
    print("Starting packet capture... Press Ctrl+C to stop.")

    # Optional filters: a blank answer becomes None, meaning "do not filter on this".
    proto = input("Enter protocol to filter (tcp/udp) or leave blank for all: ").strip() or None
    src = input("Enter source IP to filter or leave blank for all: ").strip() or None
    dst = input("Enter destination IP to filter or leave blank for all: ").strip() or None

    # Validate the port up front so a typo produces a short message instead of
    # a traceback (or an invalid capture filter further down).
    try:
        port = parse_port(input("Enter port to filter or leave blank for all: "))
    except ValueError as err:
        raise SystemExit(f"Invalid port: {err}")

    # Build the capture filter expression from whichever options were supplied.
    bpf_filter = build_filter(protocol=proto, src_ip=src, dst_ip=dst, port=port)

    if bpf_filter:
        print(f"Applying filter: {bpf_filter}")
    else:
        print("No filter applied, capturing all packets.")

    try:
        # store=False: packets are handed to process_packet and not kept in memory.
        # timeout=30: the capture ends on its own after 30 seconds.
        sniff(prn=process_packet, store=False, filter=bpf_filter, timeout=30)
    except KeyboardInterrupt:
        print("Packet capture stopped by user")