Move iptogeo.py util into misc
This commit is contained in:
		
							
								
								
									
										157
									
								
								misc/iptogeo.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										157
									
								
								misc/iptogeo.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,157 @@ | ||||
| #!/usr/bin/env python | ||||
| # -*- coding: utf-8 -*- | ||||
|  | ||||
| # | ||||
| # Copyright 2016 Grégory Soutadé | ||||
| # | ||||
| # This file is part of iptogeo. | ||||
| # | ||||
| # iptogeo is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| # | ||||
| # iptogeo is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU General Public License for more details. | ||||
| # | ||||
| # You should have received a copy of the GNU General Public License | ||||
| # along with iptogeo.  If not, see <http://www.gnu.org/licenses/>. | ||||
| # | ||||
|  | ||||
| import socket | ||||
| import struct | ||||
|  | ||||
| class IPToGeoException(Exception): | ||||
|     pass | ||||
|  | ||||
| class IPToGeo(object): | ||||
|  | ||||
|     MAGIC = 0x179E08EF | ||||
|     VERSION = 1 | ||||
|     REQ = 1 | ||||
|     RESP = 0 | ||||
|     IPV4 = 4 | ||||
|     IPV6 = 16 | ||||
|  | ||||
|     IP_NOT_FOUND = 6 | ||||
|  | ||||
|     PACKET_SIZE = 32 | ||||
|      | ||||
|     ERRORS = {1 : 'Bad magic', | ||||
|               2 : 'Bad version', | ||||
|               3 : 'Bad request field' , | ||||
|               4 : 'Bad IP version', | ||||
|               5 : 'Unsupported IP version', | ||||
|               6 : 'IP not found'} | ||||
|  | ||||
|     MAX_REQUESTS = 50 | ||||
|  | ||||
|     def __init__(self, remote_addr='127.0.0.1', remote_port=53333, timeout=None, family=socket.AF_INET): | ||||
|         self._remote_addr = remote_addr | ||||
|         self._remote_port = remote_port | ||||
|         self._timeout = timeout | ||||
|         self._family = family | ||||
|         self._nb_requests_sent = self.MAX_REQUESTS # Force socket creation | ||||
|         self._socket = None | ||||
|  | ||||
|     def _create_socket(self): | ||||
|         if self._socket: | ||||
|             self._socket.close() | ||||
|         self._socket = socket.socket(self._family, socket.SOCK_STREAM) | ||||
|         if not self._timeout is None: | ||||
|             self._socket.settimeout(self._timeout) | ||||
|         self._socket.connect((self._remote_addr, self._remote_port)) | ||||
|  | ||||
|     def _extend_ipv6(self, ipv6): | ||||
|         tmp = '' | ||||
|         for s in ipv6.split(':'): | ||||
|             if not s: break | ||||
|             while len(s) != 4: | ||||
|                 s = '0' + s | ||||
|             tmp += s | ||||
|         while len(tmp) < 16*2: | ||||
|             tmp += '0' | ||||
|         res = '' | ||||
|         for i in range(0, 15*2, 2): | ||||
|             res += tmp[i] + tmp[i+1] + ':' | ||||
|         res += tmp[30] + tmp[31] | ||||
|  | ||||
|         return res | ||||
|  | ||||
|     def _create_request(self, ip, ip_type): | ||||
|         packet = b'' | ||||
|         packet += struct.pack('<IBBBBI', IPToGeo.MAGIC, IPToGeo.VERSION, IPToGeo.REQ, | ||||
|                               0, #err | ||||
|                               ip_type, # ip type | ||||
|                               0) # flags | ||||
|         for i in ip: | ||||
|             packet += struct.pack('<B', i) # ipv4 | ||||
|         packet += struct.pack('<III', 0, 0, 0) # ipv6 | ||||
|         packet += struct.pack('<I', 0) # country code | ||||
|  | ||||
|         return packet | ||||
|  | ||||
|     def _check_request(self, packet): | ||||
|         (magic, version, req, err, ip_type, flags, ipv4, ipv6b, ipv6c, ipv6d) = struct.unpack_from('<IBBBBIIIII', packet, 0) | ||||
|  | ||||
|         if magic != IPToGeo.MAGIC: | ||||
|             raise IPToGeoException('Invalid magic %08x' % (magic)) | ||||
|          | ||||
|         ip_res = '%08x' % (ipv4) | ||||
|         if ip_type == IPToGeo.IPV6: | ||||
|             ip_res += '%08x' % (ipv6b) | ||||
|             ip_res += '%08x' % (ipv6c) | ||||
|             ip_res += '%08x' % (ipv6d) | ||||
|  | ||||
|         if err == IPToGeo.IP_NOT_FOUND: return (ip_res, None) # IP not found | ||||
|         if err != 0: | ||||
|             raise IPToGeoException(IPToGeo.ERRORS[err]) | ||||
|  | ||||
|         (cc0, cc1, cc2, cc3) = struct.unpack_from('BBBB', packet, 7*4) | ||||
|  | ||||
|         return (ip_res, '%c%c%c%c' % (cc0, cc1, cc2, cc3)) | ||||
|  | ||||
|     def _send_request(self, packet, second_chance=True): | ||||
|         self._nb_requests_sent += 1 | ||||
|         if self._nb_requests_sent >= self.MAX_REQUESTS: | ||||
|             self._create_socket() | ||||
|             self._nb_requests_sent = 0 | ||||
|         try: | ||||
|             self._socket.send(packet) | ||||
|             packet = self._socket.recv(IPToGeo.PACKET_SIZE) | ||||
|             if not packet: | ||||
|                 raise socket.timeout | ||||
|             return packet | ||||
|         except socket.timeout as e: | ||||
|             if second_chance: | ||||
|                 self._nb_requests_sent = self.MAX_REQUESTS | ||||
|                 return self._send_request(packet, False) | ||||
|             else: | ||||
|                 raise e | ||||
|              | ||||
|     def ip_to_geo(self, ip): | ||||
|         ip_type = IPToGeo.IPV4 | ||||
|         if ip.find('.') >= 0: | ||||
|             splitted_ip = [int(a) for a in ip.split('.')] | ||||
|             if len(splitted_ip) != 4: | ||||
|                 raise Exception('Bad IP %s' % (ip)) | ||||
|         elif ip.find(':') >= 0: | ||||
|             splitted_ip = [int(a, 16) for a in self._extend_ipv6(ip).split(':')] | ||||
|             if len(splitted_ip) != 16: | ||||
|                 raise Exception('Bad IP %s' % (ip)) | ||||
|             ip_type = IPToGeo.IPV6 | ||||
|         else: | ||||
|             raise Exception('Bad IP %s' % (ip)) | ||||
|  | ||||
|         packet = self._create_request(splitted_ip, ip_type) | ||||
|         packet = self._send_request(packet) | ||||
|         (ip, country_code) = self._check_request(packet) | ||||
|         if country_code: | ||||
|             # convert to string | ||||
|             country_code = '%c%c' % (country_code[0], country_code[1]) | ||||
|         return (ip, country_code) | ||||
|  | ||||
|     def close(self): | ||||
|         self._socket.close() | ||||
		Reference in New Issue
	
	Block a user