# -*- coding: utf-8 -*-
#
# Copyright Grégory Soutadé 2020

# This file is part of iwla

# iwla is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# iwla is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with iwla.  If not, see <http://www.gnu.org/licenses/>.
#

import re

from iwla import IWLA
from iplugin import IPlugin
from display import *
from misc.geoiplookup import geoiplookup

"""
Post analysis hook

Filter users with given user conditions

Plugin requirements :
    None

Conf values needed :
    filtered_users : list of filters
    filtered_ip : list of ip (string)
    create_filtered_page*

Filter can be a function or a list of filter description combined by AND operator
Filter description can be a function or a list of 3 elements :
  * Field to match in visits
  * Operator '=', '==', '!=', '>', '>=', '<', '<=' for int value
  * Operator '=', '==', '!=', 'in', 'match' for str value
  * Target value
For easiest config, you can indicate both 'remote_addr' or 'ip' in field element
function prototype is func(iwla, hit) and must return True or False

Example :

def my_filter(iwla, hit):
    return True

filtered_users = [
    [['viewed_pages', '>=', '5'], ['viewed_hits', '>=', '5']],
    [['viewed_hits', '>=', '5'], my_filter],
    my_filter,
]

Output files :
    None

Statistics creation :
    visits :
        remote_ip =>
            filtered
            geo_location

Statistics update :
    visits :
        remote_ip =>
            keep_requests

Statistics deletion :
    None
"""

# Operators accepted when the target value is an integer
_INT_OPERATORS = ('=', '==', '!=', '>', '>=', '<', '<=')
# Operators accepted when the target value is handled as a string
_STR_OPERATORS = ('=', '==', '!=', 'in', 'match')


class IWLAPostAnalysisFilterUsers(IPlugin):
    """Post analysis plugin flagging visitors that match configured filters."""

    @staticmethod
    def _bad_filter_error(_filter):
        """Build the error raised for an invalid filter description.

        Elements are converted with str() so that non-string elements
        (int targets, callables, ...) cannot break the message itself.
        """
        return ValueError('Bad filter ' + ' '.join(map(str, _filter)))

    def _check_filter(self, _filter):
        """Validate and normalize one [field, operator, value] description.

        The description is updated in place:
          * a string value that parses as a base 10 integer is replaced by
            that integer, but only if the operator is a numeric one
          * for string filters, field 'ip' is renamed to 'remote_ip'
          * for operator 'match', the value is replaced by a compiled regex

        Raises ValueError if the description has not exactly 3 elements or
        if the operator is not supported for the kind of value.
        """
        if len(_filter) != 3:
            raise self._bad_filter_error(_filter)

        (field, operator, value) = _filter

        if operator in _INT_OPERATORS:
            if not isinstance(value, str):
                # Target is already a non-string value: nothing to convert
                return
            try:
                number = int(value, 10)
            except ValueError:
                # Not a number: handled as a string filter below
                pass
            else:
                _filter[2] = number
                return

        # String filter. The value is deliberately left untouched here so
        # that numeric-looking strings still work with 'in' and 'match'
        if field == 'ip':
            _filter[0] = 'remote_ip'
        if operator not in _STR_OPERATORS:
            raise self._bad_filter_error(_filter)
        if operator == 'match':
            _filter[2] = re.compile(value)

    def load(self):
        """Read 'filtered_users' and 'filtered_ip' conf values and check them.

        Each entry of 'filtered_users' must be a callable or a list whose
        elements are callables or filter descriptions (see _check_filter).

        Returns True. Raises ValueError on an invalid filter.
        """
        self.filters = self.iwla.getConfValue('filtered_users', [])
        self.ip_filters = self.iwla.getConfValue('filtered_ip', [])

        for _filter in self.filters:
            if isinstance(_filter, list):
                for sub_filter in _filter:
                    if not callable(sub_filter):
                        self._check_filter(sub_filter)
            elif not callable(_filter):
                raise ValueError(f'Invalid filter {_filter}')

        return True

    def __do_filter(self, hit, _filter):
        """Evaluate one checked filter description against a visit.

        Returns True if the visit matches, False otherwise (including when
        the field is missing or the values cannot be compared).
        """
        (field, operator, value) = _filter

        if field not in hit:
            return False

        hit_value = hit[field]
        # In dict, we have something like : {dayX : value} where day0 is total
        if isinstance(value, int) and isinstance(hit_value, dict):
            if not hit_value:
                return False
            hit_value = next(iter(hit_value.values()))

        try:
            if operator in ('=', '=='):
                return bool(hit_value == value)
            if operator == '!=':
                return bool(hit_value != value)
            if operator == 'in':
                return hit_value in value
            if operator == 'match':
                return bool(re.match(value, hit_value))
            if operator == '<':
                return bool(hit_value < value)
            if operator == '<=':
                return bool(hit_value <= value)
            if operator == '>':
                return bool(hit_value > value)
            if operator == '>=':
                return bool(hit_value >= value)
        except TypeError:
            # Values of incompatible types (e.g. str against int) cannot
            # match; do not abort the whole analysis for that
            return False

        return False

    def _match_sub_filter(self, hit, sub_filter):
        """Evaluate one element (callable or description) of an AND filter."""
        if callable(sub_filter):
            return sub_filter(self.iwla, hit)
        return self.__do_filter(hit, sub_filter)

    def _do_filter(self, ip, hits):
        """Tell if visitor 'ip' of 'hits' matches the configuration.

        A visitor matches if its key is in 'filtered_ip' or if at least one
        entry of 'filtered_users' matches. A list entry matches only when
        all of its sub filters match.
        """
        if ip in self.ip_filters:
            return True

        hit = hits[ip]
        for _filter in self.filters:
            if callable(_filter):
                if _filter(self.iwla, hit):
                    return True
                continue

            # Must match all sub filters
            if all(self._match_sub_filter(hit, sub_filter)
                   for sub_filter in _filter):
                return True

        return False

    def hook(self):
        """Flag every valid visitor that matches one of the filters.

        Matching visitors get 'filtered' and 'keep_requests' set to True
        and a 'geo_location' if they do not have one yet.
        """
        if not self.filters and not self.ip_filters:
            return

        hits = self.iwla.getValidVisitors()
        for (key, value) in hits.items():
            # Already filtered
            if value.get('filtered', False):
                continue

            if not self._do_filter(key, hits):
                continue

            value['filtered'] = True
            value['keep_requests'] = True
            # Geolocation is best effort: report the error and go on
            try:
                if not value.get('geo_location', None):
                    value['geo_location'] = geoiplookup(value['remote_ip'])
            except Exception as e:
                print(e)