HOME/Articles/

socket example test raw socket (snippet)

Article Outline

Python socket example 'test raw socket'

Functions in program:

  • def main():
  • def mac_to_str(mac_addr):

Modules used in program:

  • import struct
  • import ipaddress
  • import io
  • import ctypes
  • import time
  • import socket

python test raw socket

Python socket example: test raw socket

import socket
import time
import ctypes
import io
import ipaddress
import struct

ETH_P_ALL = 3
interface = "ens33"

# イーサネットタイプ
IPv4 = 0x0800
IPv6 = 0x86dd
ARP  = 0x806

# IPプロトコルタイプ
TCP =  6
UDP = 17

class EtherNetFrame(ctypes.BigEndianStructure):
    _fields_ = (
        #("preamble", ctypes.c_ubyte*7),
        #("sfd", ctypes.c_ubyte),
        ("d_host", ctypes.c_uint8*6),
        ("s_host", ctypes.c_uint8*6),
        ("type", ctypes.c_uint16),
        ("payload", ctypes.c_uint8*1500)
    )

class Ipv4Headr(ctypes.BigEndianStructure):
    _fields_ = (
        ("vit", ctypes.c_uint16),
        ("lenth", ctypes.c_uint16),
        ("id", ctypes.c_uint16),
        ("flag", ctypes.c_uint16),
        ("ttlp", ctypes.c_uint8),
        ("type", ctypes.c_uint8),
        ("chksm", ctypes.c_uint16),
        ("s_ip", ctypes.c_uint32),
        ("d_ip", ctypes.c_uint32),
        ("data", ctypes.c_uint8*1480)
    )

class UdpHeadr(ctypes.BigEndianStructure):
    _fields_ = (
        ("s_port", ctypes.c_uint16),
        ("d_port", ctypes.c_uint16),
        ("len", ctypes.c_uint16),
        ("chksm", ctypes.c_uint16),
        ("data", ctypes.c_uint8*1472)
    )
class TcpHeadr(ctypes.BigEndianStructure):
#class TcpHeadr(ctypes.Structure):
    _fields_ = (
        ("s_port", ctypes.c_uint16),
        ("d_port", ctypes.c_uint16),
        ("seq", ctypes.c_uint32),
        ("ack", ctypes.c_uint32),
        ("drc", ctypes.c_uint16),
        ("win", ctypes.c_uint16),
        ("chck", ctypes.c_uint16),
        ("ptr", ctypes.c_uint16),
        ("data", ctypes.c_uint8*1472)
    )
# バイトデータ形式のmacアドレスを文字列に変換する。
def mac_to_str(mac_addr):
    r = ""
    for i in range(6):
        val = mac_addr[i]
        val2 = '{0:02x}'.format(val)
        r = r + val2
    return r

def main():
    s=socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_ALL))
    mr_ifindex = socket.if_nametoindex(interface)   # c_type is int
    mr_type = 1                     # c_type is unsigned short
    mr_alen = 0                                     # c_type is unsigned short
    mr_address = b'\0'                              # c_type is unsigned char[8]
    packet_mreq = struct.pack('iHH8s',
                            mr_ifindex,
                            mr_type,
                            mr_alen,
                            mr_address)

    s.setsockopt(1, 1, packet_mreq)
    s.bind((interface, 0))
    while True:
        data = s.recv(1514)
        etf = EtherNetFrame()
        # 受信データを変換
        buffer = io.BytesIO(data)
        buffer.readinto(etf)

        # 送信先macアドレスを取得
        d_host_adr = mac_to_str(etf.d_host)
        # 送信元macアドレスを取得
        s_host_adr = mac_to_str(etf.s_host)
        # タイプを取得する
        type = etf.type
        # ペイロードを取得する。
        payload = etf.payload

        # ipv4パケットのみ処理を行う
        if type == IPv4:
            buffer = io.BytesIO(payload)
            ipv4 = Ipv4Headr()
            buffer.readinto(ipv4)
            # 送信元IPアドレスを取得する。
            src_ip = ipaddress.IPv4Address(ipv4.s_ip)
            # 送信先IPアドレスを取得する。
            dist_ip = ipaddress.IPv4Address(ipv4.d_ip)
            # ipヘッダからプロトコルタイプを取得する
            if ipv4.type == TCP:
                buffer = io.BytesIO(ipv4.data)
                tcpdata = TcpHeadr()
                buffer.readinto(tcpdata)
                print(s_host_adr, d_host_adr, src_ip, dist_ip, tcpdata.s_port, tcpdata.d_port)
    s.close()
if __name__ == "__main__":
    main()