I'm trying to create a packet sniffer using pcapy and impacket. I'm stuck at the data extraction phase. Unfortunately, impacket is not properly documented; at least, I couldn't find any documentation. Could anyone tell me where to find the documentation, or which functions I could use to extract data from a captured packet?
edit
my current code
import datetime
import pcapy
import sys
from impacket.ImpactPacket import *
from impacket.ImpactDecoder import *
def main(argv):
    '''Sniff packets on the ppp0 device and print a summary of each IP packet.

    For every captured IP packet the transport protocol is printed (for TCP
    also the destination port and the decoded payload), followed by the
    source and destination addresses and an empty line.  Frames that do not
    carry IP are skipped.

    argv -- command line arguments; currently unused.

    The loop runs until the process is interrupted (e.g. with Ctrl+C).
    '''
    dev = 'ppp0'
    print("Sniffing device " + dev)
    cap = pcapy.open_live(dev, 65536, 1, 0)
    decoder = LinuxSLLDecoder()
    while True:
        try:
            (header, packet) = cap.next()
        except pcapy.PcapError:
            continue
        if header is None:
            # nothing was captured this time; there is no packet to decode
            continue
        ip = decoder.decode(packet).child()  # internet layer
        if not isinstance(ip, IP):
            # ARP, IPv6, ... -- the getters used below exist only on IP
            continue
        trans = ip.child()  # transport layer
        proto = ip.get_ip_p()
        if proto == UDP.protocol:
            print('protocol= UDP')
        elif proto == TCP.protocol and isinstance(trans, TCP):
            print('protocol= TCP port= %s' % trans.get_th_dport())
            # trans.child() is the decoded payload; printing it gives the
            # hex dump.  NOTE(review): impacket headers appear to offer
            # get_data_as_string() for the raw payload bytes (e.g. to look
            # for an HTTP request line) -- confirm against ImpactPacket.py.
            print(trans.child())
        elif proto == ICMP.protocol:
            print('protocol= ICMP')
        else:
            print('protocol= %s' % proto)
        print('src= %s dest= %s' % (ip.get_ip_src(), ip.get_ip_dst()))
        print('')


if __name__ == "__main__":
    main(sys.argv)
Sample Output
src= xxx.xxx.xxx.xx dest= xx.xxx.xx.xx
protocol= TCP port= 443
1703 0300 2400 0000 0000 0000 07e2 a2a5 ....$...........
09fe 5b15 3cf1 803d 0c83 8ada 082e 8269 ..[.<..=.......i
0007 8b33 7d6b 5c1a 01 ...3}k\..
What I want to do is extract more data, for example the URL (if there is a URL in the packet).
Here is an example of a SYN port scanner written with pcap, Python and impacket. Maybe you can take the important parts out of it.
'''
synscan.py ...
see scan.py for parameters
this works extremely well on a Windows host that likes to communicate
scanning hosts in the same ethernet is possible
scanning hosts not within the same ethernet may succeed but is not guaranteed to
many algorithms were tried
- raw socket support needs higher privileges
and is impossible because Windows does not allow sniffing with them
or submitting sniffable packets
-> not implemented here
"Why do you need special libraries for TCP-SYN scans?"
thats why.
using pcap the program is divided into phases
usually it succeeds in phase 1.
phase 0:
add targets and phase 1
phase 1+: (parallel)
send arp request to resolve target
bombard it with the right packets
sniff
phase 2:
send out udp to resolve mac address by sniffing
send out raw socket tcp syn requests (needs higher privileges) optional
phase 3:
if not yet succeeded in phase 1: = mac not found
bombard all macs with packets
phase 4:
bombard broadcasting [mac ff:ff:ff:ff:ff:ff] with packets
phase 5:
clean up - no use
use DEBUG_PHASE to show phases
currently only ipv4 is supported
'''
import sys
import time
import thread
import pcap # pcapy
import impacket
import random
import impacket.ImpactDecoder as ImpactDecoder
import impacket.ImpactPacket as ImpactPacket
import array
import scan
from scan import *
# timeouts in seconds
DEFAULT_SOCKET_TIMEOUT = 20
NOTIFY_TIMEOUT = 2

# argument indices for socket.socket(...)
SOCK_INIT_FAMILY, SOCK_INIT_TYPE, SOCK_INIT_PROTO = 0, 1, 2

# indices into the [time, state] list that tracks scanner progress
STATE_TIME, STATE_STATE = 0, 1

# positional and keyword arguments handed to pcap.pcap(device_name, ...)
PCAP_ARGS = ()
PCAP_KW = {'promisc': True, 'timeout_ms': 0}

# master switch; every flag below is only effective while DEBUG is True
DEBUG = False
DEBUG_IFACE = False and DEBUG   # print which devices are set up
DEBUG_IP = False and DEBUG      # print debug output for IPv4 packets
DEBUG_ARP = False and DEBUG     # print ARP communication
DEBUG_SYN = False and DEBUG     # print the SYN requests sent
DEBUG_PACKET = False and DEBUG  # packet inspection as seen by the scanner
DEBUG_PHASE = True and DEBUG    # scanner phases (up to 5)
DEBUG_STATE = False and DEBUG   # debug output about the state
DEBUG_PHASE2 = False and DEBUG  # debug output about what is sent in phase 2

# you need higher privileges for some of these operations
ETHER_BROADCAST = (0xff, 0xff, 0xff, 0xff, 0xff, 0xff)  # mac ff:ff:ff:ff:ff:ff
# --- Conversions --------------------------------------------------------------
def ip_tuple(ip):
    '''Split a dotted IP address string [0.0.0.0] into a tuple of ints.'''
    return tuple(int(octet) for octet in ip.split('.'))
def tuple_ip(ip):
    '''Join the first four items of a byte sequence into a dotted IP address [0.0.0.0].'''
    first, second, third, fourth = ip[0], ip[1], ip[2], ip[3]
    return '%s.%s.%s.%s' % (first, second, third, fourth)
# --- Packet Creation --------------------------------------------------------------
def generate_empty_arp_request():
    '''Build a broadcast ARP request that still lacks its addresses.

    Returns (eth, arp): the Ethernet frame and the ARP packet it contains.
    The Ethernet source address and the ARP sender hardware address, sender
    protocol address and target protocol address are not set here.
    '''
    # ARP payload
    request = ImpactPacket.ARP()
    request.set_ar_hrd(1)
    request.set_ar_hln(6)                 # ethernet address length = 6
    request.set_ar_pln(4)                 # ip address length = 4
    request.set_ar_pro(0x800)             # protocol: ip
    request.set_ar_op(1)                  # opcode: request
    request.set_ar_tha(ETHER_BROADCAST)   # target hardware address (broadcast)

    # ethernet frame carrying the request
    frame = ImpactPacket.Ethernet()
    frame.set_ether_type(0x0806)          # this is an ARP packet
    frame.set_ether_dhost(ETHER_BROADCAST)  # destination host (broadcast)
    frame.contains(request)
    return frame, request
def generate_empty_ip_packet():
    '''Build an Ethernet frame containing an IPv4 header prepared for TCP.

    Returns (eth, ip).  The Ethernet source/destination hosts and the IP
    source/destination addresses are not set here.
    '''
    # IPv4 header; addresses are not set here
    header = ImpactPacket.IP()
    header.set_ip_v(4)
    header.set_ip_hl(5)                   # 5 * 32 bit
    header.set_ip_tos(0)                  # usual packet -> type of service = 0
    header.set_ip_id(random.randint(1, 0xffff))
    header.set_ip_df(0)                   # flags redundant
    header.set_ip_off(0)
    header.set_ip_ttl(250)
    header.set_ip_p(6)                    # tcp = 6

    # ethernet frame; shost and dhost are not set here
    frame = ImpactPacket.Ethernet()
    frame.set_ether_type(0x800)
    frame.contains(header)
    return frame, header
# --- Scanner --------------------------------------------------------------
def start_scan(timeout):
    '''Start sniffing on all pcap devices and return a scanner object.

    timeout -- seconds without a change in the number of pending targets
               after which the scanner moves on to its next phase.

    The returned object offers two callables:
      add_scan(connection) -- connection is (socketargs, addr); a SYN is sent
                              at once when the MAC of the host is known,
                              otherwise the target is queued and an ARP
                              request is sent.
      wait()               -- block until all sniffing devices have stopped.

    Ports that answer with SYN-ACK are written to stdout, one per line.
    '''
    # Imported locally: the file only gets these names, if at all, through
    # ``from scan import *``.
    import socket
    import traceback

    # mac addresses are used to send ethernet packets
    # ip : set([(typecode, bytes)]) -- hashable array.array constructor args
    mac_addresses = {}
    # thread-safe access to the targets
    targets_lock = thread.allocate_lock()
    targets = []  # (family, (ip, port, ...))
    # host ips of all targets
    target_hosts = set()

    def is_target(host):
        return host in target_hosts

    def add_target(family, address):
        target_hosts.add(address[IP])
        mac_addresses.setdefault(address[IP], set())
        with targets_lock:
            targets.append((family, address))

    def store_ip_mac_resolution_for(host):
        '''Register every address of host so that its MACs get recorded.'''
        for family, socktype, proto, canonname, address in \
                socket.getaddrinfo(host, 0):
            mac_addresses.setdefault(address[IP], set())

    def associate_ip_mac(ip, mac):
        '''Remember mac (list of ints or array.array) for a tracked ip.'''
        if ip in mac_addresses or is_target(ip):
            if isinstance(mac, (list, tuple)):
                hashable_array_constructor = ('B', ''.join(map(chr, mac)))
            else:
                hashable_array_constructor = (mac.typecode, mac.tostring())
            mac_addresses.setdefault(ip, set()).add(hashable_array_constructor)

    def get_macs(host):
        '''Return all known MACs of host as array.array objects.'''
        macs = set()
        # address[IP] instead of unpacking (ip, port): IPv6 results of
        # getaddrinfo are 4-tuples and would raise ValueError.
        for family, socktype, proto, canonname, address in \
                socket.getaddrinfo(host, 0):
            macs.update(mac_addresses.get(address[IP], ()))
        return [array.array(*mac) for mac in macs]

    def get_local_macs():
        macs = set()
        for ip in get_host_ips():
            for mac in get_macs(ip):
                macs.add((ip, tuple(mac.tolist())))
        return macs

    def ip_known(ip):
        return bool(mac_addresses.get(ip, False))

    def save_ip_mac_resolution(ether, ip_header):
        associate_ip_mac(ip_header.get_ip_src(), ether.get_ether_shost())
        associate_ip_mac(ip_header.get_ip_dst(), ether.get_ether_dhost())

    ## parse data directly from pcap
    def find_connection_response(data):
        # Parse the Ethernet packet
        decoder = ImpactDecoder.EthDecoder()
        find_connection_response_ethernet(decoder.decode(data))

    def find_connection_response_ethernet(ether):
        eth_type = ether.get_ether_type()
        if eth_type == 0x800:
            # Received an IP packet (2048)
            find_connection_response_ip(ether, ether.child())
        elif eth_type == 0x0806:
            store_mac_of_target(ether)

    ## arp response handling
    def store_mac_of_target(ether):
        arp = ether.child()
        if arp.get_ar_op() not in (2, ):
            return
        # Received ARP response
        if DEBUG_ARP: print('response')
        source_mac_addr = arp.get_ar_sha()
        source_ip_addr = tuple_ip(arp.get_ar_spa())
        destination_mac_addr = arp.get_ar_tha()
        destination_ip_addr = tuple_ip(arp.get_ar_tpa())
        if DEBUG_ARP:
            print('%s %s %s %s' % (source_mac_addr, source_ip_addr,
                                   destination_mac_addr, destination_ip_addr))
        if is_target(destination_ip_addr):
            if DEBUG_ARP:
                print('intersting: %s %s' % (destination_ip_addr,
                                             destination_mac_addr))
            associate_ip_mac(destination_ip_addr, destination_mac_addr)
        if is_target(source_ip_addr):
            if DEBUG_ARP:
                print('intersting: %s %s' % (source_ip_addr, source_mac_addr))
            associate_ip_mac(source_ip_addr, source_mac_addr)

    ## tcp syn-ack response handling
    def find_connection_response_ip(ether, ip_header):
        save_ip_mac_resolution(ether, ip_header)
        if ip_header.get_ip_p() != 0x6:
            return
        # Received a TCP packet
        source_ip = ip_header.get_ip_src()
        if DEBUG_IP > 2:
            print('received ip packet: %s to %s' % (source_ip,
                                                    ip_header.get_ip_dst()))
        if not is_target(source_ip):
            return
        if DEBUG_IP > 1: print('found interest in: %s' % source_ip)
        find_connection_response_tcp(ip_header, ip_header.child())

    def find_connection_response_tcp(ip_header, tcp_header):
        source_ip = ip_header.get_ip_src()
        source_port = tcp_header.get_th_sport()
        destination_ip = ip_header.get_ip_dst()
        # was get_th_sport(): the target tuple built below never matched
        destination_port = tcp_header.get_th_dport()
        # (an unconditional debug print of `targets` was removed here: stdout
        # is where put_port() reports the open ports)
        if tcp_header.get_SYN() and tcp_header.get_ACK():
            # SYN-ACK: the port is open
            if DEBUG_IP:
                print("Connection attempt %s:(%s) <- %s:%s" %
                      (source_ip, source_port,
                       destination_ip, destination_port))
            if source_ip in target_hosts:
                put_port(source_port)
        elif tcp_header.get_SYN() and not tcp_header.get_ACK() \
                and source_ip in get_host_ips():
            # someone sent a syn request along
            # assuming the acknowledge will come here, too
            target = (socket.AF_INET, (destination_ip, destination_port))
            if DEBUG_IP:
                print("Connection attempt %s:(%s) --> %s:%s" %
                      (source_ip, source_port,
                       destination_ip, destination_port))
            with targets_lock:
                try:
                    targets.remove(target)
                except ValueError:
                    pass

    def put_port(port):
        sys.stdout.write(str(port) + '\n')

    ## syn packet sending
    def send_syn(family, addr):
        if family == socket.AF_INET:
            send_syn_ipv4(addr)
        elif family == socket.AF_INET6:
            pass  # currently only ipv4 is supported
        else:
            sys.stderr.write('Warning: in send_syn: family %s not supported\n'
                             % family)

    def send_syn_ipv4(address):
        for packet in iter_syn_packets(address):
            if DEBUG_PACKET:
                print('packet %s' % id(packet))
            send_packet(packet)

    def iter_syn_packets(address):
        for tcp in iter_tcp_packets(address):
            for eth, ip in iter_eth_packets(address):
                ip.contains(tcp)
                yield eth.get_packet()

    def get_host_ips():
        return socket.gethostbyname_ex(socket.gethostname())[2]

    def iter_eth_packets(address):
        '''Yield (eth, ip) for every known source/target MAC combination.

        The same two objects are re-used and modified for each combination.
        '''
        target_ip, port = address
        eth, ip = generate_empty_ip_packet()
        for source_ip in get_host_ips():
            ip.set_ip_src(source_ip)
            ip.set_ip_dst(target_ip)
            for source_mac in get_macs(source_ip):
                eth.set_ether_shost(source_mac)
                for target_mac in get_macs(target_ip):
                    eth.set_ether_dhost(target_mac)
                    yield eth, ip

    def get_devices():
        return list(scanning.values())

    def iter_tcp_packets(address):
        '''Yield one TCP SYN header addressed to the port in address.'''
        _, target_port = address
        tcp = ImpactPacket.TCP()
        tcp.set_th_sport(random.randint(2048, 0xffff))
        tcp.set_th_dport(target_port)
        tcp.set_th_seq(random.randint(1, 0x7fffffff))
        tcp.set_th_win(32768)  # window -> discovered this as default
        tcp.set_SYN()
        yield tcp

    # waiting and scanner interaction
    keep_running = [1]  # non-empty until wait() has been called

    def wait():
        '''Let the phases begin and block until no device is sniffing.'''
        if keep_running:
            keep_running.pop()
        while scanning:
            time.sleep(0.01)

    def add_scan(connection):
        '''Scan connection = (socketargs, addr), or queue it if MAC unknown.'''
        socketargs, addr = connection
        family = socketargs[SOCK_INIT_FAMILY]
        if ip_known(addr[IP]):
            send_syn(family, addr)
        else:
            add_target(family, addr)
            notify(family, addr)

    notified = {}  # ip : time of the last ARP request sent for it

    def notify(family, addr):
        '''Send an ARP request for addr, at most once per NOTIFY_TIMEOUT.'''
        now = time.time()
        if family == socket.AF_INET:
            ip = addr[IP]
            if notified.get(ip, 0) < now - NOTIFY_TIMEOUT:
                notified[ip] = now
                send_who_is_ipv4(ip)
        elif family == socket.AF_INET6:
            pass  # currently only ipv4 is supported
        else:
            raise ValueError('unknown protocol family type %i' % family)

    scanning_lock = thread.allocate_lock()
    scanning = {}  # device_name : device

    def send_who_is_ipv4(target_ip):
        eth, arp = generate_empty_arp_request()
        arp.set_ar_tpa(ip_tuple(target_ip))  # target protocol address
        for ip, mac in get_local_macs():
            arp.set_ar_spa(ip_tuple(ip))     # source protocol address
            arp.set_ar_sha(mac)              # source hardware address
            eth.set_ether_shost(mac)         # source hardware address
            if DEBUG_ARP:
                print('send_who_is_ipv4: %s%s -> %s' % (ip, mac, target_ip))
            send_packet(eth.get_packet())

    def send_packet(packet):
        '''Send packet on every sniffing device; takes at least 2 ms.'''
        started = time.time()
        for device in get_devices():
            if DEBUG_PACKET: print('%s %r' % (device, packet))
            device.sendpacket(packet)
        # The original started from -time.time(), which made the remaining
        # time always negative, so the throttle never slept.
        remaining = 0.002 - (time.time() - started)
        if remaining > 0:
            time.sleep(remaining)

    def sniff_device(device_name, device):
        '''Thread body: feed every packet of device to the response parser.'''
        if DEBUG_IFACE: print('dev up: %s' % device_name)
        with scanning_lock:
            if device_name in scanning:
                return
            scanning[device_name] = device
        try:
            while device_name in scanning:
                # not named `time`: that would shadow the time module
                timestamp, data = next(device)
                find_connection_response(str(data))
        finally:
            with scanning_lock:
                scanning.pop(device_name, None)
            if DEBUG_IFACE: print('dev down: %s' % device_name)

    def start_scans():
        for device_name in pcap.findalldevs():
            start_device_scan(device_name)
        # sniff_device() ignores a device that is already being scanned
        start_device_scan(pcap.lookupdev())

    def start_device_scan(device_name):
        device = pcap.pcap(device_name, *PCAP_ARGS, **PCAP_KW)
        thread.start_new(sniff_device, (device_name, device))

    def notify_loop():
        '''Thread body: alternate between notifying and the current phase.'''
        targets_lock.acquire()
        # `phases` never becomes empty (the last phase is a no-op that stays),
        # so this loop runs for the lifetime of the process.
        while targets or phases:
            targets_lock.release()
            try:
                do_notify()
            except Exception:
                traceback.print_exc()
            # run the current scanner phase
            try:
                phases[0]()
            except Exception:
                traceback.print_exc()
            targets_lock.acquire()
        targets_lock.release()

    def get_state():
        return len(targets)

    last_state = [time.time(), get_state()]

    def state_has_not_changed_for(timeout):
        now = time.time()
        state = get_state()
        if state != last_state[STATE_STATE]:
            last_state[STATE_TIME] = now
            last_state[STATE_STATE] = state
        is_old = last_state[STATE_TIME] + timeout < now
        if DEBUG_STATE: print('state old: %s' % is_old)
        return is_old

    def reset_state():
        last_state[STATE_TIME] = time.time()
        last_state[STATE_STATE] = get_state()

    target_save = []  # needed between phase 3 and 4
    phases = []
    phase = phases.append  # used as decorator: registers the next phase

    @phase
    def wait_for_wait_call():
        # wait for wait()
        if keep_running: return
        if DEBUG_PHASE: print('initiated phase 1 = waiting')
        reset_state()
        phases.pop(0)
        if not targets:
            give_up()

    @phase
    def send_packets_to_sniff_macs():
        # wait for the timeout, then send ip packets to the hosts so that
        # their mac addresses can be sniffed
        if not state_has_not_changed_for(timeout): return
        if DEBUG_PHASE: print('initiated phase 2 = send packets')
        send_packets_to_addresses_to_sniff_mac()
        reset_state()
        phases.pop(0)
        if not targets:
            give_up()

    @phase
    def send_to_all_macs():
        # wait for the timeout, then give every ip all known mac addresses
        if not state_has_not_changed_for(timeout): return
        if DEBUG_PHASE: print('initiated phase 3 = send to all')
        target_save.extend(targets[:])
        associate_all_ip_with_all_mac_addresses()
        reset_state()
        phases.pop(0)
        if not targets:
            give_up()

    @phase
    def broadcast():
        # wait for the timeout, then start broadcasting instead of using the
        # real mac address
        if not state_has_not_changed_for(timeout): return
        if DEBUG_PHASE: print('initiated phase 4 = broadcast')
        if add_broadcast_to_all_mac_addresses():
            with targets_lock:
                targets.extend(target_save)
        reset_state()
        give_up()

    @phase
    def stop_sniffing():
        # wait for the timeout, then give up: with no device left in
        # `scanning`, wait() returns
        if not state_has_not_changed_for(timeout): return
        if DEBUG_PHASE: print('initiated phase 5 = give up')
        with scanning_lock:
            scanning.clear()
        reset_state()
        phases.insert(0, phases.pop(-1))

    @phase
    def do_nothing():
        pass

    def give_up():
        # move the second to last phase (phase 5) to the front
        phases.insert(0, phases.pop(-2))

    def send_packets_to_addresses_to_sniff_mac():
        udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            for host in list(target_hosts):
                send_udp(udp_sock, host)
        finally:
            udp_sock.close()
        try:
            raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW)
        except socket.error:
            sys.stderr.write('higher previleges needed to perform raw socket packet send\n')
            return
        try:
            for target in targets[:]:
                send_raw(raw_sock, target)
        finally:
            raw_sock.close()

    def send_raw(raw_sock, target):
        family, addr = target
        if family == socket.AF_INET:
            send_raw_ipv4(raw_sock, addr)
        elif family == socket.AF_INET6:
            pass  # todo: ipv6
        else:
            raise ValueError('invalid family %s' % (family,))

    def send_raw_ipv4(raw_sock, addr):
        for tcp in iter_tcp_packets(addr):
            packet = tcp.get_packet()
            if DEBUG_PHASE2:
                print('sending tcp raw %r %s' % (packet, addr))
            try:
                raw_sock.sendto(packet, addr)
            except socket.error as e:
                # best effort; the original `except ():` caught nothing, so
                # one failed send aborted the phase before it could advance
                if DEBUG_PHASE2:
                    print('failed: raw send to %r %s' % (addr, e))

    def send_udp(s, host):
        '''Send a UDP packet to a random port of host to sniff its MAC.

        The socket stays open: the caller re-uses it for all hosts and
        closes it afterwards.
        '''
        try:
            s.sendto(':)', (host, random.randint(0, 0xffff)))
        except socket.error as e:
            if DEBUG_PHASE2: print('failed: send to %r %s' % (host, e))
        else:
            if DEBUG_PHASE2: print('succeded: send to %r' % (host,))

    def associate_all_ip_with_all_mac_addresses():
        macs = set()
        for known in mac_addresses.values():
            macs.update(known)
        for known in mac_addresses.values():
            known.update(macs)
        if DEBUG_PHASE: print('macs: %s' % (list(macs),))

    def add_broadcast_to_all_mac_addresses():
        '''Add the broadcast MAC to every ip; return True if any was new.'''
        updated_mac = False
        broadcast_mac = ('B', ETHER_BROADCAST)
        for known in mac_addresses.values():
            updated_mac = updated_mac or broadcast_mac not in known
            known.add(broadcast_mac)
        return updated_mac

    def do_notify():
        '''Send SYNs to resolved targets and ARP requests for the others.

        Takes at least NOTIFY_TIMEOUT seconds.
        '''
        started = time.time()
        asked = set()  # ips already notified in this round
        with targets_lock:
            pending = targets[:]
        for target in pending:
            ip = target[1][IP]
            if ip in asked:
                continue
            if ip_known(ip):
                if DEBUG_SYN:
                    print('nofifying %s send_syn %s' % (ip, target[PORT]))
                send_syn(*target)
                # the sniffer thread may have removed the target meanwhile
                with targets_lock:
                    try:
                        targets.remove(target)
                    except ValueError:
                        pass
            else:
                if DEBUG_SYN: print('nofifying %s notify' % ip)
                notify(*target)
                asked.add(ip)
        remaining = NOTIFY_TIMEOUT - (time.time() - started)
        if remaining > 0:
            time.sleep(remaining)

    def start_notify_loop():
        thread.start_new(notify_loop, ())

    store_ip_mac_resolution_for(socket.gethostname())
    start_scans()
    start_notify_loop()
    return obj(wait = wait, add_scan = add_scan)
def main():
    '''Parse the command line, queue every connection and wait for the scan.'''
    arguments = parseArgs(DEFAULT_SOCKET_TIMEOUT)
    host, ports, timeout = arguments
    scanner = start_scan(timeout)
    for connection in connections(host, ports):
        scanner.add_scan(connection)
    scanner.wait()


if __name__ == '__main__':
    main()