From 33e74eccd80684b5a09130f43ae19a4386787ecd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9gory=20Soutad=C3=A9?= Date: Thu, 7 May 2020 08:42:14 +0200 Subject: [PATCH] Rework examples with a class that encapsulate all generic monitor stuff --- examples/dbusitem.py | 29 ----- examples/genericmonitor.py | 141 +++++++++++++++++++++ examples/mail.py | 243 +++++++++++++++++-------------------- examples/timer.py | 68 +++-------- 4 files changed, 272 insertions(+), 209 deletions(-) delete mode 100644 examples/dbusitem.py create mode 100644 examples/genericmonitor.py diff --git a/examples/dbusitem.py b/examples/dbusitem.py deleted file mode 100644 index 7997918..0000000 --- a/examples/dbusitem.py +++ /dev/null @@ -1,29 +0,0 @@ - -class DBUSItem: - def __init__(self, name, text='', style='', icon='', iconStyle='', onClick='', box=''): - self.name = name - self.text = text - self.style= style - self.icon = icon - self.iconStyle = iconStyle - self.onClick = onClick - self.box = box - - self._checkValues() - - def _checkValues(self): - if self.onClick and not self.onClick in ('signal', 'delete'): - raise ValueError('Invalid onClick value') - if self.box and not self.box in ('left', 'center', 'right'): - raise ValueError('Invalid box value') - - def toMap(self): - myMap = {"name":self.name} - for p in ('text', 'style', 'icon', 'box'): - if self.__dict__[p]: - myMap[p] = self.__dict__[p] - if self.iconStyle: - myMap['icon-style'] = self.iconStyle - if self.onClick: - myMap['on-click'] = self.onClick - return [myMap] diff --git a/examples/genericmonitor.py b/examples/genericmonitor.py new file mode 100644 index 0000000..4ec3891 --- /dev/null +++ b/examples/genericmonitor.py @@ -0,0 +1,141 @@ +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import json +import dbus +from dbus.mainloop.glib import DBusGMainLoop +import gi +gi.require_version('Gst', '1.0') +from gi.repository import GObject, GLib, Gst + + +class GenericMonitor: + + def setupMonitor(self): + self._activated = True + self._encoder = json.JSONEncoder() + self._dbus_interface = 'com.soutade.GenericMonitor' + + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + self._dbus = dbus.SessionBus() + + self.systray_proxy = self._dbus.get_object('org.gnome.Shell', '/com/soutade/GenericMonitor') + + self._dbus.add_signal_receiver(self.onClick, 'onClick', self._dbus_interface) + self._dbus.add_signal_receiver(self.onDblClick, 'onDblClick', self._dbus_interface) + self._dbus.add_signal_receiver(self.onRightClick, 'onRightClick', self._dbus_interface) + self._dbus.add_signal_receiver(self.onDblRightClick, 'onDblRightClick', self._dbus_interface) + + self._dbus.add_signal_receiver(self.onActivate, 'onActivate', self._dbus_interface) + self._dbus.add_signal_receiver(self.onDeactivate, 'onDeactivate', self._dbus_interface) + + def runMainLoop(self): + self._mainLoop = GLib.MainLoop() + self._mainLoop.run() + + def stopMainLoop(self): + self._mainLoop.quit() + + # Generic Monitor functions + def notify(self, group): + if self._activated: + if type(group) == GenericMonitorGroup: + group = group.getValues() + self.systray_proxy.notify(self._encoder.encode(group), dbus_interface=self._dbus_interface) + + def deleteItems(self, items): + if self._activated: + self.systray_proxy.deleteItems(self._encoder.encode(items), dbus_interface=self._dbus_interface) + + def deleteGroups(self, groups): + if self._activated: + self.systray_proxy.deleteGroups(self._encoder.encode(groups), dbus_interface=self._dbus_interface) + + # Generic Monitor signals + def onClick(self, sender): + pass + + def onRightClick(self, sender): + pass + + def onDblClick(self, sender): + pass + + def onDblRightClick(self, sender): + pass + + def onActivate(self): + self._activated = True + + def onDeactivate(self): + self._activated = False + + # DBUS method + def add_signal_receiver(self, callback, signalName, interface): + self._dbus.add_signal_receiver(callback, signalName, interface) + + +class GenericMonitorItem: + def __init__(self, name, text='', style='', icon='', iconStyle='', onClick='', box=''): + self.name = name + self.text = text + self.style= style + self.icon = icon + self.iconStyle = iconStyle + self.onClick = onClick + self.box = box + + self._checkValues() + + def _checkValues(self): + if self.onClick and not self.onClick in ('signal', 'delete'): + raise ValueError('Invalid onClick value') + if self.box and not self.box in ('left', 'center', 'right'): + raise ValueError('Invalid box value') + + def toMap(self): + myMap = {"name":self.name} + for p in ('text', 'style', 'icon', 'box'): + if self.__dict__[p]: + myMap[p] = self.__dict__[p] + if self.iconStyle: + myMap['icon-style'] = self.iconStyle + if self.onClick: + myMap['on-click'] = self.onClick + return [myMap] + +class GenericMonitorGroup: + def __init__(self, name, items=[]): + self.name = name + if type(items) != list: + self.items = [items] + else: + self.items = items + + def addItem(self, item): + self.items.append(item) + + def addItems(self, items): + for item in items: + self.addItem(item) + + def getValues(self): + res = {'group': self.name, 'items':[]} + for item in self.items: + res['items'] += item.toMap() + return res + + def __str__(self): + return str(self.getValues()) diff --git a/examples/mail.py b/examples/mail.py index fe15eac..3ec6a2c 100755 --- a/examples/mail.py +++ b/examples/mail.py @@ -19,83 +19,15 @@ Display number of mail from gmail account and Pidgin icon if there is some unread Pidgin message ''' -import json import time import requests from requests.auth import HTTPBasicAuth import xml.dom.minidom import getpass -import dbus -from dbus.mainloop.glib import DBusGMainLoop -import gi -gi.require_version('Gst', '1.0') -from gi.repository import GObject, GLib, Gst from threading import Thread from signal import signal, SIGINT import sys -from dbusitem import DBUSItem - -MAIL_ADDRESS='XXX@gmail.com' - -def getMail(user, password): - res = DBUSItem('mail') - - address = "https://mail.google.com/mail/feed/atom" - auth=HTTPBasicAuth(user, password) - req = requests.get(address, auth=auth) - if req.status_code == requests.codes.ok: - dom = xml.dom.minidom.parseString(req.text) - try: - nb_messages = int(dom.getElementsByTagName('fullcount')[0].firstChild.nodeValue) - if nb_messages == 1: - res.text = '1 msg' - elif nb_messages > 1: - res.text = '%d msgs' % (nb_messages) - res.style = 'color:white' - except Exception as e: - res.text = str(e) - else: - res.text = 'Mail error %d' % (req.status_code) - - return res.toMap() - -def getEvents(mail_user, mail_password, proxy): - encoder = json.JSONEncoder() - res = {'group':'Mail', 'items':[]} - res['items'] += getMail(mail_user, mail_password) - try: - proxy.notify(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor') - except dbus.exceptions.DBusException as e: - print(str(e)) - pass - -class EventThread(Thread): - SLEEP_TIME = 30 - - def stop(self): - self._stopLoop = True - - def run(self): - self._stopLoop = False - mail_password = getpass.getpass('Enter password for address %s: ' % (MAIL_ADDRESS)) - while not self._stopLoop: - getEvents(MAIL_ADDRESS, mail_password, systray_proxy) - # Be more reactive on signal capture - for i in range(0, EventThread.SLEEP_TIME): - if self._stopLoop: break - time.sleep(1) - -def signalHandler(signal_received, frame): - eventThread.stop() - mainLoop.quit() - eventThread.join() - encoder = json.JSONEncoder() - res = {'groups':['Mail', 'Pidgin']} - try: - systray_proxy.deleteGroups(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor') - except Exception as e: - pass - sys.exit(0) +from genericmonitor import GenericMonitor, GenericMonitorGroup, GenericMonitorItem PURPLE_CONV_UPDATE_UNSEEN = 4 PURPLE_MESSAGE_SEND = 0 @@ -124,79 +56,128 @@ class PidginConversation: def isSeen(self): return (self.status == PidginConversation.STATUS_SEEN) -pidgin_conversations = {} -def pidginMessageReceived(account, sender, message, conversation, flags): - pidginConversation = pidgin_conversations.get(conversation, None) - if not pidginConversation: - pidgin_conversations[conversation] = PidginConversation(conversation) - else: - pidginConversation.updateNbMessages() +class EventThread(Thread,GenericMonitor): + SLEEP_TIME = 30 + MAIL_ADDRESS='XXX@gmail.com' -def pidginMessageWrote(account, sender, message, conversation, flags): - if not (flags & (1 << PURPLE_MESSAGE_SEND)): - return - pidginConversation = pidgin_conversations.get(conversation, None) - if not pidginConversation: - pidgin_conversations[conversation] = PidginConversation(conversation) - pidginConversation = pidgin_conversations[conversation] - pidginConversation.setSeen() - pidginConversation.nbMessages = 0 + def stop(self): + self._stopLoop = True + self.stopMainLoop() -def pidginConversationUpdated(conversation, _type): - if _type != PURPLE_CONV_UPDATE_UNSEEN: - return + def _getMail(self): + mailItem = GenericMonitorItem('mail') + mailGroup = GenericMonitorGroup('Mail', [mailItem]) - pidginConversation = pidgin_conversations.get(conversation, None) - if not pidginConversation: - return - - if pidginConversation.isSeen(): - if pidginConversation.nbMessages > 0: - pidginConversation.setUnseen() - else: - pidginConversation.setSeen() - - encoder = json.JSONEncoder() - # Message not seen by user - if not pidginConversation.isSeen(): - res = {'group':'Pidgin', 'items':[]} - res['items'] += DBUSItem('pidgin', icon='/usr/share/icons/hicolor/22x22/apps/pidgin.png', iconStyle='icon-size:22px').toMap() - try: - systray_proxy.notify(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor') - except dbus.exceptions.DBusException as e: - print(str(e)) - pass - else: - deleteIcon = True - # Are all messages seen ? - for pConv in pidgin_conversations.values(): - if not pConv.isSeen(): - deleteIcon = False - break - if deleteIcon: - res = {'items':['pidgin@Pidgin']} + address = "https://mail.google.com/mail/feed/atom" + auth = HTTPBasicAuth(self.MAIL_ADDRESS, self._mail_password) + req = requests.get(address, auth=auth) + if req.status_code == requests.codes.ok: + dom = xml.dom.minidom.parseString(req.text) try: - systray_proxy.deleteItems(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor') - except dbus.exceptions.DBusException as e: - print(str(e)) - pass + nb_messages = int(dom.getElementsByTagName('fullcount')[0].firstChild.nodeValue) + if nb_messages == 1: + mailItem.text = '1 msg' + elif nb_messages > 1: + mailItem.text = '%d msgs' % (nb_messages) + mailItem.style = 'color:white' + except Exception as e: + mailItem.text = str(e) + else: + mailItem.text = 'Mail error %d' % (req.status_code) + + self.notify(mailGroup) -dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + def getEvents(self): + self._getMail() -bus = dbus.SessionBus() -systray_proxy = bus.get_object('org.gnome.Shell', '/com/soutade/GenericMonitor') + def run(self): + self._stopLoop = False + self.setupMonitor() + self.pidgin_conversations = {} -bus.add_signal_receiver(pidginMessageReceived, 'ReceivedImMsg', 'im.pidgin.purple.PurpleInterface') -bus.add_signal_receiver(pidginMessageReceived, 'ReceivedChatMsg', 'im.pidgin.purple.PurpleInterface') -bus.add_signal_receiver(pidginMessageWrote, 'WroteImMsg', 'im.pidgin.purple.PurpleInterface') -bus.add_signal_receiver(pidginMessageWrote, 'WroteChatMsg', 'im.pidgin.purple.PurpleInterface') -bus.add_signal_receiver(pidginConversationUpdated, 'ConversationUpdated', 'im.pidgin.purple.PurpleInterface') + self.add_signal_receiver(self.pidginMessageReceived, 'ReceivedImMsg', 'im.pidgin.purple.PurpleInterface') + self.add_signal_receiver(self.pidginMessageReceived, 'ReceivedChatMsg', 'im.pidgin.purple.PurpleInterface') + self.add_signal_receiver(self.pidginMessageWrote, 'WroteImMsg', 'im.pidgin.purple.PurpleInterface') + self.add_signal_receiver(self.pidginMessageWrote, 'WroteChatMsg', 'im.pidgin.purple.PurpleInterface') + self.add_signal_receiver(self.pidginConversationUpdated, 'ConversationUpdated', 'im.pidgin.purple.PurpleInterface') + + self._mail_password = getpass.getpass('Enter password for address %s: ' % (self.MAIL_ADDRESS)) + + while not self._stopLoop: + self.getEvents() + # Be more reactive on signal capture + for i in range(0, self.SLEEP_TIME): + if self._stopLoop: break + time.sleep(1) + + def pidginMessageReceived(self, account, sender, message, conversation, flags): + pidginConversation = self.pidgin_conversations.get(conversation, None) + if not pidginConversation: + self.pidgin_conversations[conversation] = PidginConversation(conversation) + else: + pidginConversation.updateNbMessages() + + def pidginMessageWrote(self, account, sender, message, conversation, flags): + if not (flags & (1 << PURPLE_MESSAGE_SEND)): + return + pidginConversation = self.pidgin_conversations.get(conversation, None) + if not pidginConversation: + self.pidgin_conversations[conversation] = PidginConversation(conversation) + pidginConversation = self.pidgin_conversations[conversation] + pidginConversation.setSeen() + pidginConversation.nbMessages = 1 + + def displayIcon(self): + pidginItem = GenericMonitorItem('pidgin', icon='/usr/share/icons/hicolor/22x22/apps/pidgin.png', iconStyle='icon-size:22px') + pidginGroup = GenericMonitorGroup('Pidgin', pidginItem) + self.notify(pidginGroup) + + def pidginConversationUpdated(self, conversation, _type): + if _type != PURPLE_CONV_UPDATE_UNSEEN: + return + + pidginConversation = self.pidgin_conversations.get(conversation, None) + if not pidginConversation: + return + + if pidginConversation.isSeen(): + if pidginConversation.nbMessages > 0: + pidginConversation.setUnseen() + else: + pidginConversation.setSeen() + + # Message not seen by user + if not pidginConversation.isSeen(): + self.displayIcon() + else: + deleteIcon = True + # Are all messages seen ? + for pConv in self.pidgin_conversations.values(): + if not pConv.isSeen(): + deleteIcon = False + break + if deleteIcon: + items = {'items':['pidgin@Pidgin']} + self.deleteItems(items) + + def onActivate(self): + super().onActivate() + for pConv in self.pidgin_conversations.values(): + if not pConv.isSeen(): + self.displayIcon() + break + +def signalHandler(signal_received, frame): + eventThread.stop() + eventThread.join() + groups = {'groups':['Mail', 'Pidgin']} + eventThread.deleteGroups(groups) + sys.exit(0) eventThread = EventThread() eventThread.start() signal(SIGINT, signalHandler) -mainLoop = GLib.MainLoop() -mainLoop.run() +eventThread.runMainLoop() diff --git a/examples/timer.py b/examples/timer.py index fbabaea..e44bcc9 100755 --- a/examples/timer.py +++ b/examples/timer.py @@ -25,27 +25,21 @@ Display two timers with interactive behaviour : * Right click : switch timer ''' -import json import time -import dbus -from dbus.mainloop.glib import DBusGMainLoop -import gi -gi.require_version('Gst', '1.0') -from gi.repository import GObject, GLib, Gst from threading import Thread from signal import signal, SIGINT import sys -from dbusitem import DBUSItem +from genericmonitor import GenericMonitor, GenericMonitorGroup, GenericMonitorItem -class TimerThread(Thread): +class TimerThread(Thread,GenericMonitor): def stop(self): self._stopLoop = True + self.stopMainLoop() def _displayTimerValue(self): - encoder = json.JSONEncoder() - res = {'group':'Timer', 'items':[]} - item = DBUSItem('timer', onClick='signal', box='right') + item = GenericMonitorItem('timer', onClick='signal', box='right') + group = GenericMonitorGroup('Timer', item) curValue = self.timers[self.curTimer] item.text = '%02d:%02d' % (int(curValue/60)%60, curValue%60) if curValue >= (60*60): @@ -57,13 +51,10 @@ class TimerThread(Thread): else: style = 'color:#215D9C' item.style = style - res['items'] += item.toMap() - try: - systray_proxy.notify(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor') - except: - pass + self.notify(group) def run(self): + self.setupMonitor() self.timers = [0, 0] self.curTimer = 0 self.timerPaused = False @@ -98,48 +89,27 @@ class TimerThread(Thread): self.timerPaused = True self._displayTimerValue() - def onDblRightClick(self, sender): - pass + def onActivate(self): + super().onActivate() + self.timerPaused = self._lastState + self._displayTimerValue() -def onClick(sender): - timerThread.onClick(sender) - -def onRightClick(sender): - timerThread.onRightClick(sender) - -def onDblClick(sender): - timerThread.onDblClick(sender) - -def onDblRightClick(sender): - timerThread.onDblRightClick(sender) - + def onDeactivate(self): + super().onDeactivate() + self._lastState = self.timerPaused + if not self.timerPaused and self.curTimer == 0: + self.timerPaused = True def signalHandler(signal_received, frame): timerThread.stop() - mainLoop.quit() timerThread.join() - encoder = json.JSONEncoder() - res = {'groups':['Timer']} - try: - systray_proxy.deleteGroups(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor') - except Exception as e: - pass + groups = {'groups':['Timer']} + timerThread.deleteGroups(groups) sys.exit(0) -dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - -bus = dbus.SessionBus() -systray_proxy = bus.get_object('org.gnome.Shell', '/com/soutade/GenericMonitor') - -bus.add_signal_receiver(onClick, 'onClick', 'com.soutade.GenericMonitor') -bus.add_signal_receiver(onDblClick, 'onDblClick', 'com.soutade.GenericMonitor') -bus.add_signal_receiver(onRightClick, 'onRightClick', 'com.soutade.GenericMonitor') -bus.add_signal_receiver(onDblRightClick, 'onDblRightClick', 'com.soutade.GenericMonitor') - timerThread = TimerThread() timerThread.start() signal(SIGINT, signalHandler) -mainLoop = GLib.MainLoop() -mainLoop.run() +timerThread.runMainLoop()