nodhcp/nodhcp/lib/pcap.py

146 lines
5.1 KiB
Python

import ctypes as c
ARP_REQUEST = "\x08\x06\x00\x01\x08\x00\x06\x04\x00\x01"
ARP_REPLY = "\x08\x06\x00\x01\x08\x00\x06\x04\x00\x02"
class SockAddr(c.Structure):
_fields_ = [("sa_family", c.c_ushort), ("sa_data", c.c_char * 14)]
class PcapAddr(c.Structure):
pass
PcapAddr._fields_ = [("next", c.POINTER(PcapAddr)),
("addr", c.POINTER(SockAddr)),
("netmask", c.POINTER(SockAddr)),
("sockaddr", c.POINTER(SockAddr)),
("dstaddr", c.POINTER(SockAddr))]
class PcapIfT(c.Structure):
pass
PcapIfT._fields_ = [("next", c.POINTER(PcapIfT)),
("name", c.c_char_p),
("description", c.c_char_p),
("addresses", c.POINTER(PcapAddr)),
("flags", c.c_uint)]
class BpfInsn(c.Structure):
_fields_ = [("code", c.c_ushort), ("jt", c.c_char), ("jf", c.c_char), ("k", c.c_uint32)]
class BpfProgram(c.Structure):
_fields_ = [("bf_len", c.c_uint), ("bpf_insn", c.POINTER(BpfInsn))]
class Timeval(c.Structure):
_fields_ = [("tv_sec", c.c_long), ("tv_usec", c.c_long)]
class PcapPktHdr(c.Structure):
_fields_ = [("ts", Timeval), ("caplen", c.c_uint32), ("len", c.c_uint32)]
PcapCallback = c.CFUNCTYPE(c.py_object, c.POINTER(PcapPktHdr), c.POINTER(c.c_byte))
PCAP_IF_LOOPBACK = 1
LINKTYPE_ETHERNET = 1
PCAP_NETMASK_UNKNOWN = 0xffffffff
NULL = 0
CAPTURE_LIMIT = 2048
#MAC_ADDR_LEN = 6
#class EthernetHeader(c.Structure):
# _fields_ = [("dst", c.c_ubyte * MAC_ADDR_LEN),
# ("src", c.c_ubyte * MAC_ADDR_LEN),
# ("type", c.c_ushort)]
MAC_ADDR_LEN = 100
class EthernetHeader(c.Structure):
_fields_ = [("foo", c.c_uint32 * MAC_ADDR_LEN)]
# pcap.pcap_lookupdev
# pcap.pcap_sendpacket
class Interface:
def __init__(self, name, pcap, handle):
self._name = name
self.pcap = pcap
self.handle = handle
def __del__(self):
self.pcap.close_device(self.handle)
@property
def name(self):
return self._name.decode("utf-8", "ignore")
def collect_address(self):
self.pcap.set_filter(self.handle, "(ip or ip6) and not net 127.0.0.0/8 and not net fe80::/64")
i = 0
self.pcap.capture_packets(self.handle)
#for packet in self.pcap.capture_packets(handle):
# if i > 10:
# break
# i += 1
# print(packet)
def buf_to_str(buf):
return "".join([s for s in buf])
class Pcap():
def __init__(self, lib):
self._devs_p = c.POINTER(PcapIfT)
self.lib = lib
def assert_pcap(self, return_code, handle):
if return_code < 0:
error = self.lib.pcap_geterr(handle)
raise OSError(c.c_char_p(error).value.decode("utf-8", "ignore"))
def find_devices(self):
err = c.create_string_buffer(255)
devs = self._devs_p()
result = self.lib.pcap_findalldevs(c.byref(devs), err)
if result < 0:
raise OSError(buf_to_str(error_buf))
if not devs:
return []
found_devs = []
this_dev = devs
while True:
dev = this_dev.contents
if not (dev.flags & PCAP_IF_LOOPBACK):
handle = self.open_live(dev.name)
if handle != NULL:
if self.lib.pcap_datalink(handle) == LINKTYPE_ETHERNET:
interface = Interface(dev.name, self, handle)
found_devs.append(interface)
else:
self.close_device(handle)
if not dev.next:
self.lib.pcap_freealldevs(devs)
return found_devs
this_dev = dev.next
def close_device(self, device):
self.lib.pcap_close(device)
def open_live(self, name):
err = c.create_string_buffer(255)
return self.lib.pcap_open_live(name, CAPTURE_LIMIT, 1, 1000, err)
def format_mac(self, dst):
return "%x:%x:%x:%x:%x:%x" % \
(c.c_ubyte(dst[0]).value,c.c_ubyte(dst[1]).value,c.c_ubyte(dst[2]).value,c.c_ubyte(dst[3]).value,c.c_ubyte(dst[4]).value,c.c_ubyte(dst[5]).value)
def handle_package(self, header, package):
caplen = header.contents.caplen
header_len = header.contents.len
#assert caplen <= header_len
#assert caplen >= c.sizeof(EthernetHeader)
ethernet_header = c.cast(package, c.POINTER(EthernetHeader)).contents
import pry; pry()
def capture_packets(self, handle):
cb = PcapCallback(self.handle_package)
self.lib.pcap_loop(handle, 3, cb, c.py_object(self))
def set_filter(self, handle, filter):
bpf_filter = BpfProgram()
filter_string = c.create_string_buffer(filter.encode("utf-8"))
status = self.lib.pcap_compile(handle, c.byref(bpf_filter), filter_string, 1, PCAP_NETMASK_UNKNOWN)
self.assert_pcap(status, handle)
status = self.lib.pcap_setfilter(handle, c.byref(bpf_filter))
self.assert_pcap(status, handle)
#static u_int16_t ether_packet(u_char *args, const struct pcap_pkthdr *pkthdr, co nst u_char *p) {
# struct ether_header *eptr = (struct ether_header*)p;
# assert(pkthdr->caplen <= pkthdr->len);
# assert(pkthdr->caplen >= sizeof(struct ether_header));
# return eptr->ether_type;
# }