Rework examples with a class that encapsulate all generic monitor stuff

This commit is contained in:
Grégory Soutadé 2020-05-07 08:42:14 +02:00
parent 3b7247f97e
commit 33e74eccd8
4 changed files with 272 additions and 209 deletions

View File

@ -1,29 +0,0 @@
class DBUSItem:
def __init__(self, name, text='', style='', icon='', iconStyle='', onClick='', box=''):
self.name = name
self.text = text
self.style= style
self.icon = icon
self.iconStyle = iconStyle
self.onClick = onClick
self.box = box
self._checkValues()
def _checkValues(self):
if self.onClick and not self.onClick in ('signal', 'delete'):
raise ValueError('Invalid onClick value')
if self.box and not self.box in ('left', 'center', 'right'):
raise ValueError('Invalid box value')
def toMap(self):
myMap = {"name":self.name}
for p in ('text', 'style', 'icon', 'box'):
if self.__dict__[p]:
myMap[p] = self.__dict__[p]
if self.iconStyle:
myMap['icon-style'] = self.iconStyle
if self.onClick:
myMap['on-click'] = self.onClick
return [myMap]

141
examples/genericmonitor.py Normal file
View File

@ -0,0 +1,141 @@
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import json
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, GLib, Gst
class GenericMonitor:
def setupMonitor(self):
self._activated = True
self._encoder = json.JSONEncoder()
self._dbus_interface = 'com.soutade.GenericMonitor'
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
self._dbus = dbus.SessionBus()
self.systray_proxy = self._dbus.get_object('org.gnome.Shell', '/com/soutade/GenericMonitor')
self._dbus.add_signal_receiver(self.onClick, 'onClick', self._dbus_interface)
self._dbus.add_signal_receiver(self.onDblClick, 'onDblClick', self._dbus_interface)
self._dbus.add_signal_receiver(self.onRightClick, 'onRightClick', self._dbus_interface)
self._dbus.add_signal_receiver(self.onDblRightClick, 'onDblRightClick', self._dbus_interface)
self._dbus.add_signal_receiver(self.onActivate, 'onActivate', self._dbus_interface)
self._dbus.add_signal_receiver(self.onDeactivate, 'onDeactivate', self._dbus_interface)
def runMainLoop(self):
self._mainLoop = GLib.MainLoop()
self._mainLoop.run()
def stopMainLoop(self):
self._mainLoop.quit()
# Generic Monitor functions
def notify(self, group):
if self._activated:
if type(group) == GenericMonitorGroup:
group = group.getValues()
self.systray_proxy.notify(self._encoder.encode(group), dbus_interface=self._dbus_interface)
def deleteItems(self, items):
if self._activated:
self.systray_proxy.deleteItems(self._encoder.encode(items), dbus_interface=self._dbus_interface)
def deleteGroups(self, groups):
if self._activated:
self.systray_proxy.deleteGroups(self._encoder.encode(groups), dbus_interface=self._dbus_interface)
# Generic Monitor signals
def onClick(self, sender):
pass
def onRightClick(self, sender):
pass
def onDblClick(self, sender):
pass
def onDblRightClick(self, sender):
pass
def onActivate(self):
self._activated = True
def onDeactivate(self):
self._activated = False
# DBUS method
def add_signal_receiver(self, callback, signalName, interface):
self._dbus.add_signal_receiver(callback, signalName, interface)
class GenericMonitorItem:
def __init__(self, name, text='', style='', icon='', iconStyle='', onClick='', box=''):
self.name = name
self.text = text
self.style= style
self.icon = icon
self.iconStyle = iconStyle
self.onClick = onClick
self.box = box
self._checkValues()
def _checkValues(self):
if self.onClick and not self.onClick in ('signal', 'delete'):
raise ValueError('Invalid onClick value')
if self.box and not self.box in ('left', 'center', 'right'):
raise ValueError('Invalid box value')
def toMap(self):
myMap = {"name":self.name}
for p in ('text', 'style', 'icon', 'box'):
if self.__dict__[p]:
myMap[p] = self.__dict__[p]
if self.iconStyle:
myMap['icon-style'] = self.iconStyle
if self.onClick:
myMap['on-click'] = self.onClick
return [myMap]
class GenericMonitorGroup:
def __init__(self, name, items=[]):
self.name = name
if type(items) != list:
self.items = [items]
else:
self.items = items
def addItem(self, item):
self.items.append(item)
def addItems(self, items):
for item in items:
self.addItem(item)
def getValues(self):
res = {'group': self.name, 'items':[]}
for item in self.items:
res['items'] += item.toMap()
return res
def __str__(self):
return str(self.getValues())

View File

@ -19,83 +19,15 @@
Display number of mail from gmail account and Pidgin icon if there is some unread Pidgin message Display number of mail from gmail account and Pidgin icon if there is some unread Pidgin message
''' '''
import json
import time import time
import requests import requests
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
import xml.dom.minidom import xml.dom.minidom
import getpass import getpass
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, GLib, Gst
from threading import Thread from threading import Thread
from signal import signal, SIGINT from signal import signal, SIGINT
import sys import sys
from dbusitem import DBUSItem from genericmonitor import GenericMonitor, GenericMonitorGroup, GenericMonitorItem
MAIL_ADDRESS='XXX@gmail.com'
def getMail(user, password):
res = DBUSItem('mail')
address = "https://mail.google.com/mail/feed/atom"
auth=HTTPBasicAuth(user, password)
req = requests.get(address, auth=auth)
if req.status_code == requests.codes.ok:
dom = xml.dom.minidom.parseString(req.text)
try:
nb_messages = int(dom.getElementsByTagName('fullcount')[0].firstChild.nodeValue)
if nb_messages == 1:
res.text = '1 msg'
elif nb_messages > 1:
res.text = '%d msgs' % (nb_messages)
res.style = 'color:white'
except Exception as e:
res.text = str(e)
else:
res.text = 'Mail error %d' % (req.status_code)
return res.toMap()
def getEvents(mail_user, mail_password, proxy):
encoder = json.JSONEncoder()
res = {'group':'Mail', 'items':[]}
res['items'] += getMail(mail_user, mail_password)
try:
proxy.notify(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
except dbus.exceptions.DBusException as e:
print(str(e))
pass
class EventThread(Thread):
SLEEP_TIME = 30
def stop(self):
self._stopLoop = True
def run(self):
self._stopLoop = False
mail_password = getpass.getpass('Enter password for address %s: ' % (MAIL_ADDRESS))
while not self._stopLoop:
getEvents(MAIL_ADDRESS, mail_password, systray_proxy)
# Be more reactive on signal capture
for i in range(0, EventThread.SLEEP_TIME):
if self._stopLoop: break
time.sleep(1)
def signalHandler(signal_received, frame):
eventThread.stop()
mainLoop.quit()
eventThread.join()
encoder = json.JSONEncoder()
res = {'groups':['Mail', 'Pidgin']}
try:
systray_proxy.deleteGroups(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
except Exception as e:
pass
sys.exit(0)
PURPLE_CONV_UPDATE_UNSEEN = 4 PURPLE_CONV_UPDATE_UNSEEN = 4
PURPLE_MESSAGE_SEND = 0 PURPLE_MESSAGE_SEND = 0
@ -124,30 +56,88 @@ class PidginConversation:
def isSeen(self): def isSeen(self):
return (self.status == PidginConversation.STATUS_SEEN) return (self.status == PidginConversation.STATUS_SEEN)
pidgin_conversations = {}
def pidginMessageReceived(account, sender, message, conversation, flags): class EventThread(Thread,GenericMonitor):
pidginConversation = pidgin_conversations.get(conversation, None) SLEEP_TIME = 30
MAIL_ADDRESS='XXX@gmail.com'
def stop(self):
self._stopLoop = True
self.stopMainLoop()
def _getMail(self):
mailItem = GenericMonitorItem('mail')
mailGroup = GenericMonitorGroup('Mail', [mailItem])
address = "https://mail.google.com/mail/feed/atom"
auth = HTTPBasicAuth(self.MAIL_ADDRESS, self._mail_password)
req = requests.get(address, auth=auth)
if req.status_code == requests.codes.ok:
dom = xml.dom.minidom.parseString(req.text)
try:
nb_messages = int(dom.getElementsByTagName('fullcount')[0].firstChild.nodeValue)
if nb_messages == 1:
mailItem.text = '1 msg'
elif nb_messages > 1:
mailItem.text = '%d msgs' % (nb_messages)
mailItem.style = 'color:white'
except Exception as e:
mailItem.text = str(e)
else:
mailItem.text = 'Mail error %d' % (req.status_code)
self.notify(mailGroup)
def getEvents(self):
self._getMail()
def run(self):
self._stopLoop = False
self.setupMonitor()
self.pidgin_conversations = {}
self.add_signal_receiver(self.pidginMessageReceived, 'ReceivedImMsg', 'im.pidgin.purple.PurpleInterface')
self.add_signal_receiver(self.pidginMessageReceived, 'ReceivedChatMsg', 'im.pidgin.purple.PurpleInterface')
self.add_signal_receiver(self.pidginMessageWrote, 'WroteImMsg', 'im.pidgin.purple.PurpleInterface')
self.add_signal_receiver(self.pidginMessageWrote, 'WroteChatMsg', 'im.pidgin.purple.PurpleInterface')
self.add_signal_receiver(self.pidginConversationUpdated, 'ConversationUpdated', 'im.pidgin.purple.PurpleInterface')
self._mail_password = getpass.getpass('Enter password for address %s: ' % (self.MAIL_ADDRESS))
while not self._stopLoop:
self.getEvents()
# Be more reactive on signal capture
for i in range(0, self.SLEEP_TIME):
if self._stopLoop: break
time.sleep(1)
def pidginMessageReceived(self, account, sender, message, conversation, flags):
pidginConversation = self.pidgin_conversations.get(conversation, None)
if not pidginConversation: if not pidginConversation:
pidgin_conversations[conversation] = PidginConversation(conversation) self.pidgin_conversations[conversation] = PidginConversation(conversation)
else: else:
pidginConversation.updateNbMessages() pidginConversation.updateNbMessages()
def pidginMessageWrote(account, sender, message, conversation, flags): def pidginMessageWrote(self, account, sender, message, conversation, flags):
if not (flags & (1 << PURPLE_MESSAGE_SEND)): if not (flags & (1 << PURPLE_MESSAGE_SEND)):
return return
pidginConversation = pidgin_conversations.get(conversation, None) pidginConversation = self.pidgin_conversations.get(conversation, None)
if not pidginConversation: if not pidginConversation:
pidgin_conversations[conversation] = PidginConversation(conversation) self.pidgin_conversations[conversation] = PidginConversation(conversation)
pidginConversation = pidgin_conversations[conversation] pidginConversation = self.pidgin_conversations[conversation]
pidginConversation.setSeen() pidginConversation.setSeen()
pidginConversation.nbMessages = 0 pidginConversation.nbMessages = 1
def pidginConversationUpdated(conversation, _type): def displayIcon(self):
pidginItem = GenericMonitorItem('pidgin', icon='/usr/share/icons/hicolor/22x22/apps/pidgin.png', iconStyle='icon-size:22px')
pidginGroup = GenericMonitorGroup('Pidgin', pidginItem)
self.notify(pidginGroup)
def pidginConversationUpdated(self, conversation, _type):
if _type != PURPLE_CONV_UPDATE_UNSEEN: if _type != PURPLE_CONV_UPDATE_UNSEEN:
return return
pidginConversation = pidgin_conversations.get(conversation, None) pidginConversation = self.pidgin_conversations.get(conversation, None)
if not pidginConversation: if not pidginConversation:
return return
@ -157,46 +147,37 @@ def pidginConversationUpdated(conversation, _type):
else: else:
pidginConversation.setSeen() pidginConversation.setSeen()
encoder = json.JSONEncoder()
# Message not seen by user # Message not seen by user
if not pidginConversation.isSeen(): if not pidginConversation.isSeen():
res = {'group':'Pidgin', 'items':[]} self.displayIcon()
res['items'] += DBUSItem('pidgin', icon='/usr/share/icons/hicolor/22x22/apps/pidgin.png', iconStyle='icon-size:22px').toMap()
try:
systray_proxy.notify(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
except dbus.exceptions.DBusException as e:
print(str(e))
pass
else: else:
deleteIcon = True deleteIcon = True
# Are all messages seen ? # Are all messages seen ?
for pConv in pidgin_conversations.values(): for pConv in self.pidgin_conversations.values():
if not pConv.isSeen(): if not pConv.isSeen():
deleteIcon = False deleteIcon = False
break break
if deleteIcon: if deleteIcon:
res = {'items':['pidgin@Pidgin']} items = {'items':['pidgin@Pidgin']}
try: self.deleteItems(items)
systray_proxy.deleteItems(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
except dbus.exceptions.DBusException as e:
print(str(e))
pass
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) def onActivate(self):
super().onActivate()
for pConv in self.pidgin_conversations.values():
if not pConv.isSeen():
self.displayIcon()
break
bus = dbus.SessionBus() def signalHandler(signal_received, frame):
systray_proxy = bus.get_object('org.gnome.Shell', '/com/soutade/GenericMonitor') eventThread.stop()
eventThread.join()
bus.add_signal_receiver(pidginMessageReceived, 'ReceivedImMsg', 'im.pidgin.purple.PurpleInterface') groups = {'groups':['Mail', 'Pidgin']}
bus.add_signal_receiver(pidginMessageReceived, 'ReceivedChatMsg', 'im.pidgin.purple.PurpleInterface') eventThread.deleteGroups(groups)
bus.add_signal_receiver(pidginMessageWrote, 'WroteImMsg', 'im.pidgin.purple.PurpleInterface') sys.exit(0)
bus.add_signal_receiver(pidginMessageWrote, 'WroteChatMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginConversationUpdated, 'ConversationUpdated', 'im.pidgin.purple.PurpleInterface')
eventThread = EventThread() eventThread = EventThread()
eventThread.start() eventThread.start()
signal(SIGINT, signalHandler) signal(SIGINT, signalHandler)
mainLoop = GLib.MainLoop() eventThread.runMainLoop()
mainLoop.run()

View File

@ -25,27 +25,21 @@ Display two timers with interactive behaviour :
* Right click : switch timer * Right click : switch timer
''' '''
import json
import time import time
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, GLib, Gst
from threading import Thread from threading import Thread
from signal import signal, SIGINT from signal import signal, SIGINT
import sys import sys
from dbusitem import DBUSItem from genericmonitor import GenericMonitor, GenericMonitorGroup, GenericMonitorItem
class TimerThread(Thread): class TimerThread(Thread,GenericMonitor):
def stop(self): def stop(self):
self._stopLoop = True self._stopLoop = True
self.stopMainLoop()
def _displayTimerValue(self): def _displayTimerValue(self):
encoder = json.JSONEncoder() item = GenericMonitorItem('timer', onClick='signal', box='right')
res = {'group':'Timer', 'items':[]} group = GenericMonitorGroup('Timer', item)
item = DBUSItem('timer', onClick='signal', box='right')
curValue = self.timers[self.curTimer] curValue = self.timers[self.curTimer]
item.text = '%02d:%02d' % (int(curValue/60)%60, curValue%60) item.text = '%02d:%02d' % (int(curValue/60)%60, curValue%60)
if curValue >= (60*60): if curValue >= (60*60):
@ -57,13 +51,10 @@ class TimerThread(Thread):
else: else:
style = 'color:#215D9C' style = 'color:#215D9C'
item.style = style item.style = style
res['items'] += item.toMap() self.notify(group)
try:
systray_proxy.notify(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
except:
pass
def run(self): def run(self):
self.setupMonitor()
self.timers = [0, 0] self.timers = [0, 0]
self.curTimer = 0 self.curTimer = 0
self.timerPaused = False self.timerPaused = False
@ -98,48 +89,27 @@ class TimerThread(Thread):
self.timerPaused = True self.timerPaused = True
self._displayTimerValue() self._displayTimerValue()
def onDblRightClick(self, sender): def onActivate(self):
pass super().onActivate()
self.timerPaused = self._lastState
def onClick(sender): self._displayTimerValue()
timerThread.onClick(sender)
def onRightClick(sender):
timerThread.onRightClick(sender)
def onDblClick(sender):
timerThread.onDblClick(sender)
def onDblRightClick(sender):
timerThread.onDblRightClick(sender)
def onDeactivate(self):
super().onDeactivate()
self._lastState = self.timerPaused
if not self.timerPaused and self.curTimer == 0:
self.timerPaused = True
def signalHandler(signal_received, frame): def signalHandler(signal_received, frame):
timerThread.stop() timerThread.stop()
mainLoop.quit()
timerThread.join() timerThread.join()
encoder = json.JSONEncoder() groups = {'groups':['Timer']}
res = {'groups':['Timer']} timerThread.deleteGroups(groups)
try:
systray_proxy.deleteGroups(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
except Exception as e:
pass
sys.exit(0) sys.exit(0)
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
systray_proxy = bus.get_object('org.gnome.Shell', '/com/soutade/GenericMonitor')
bus.add_signal_receiver(onClick, 'onClick', 'com.soutade.GenericMonitor')
bus.add_signal_receiver(onDblClick, 'onDblClick', 'com.soutade.GenericMonitor')
bus.add_signal_receiver(onRightClick, 'onRightClick', 'com.soutade.GenericMonitor')
bus.add_signal_receiver(onDblRightClick, 'onDblRightClick', 'com.soutade.GenericMonitor')
timerThread = TimerThread() timerThread = TimerThread()
timerThread.start() timerThread.start()
signal(SIGINT, signalHandler) signal(SIGINT, signalHandler)
mainLoop = GLib.MainLoop() timerThread.runMainLoop()
mainLoop.run()