Initial commit
This commit is contained in:
77
tests/iptogeo.py
Normal file
77
tests/iptogeo.py
Normal file
@@ -0,0 +1,77 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8
|
||||
|
||||
import socket
|
||||
import struct
|
||||
|
||||
class IPToGeoException(Exception):
|
||||
pass
|
||||
|
||||
class IPToGeo(object):
|
||||
|
||||
MAGIC = 0x179E08EF
|
||||
VERSION = 1
|
||||
REQ = 1
|
||||
RESP = 0
|
||||
IPV4 = 32
|
||||
IPV6 = 128
|
||||
|
||||
IP_NOT_FOUND = 6
|
||||
|
||||
PACKET_SIZE = 32
|
||||
|
||||
ERRORS = {1 : 'Bad magic',
|
||||
2 : 'Bad version',
|
||||
3 : 'Bad request field' ,
|
||||
4 : 'Bad IP version',
|
||||
5 : 'Unsupported IP version',
|
||||
6 : 'IP not found'}
|
||||
|
||||
def __init__(self, remote_addr='127.0.0.1', remote_port=53333, timeout=None):
|
||||
self._remote_addr = remote_addr
|
||||
self._remote_port = remote_port
|
||||
self._timeout = timeout
|
||||
|
||||
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._socket.connect((remote_addr, remote_port))
|
||||
|
||||
def _create_request(self, int_ip):
|
||||
packet = ''
|
||||
packet += struct.pack('<IBBBBI', IPToGeo.MAGIC, IPToGeo.VERSION, IPToGeo.REQ,
|
||||
0, #err
|
||||
IPToGeo.IPV4, # ip type
|
||||
0) # flags
|
||||
packet += struct.pack('<I', int_ip) # ipv4
|
||||
packet += struct.pack('<III', 0, 0, 0) # ipv6
|
||||
packet += struct.pack('<I', 0) # country code
|
||||
|
||||
return packet
|
||||
|
||||
def _check_request(self, packet):
|
||||
(magic, version, req, err, ip_type, flags, ipv4, ipv6b, ipv6c, ipv6d) = struct.unpack_from('<IBBBBIIIII', packet, 0)
|
||||
|
||||
if magic != IPToGeo.MAGIC:
|
||||
raise IPToGeoException('Invalid magic %08x' % (magic))
|
||||
|
||||
if err == 6: return (ipv4, None) # IP not found
|
||||
if err != 0:
|
||||
raise IPToGeoException(IPToGeo.ERRORS[err])
|
||||
|
||||
(cc0, cc1, cc2, cc3) = struct.unpack_from('BBBB', packet, 7*4)
|
||||
|
||||
return (ipv4, '%c%c%c%c' % (cc0, cc1, cc2, cc3))
|
||||
|
||||
def ip_to_geo(self, ip):
|
||||
splitted_ip = [int(a) for a in ip.split('.')]
|
||||
int_ip = splitted_ip[0] << 24
|
||||
int_ip |= splitted_ip[1] << 16
|
||||
int_ip |= splitted_ip[2] << 8
|
||||
int_ip |= splitted_ip[3] << 0
|
||||
|
||||
packet = self._create_request(int_ip)
|
||||
self._socket.send(packet)
|
||||
packet = self._socket.recv(IPToGeo.PACKET_SIZE)
|
||||
|
||||
(ip, country_code) = self._check_request(packet)
|
||||
|
||||
return (ip, country_code)
|
||||
24
tests/test_ip_to_geo.py
Executable file
24
tests/test_ip_to_geo.py
Executable file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8
|
||||
|
||||
from iptogeo import IPToGeo
|
||||
|
||||
iptogeo = IPToGeo()
|
||||
|
||||
def test_ip(ip):
|
||||
(ipres, country_code) = iptogeo.ip_to_geo(ip)
|
||||
|
||||
if not country_code:
|
||||
print 'Country code for %s (%08x) not found' % (ip, ipres)
|
||||
else:
|
||||
print 'Country code for %s (%08x) is %s' % (ip, ipres, country_code)
|
||||
|
||||
test_ip('1.5.7.3')
|
||||
test_ip('1.5.255.4')
|
||||
test_ip('1.6.255.4')
|
||||
test_ip('2.0.0.0')
|
||||
test_ip('127.0.0.1')
|
||||
test_ip('1.55.3.12')
|
||||
test_ip('1.57.0.0')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user