Scapy Python: Tạo Gói Tin Mạng Tùy Chỉnh — Kiểm Thử Giao Thức, Mô Phỏng Tấn Công và Debug Mạng Chuyên Sâu

Networking tutorial - IT technology blog
Networking tutorial - IT technology blog

Sáu tháng trước tôi đã bỏ Wireshark làm công cụ debug chính và chuyển gần như hoàn toàn sang Scapy. Không phải vì Wireshark kém — nó vẫn đang mở trong tab khác ngay lúc này — nhưng khi tôi thấy những gì Scapy có thể làm mà các công cụ capture thụ động đơn giản là không thể, cách tiếp cận xử lý sự cố mạng của tôi đã thay đổi hoàn toàn.

Scapy cho phép bạn xây dựng gói tin từ đầu, gửi chúng, chụp phản hồi và phân tích mọi thứ trong Python. Không chỉ quan sát — mà còn chủ động tạo và inject. Khoảng cách giữa “Tôi có thể thấy traffic” và “Tôi có thể tạo ra bất kỳ traffic nào tôi muốn” là rất lớn. Đặc biệt khi bạn đang debug một firewall rule hoạt động sai, hoặc kiểm tra xem server có thực sự phản hồi đúng với đầu vào bị lỗi không.

Bắt Đầu Nhanh — Chạy Được Trong 5 Phút

Cài Scapy bên trong virtual environment. Truy cập raw socket yêu cầu quyền root hoặc Administrator:

python3 -m venv scapy-env
source scapy-env/bin/activate
pip install scapy

Mở interactive shell để test nhanh:

sudo scapy

Gửi ICMP ping đầu tiên và nhận phản hồi:

from scapy.all import *

# Tạo một yêu cầu ICMP echo đơn giản
packet = IP(dst="8.8.8.8") / ICMP()
response = sr1(packet, timeout=2, verbose=False)

if response:
    print(f"Nhận phản hồi từ {response.src}, TTL={response.ttl}")
else:
    print("Không có phản hồi")

Đó là toàn bộ mô hình tư duy của Scapy trong ba dòng: xây dựng layer bằng /, gửi bằng sr1() (send + receive 1), kiểm tra kết quả. Mọi protocol stack chỉ là các layer xếp chồng lên nhau.

Các Layer Gói Tin, Sniffing và File PCAP

Cách Xếp Chồng Layer Hoạt Động

Mỗi giao thức trong Scapy là một class. Xếp chúng lại bằng toán tử /, và Scapy sẽ điền các giá trị mặc định hợp lý cho bất kỳ field nào bạn không chỉ định:

from scapy.all import *

# Kiểm tra các field của một layer
ls(TCP)

# Xây dựng Ethernet + IP + TCP + payload kiểu HTTP
pkt = Ether() / IP(dst="192.168.1.1") / TCP(dport=80, flags="S") / Raw(load="GET / HTTP/1.0\r\n\r\n")

# Hiển thị phân tích gói tin đầy đủ
pkt.show()

Chạy pkt.show() in ra mọi field — checksum, MAC nguồn, sequence number, tất cả. Lệnh đơn giản đó đã giúp tôi tiết kiệm hàng giờ trên một dự án mà gói tin đang bị corrupt âm thầm bởi một thiết bị NAT không tính lại checksum sau khi viết lại địa chỉ.

Chụp Traffic Trực Tiếp

Scapy cũng có thể chụp gói tin, với khả năng truy cập Python đầy đủ vào mọi field:

from scapy.all import sniff, IP, TCP

def analyze_packet(pkt):
    if pkt.haslayer(IP) and pkt.haslayer(TCP):
        src = pkt[IP].src
        dst = pkt[IP].dst
        dport = pkt[TCP].dport
        flags = pkt[TCP].flags
        print(f"{src} -> {dst}:{dport} [{flags}]")

# Chụp 20 gói tin TCP trên eth0
sniff(iface="eth0", filter="tcp", prn=analyze_packet, count=20)

Cú pháp BPF filter khớp chính xác với tcpdump — nếu bạn đã viết tcpdump filter trước đây, chúng hoạt động ở đây không cần thay đổi. Điểm khác biệt thực sự là callback. Python đầy đủ có nghĩa là bạn có thể ghi các gói tin phù hợp vào database, kích hoạt cảnh báo, sửa đổi và re-inject, hoặc kết nối với bất kỳ công cụ nào khác trong workflow của bạn.

Đọc và Ghi File PCAP

from scapy.all import rdpcap, wrpcap

# Tải một capture có sẵn
packets = rdpcap("capture.pcap")

# Kiểm tra các gói tin cụ thể
for pkt in packets:
    if pkt.haslayer("DNS"):
        print(pkt["DNS"].qd.qname)

# Ghi các gói tin đã lọc vào file mới
dns_packets = [p for p in packets if p.haslayer("DNS")]
wrpcap("dns_only.pcap", dns_packets)

Sử Dụng Nâng Cao — Kiểm Thử Giao Thức và Mô Phỏng Mạng

Kiểm Thử TCP Handshake

Một trong những script tôi dùng nhiều nhất kiểm tra xem một cổng cụ thể có mở không và đo thời gian round-trip SYN/SYN-ACK chính xác. Ping cho bạn biết host có thể kết nối được. Script này cho biết service có thực sự đang phản hồi không — và nhanh như thế nào:

from scapy.all import *
import time

def test_tcp_port(host, port):
    syn = IP(dst=host) / TCP(dport=port, sport=RandShort(), flags="S", seq=1000)
    
    start = time.time()
    response = sr1(syn, timeout=3, verbose=False)
    elapsed = (time.time() - start) * 1000
    
    if response is None:
        return f"{host}:{port} — không có phản hồi (bị lọc)"
    
    tcp_resp = response[TCP]
    if tcp_resp.flags == 0x12:  # SYN-ACK
        return f"{host}:{port} — MỞ, RTT={elapsed:.1f}ms"
    elif tcp_resp.flags == 0x14:  # RST-ACK
        return f"{host}:{port} — ĐÓNG"
    else:
        return f"{host}:{port} — cờ không mong đợi: {tcp_resp.flags}"

print(test_tcp_port("192.168.1.1", 22))
print(test_tcp_port("192.168.1.1", 443))

ARP Spoofing cho Kiểm Thử trong Lab

ARP spoofing là một trong những thứ hữu ích nhất để tái hiện trong lab kiểm soát khi kiểm thử phân đoạn mạng. Nếu bạn hiểu chính xác cuộc tấn công trông như thế nào ở cấp độ gói tin, việc xác minh rằng switch của bạn thực sự chặn nó sẽ trở nên đơn giản. Chỉ chạy lệnh này trong môi trường biệt lập nơi bạn có sự cho phép rõ ràng:

from scapy.all import Ether, ARP, sendp
import time

def arp_spoof(target_ip, spoof_ip, iface="eth0"):
    """
    Gửi ARP giả mạo tới mục tiêu, giả vờ chúng ta là spoof_ip.
    Chỉ dùng trong lab — cần có sự cho phép.
    """
    # Lấy địa chỉ MAC của mục tiêu trước
    target_mac = getmacbyip(target_ip)
    if not target_mac:
        print(f"Không thể phân giải MAC cho {target_ip}")
        return
    
    # Xây dựng gói ARP reply
    pkt = Ether(dst=target_mac) / ARP(
        op=2,          # ARP reply
        pdst=target_ip,
        hwdst=target_mac,
        psrc=spoof_ip  # Giả vờ là IP này
    )
    
    print(f"Đang gửi ARP giả mạo: {spoof_ip} đang ở MAC của chúng ta -> {target_ip}")
    sendp(pkt, iface=iface, verbose=False)

# Ví dụ: kiểm tra xem 192.168.1.2 có chấp nhận ARP giả mạo từ IP gateway không
arp_spoof("192.168.1.2", "192.168.1.1")

Sau khi gửi, kiểm tra bảng ARP trên mục tiêu bằng arp -n. Nếu Dynamic ARP Inspection được cấu hình đúng trên switch của bạn, gói tin sẽ bị drop trước khi đến. Nếu bảng ARP của mục tiêu thực sự thay đổi — bạn có một lỗ hổng đáng vá trước khi người khác tìm thấy.

Fuzzing Truy Vấn DNS

Các DNS client thông thường không cho bạn kiểm soát những gì đưa vào định dạng query trên wire. Scapy thì có:

from scapy.all import IP, UDP, DNS, DNSQR, sr1

def dns_query(server, name, qtype="A"):
    query = IP(dst=server) / UDP(dport=53) / DNS(
        rd=1,
        qd=DNSQR(qname=name, qtype=qtype)
    )
    resp = sr1(query, timeout=3, verbose=False)
    
    if resp and resp.haslayer(DNS):
        dns = resp[DNS]
        print(f"Mã phản hồi: {dns.rcode}")
        if dns.ancount > 0:
            for i in range(dns.ancount):
                print(f"  Câu trả lời: {dns.an[i].rdata}")
    else:
        print("Không nhận được phản hồi DNS")

dns_query("8.8.8.8", "example.com")
dns_query("8.8.8.8", "example.com", qtype="MX")

Mẹo Thực Tế Sau Sáu Tháng Dùng Hàng Ngày

Luôn Đặt verbose=False Trong Script

Output mặc định thì ổn khi dùng tương tác. Trong script tự động, nó sẽ làm ngập log của bạn. Thêm verbose=False vào mọi lời gọi send(), sr1(), và srp() bên ngoài interactive shell.

Dùng conf.iface Cho Interface Mặc Định

from scapy.all import conf

# Đặt một lần ở đầu script của bạn
conf.iface = "eth0"

# Tất cả các lần gửi tiếp theo sẽ tự động dùng interface này

Gửi Song Song với sendpfast()

Để kiểm thử tải, sendpfast() dùng tcpreplay bên dưới. Nó nhanh hơn đáng kể so với lặp send() — sự khác biệt có ý nghĩa khi bạn đang cố bão hòa một đường link hoặc stress-test khả năng xử lý gói tin của firewall:

from scapy.all import sendpfast, IP, ICMP

packets = [IP(dst=f"192.168.1.{i}") / ICMP() for i in range(1, 255)]
sendpfast(packets, mbps=10, loop=2, iface="eth0")

Lọc với BPF vs Python

BPF filter (tham số filter=) chạy ở cấp kernel trước khi gói tin thậm chí đến Python. Dùng chúng cho bất cứ thứ gì quan trọng về hiệu năng. Để lọc phía Python cho logic mà BPF không thể diễn đạt — khớp nội dung payload, phân tích có trạng thái trên nhiều gói tin, những thứ đó.

Chú Ý Checksum

Scapy tính lại checksum tự động khi gửi gói tin mới. Nhưng nếu bạn đang tải từ PCAP và phát lại, hãy xóa các field checksum rõ ràng để buộc tính lại:

del pkt[IP].chksum
del pkt[TCP].chksum
# Scapy sẽ tính lại vào lần truy cập hoặc gửi tiếp theo

Bỏ qua điều này tốn của tôi cả một buổi chiều. Tôi đang truy tìm thứ trông như một firewall bị lỗi — thực ra nó đang drop đúng các gói tin có checksum không hợp lệ từ script replay của tôi.

Chạy Trong tmux hoặc screen

Các capture dài và flood test nên được chạy trong một session tách biệt. Mất kết nối SSH giữa chừng sẽ làm hỏng file PCAP và để lại raw socket mở. Tôi luôn attach vào tmux new -s scapy-test trước khi bắt đầu bất cứ thứ gì chạy lâu hơn một phút.

Sau sáu tháng, cách tôi tiếp cận các vấn đề mạng đã thực sự thay đổi. Khi có gì đó hoạt động không như mong đợi, tôi không còn cố gắng tạo ra đúng điều kiện thông qua các thay đổi ở tầng ứng dụng và hy vọng đúng gói tin xuất hiện nữa. Tôi chỉ cần xây dựng gói tin mà mình muốn và gửi nó. Đó là sự khác biệt — bạn ngừng đuổi theo triệu chứng và đi thẳng vào nguyên nhân.

Share: