GnomeShellGenericMonitor/example.py
2020-03-30 10:45:26 +02:00

204 lines
6.8 KiB
Python
Executable File

#!/usr/bin/python3
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
'''
Display number of mail from gmail account and Pidgin icon if there is some unread Pidgin message
'''
import json
import time
import requests
from requests.auth import HTTPBasicAuth
import xml.dom.minidom
import getpass
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, GLib, Gst
from threading import Thread
from signal import signal, SIGINT
import sys
MAIL_ADDRESS='XXX@gmail.com'
class DBUSItem:
def __init__(self, name, text='', style='', icon='', iconStyle=''):
self.name = name
self.text = text
self.style= style
self.icon = icon
self.iconStyle = iconStyle
def toMap(self):
myMap = {"name":self.name}
for p in ('text', 'style', 'icon'):
if self.__dict__[p]:
myMap[p] = self.__dict__[p]
if self.iconStyle:
myMap['icon-style'] = self.iconStyle
return [myMap]
def getMail(user, password):
res = DBUSItem('mail')
address = "https://mail.google.com/mail/feed/atom"
auth=HTTPBasicAuth(user, password)
req = requests.get(address, auth=auth)
if req.status_code == requests.codes.ok:
dom = xml.dom.minidom.parseString(req.text)
try:
nb_messages = int(dom.getElementsByTagName('fullcount')[0].firstChild.nodeValue)
if nb_messages == 1:
res.text = '1 msg'
elif nb_messages > 1:
res.text = '%d msgs' % (nb_messages)
res.style = 'color:white'
except Exception as e:
res.text = str(e)
else:
res.text = 'Mail error %d' % (req.status_code)
return res.toMap()
def getEvents(mail_user, mail_password, proxy):
encoder = json.JSONEncoder()
res = {'group':'Mail', 'items':[]}
res['items'] += getMail(mail_user, mail_password)
proxy.notify(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
class StockThread(Thread):
SLEEP_TIME = 30
def stop(self):
self._stopLoop = True
def run(self):
self._stopLoop = False
mail_password = getpass.getpass('Enter password for address %s: ' % (MAIL_ADDRESS))
while not self._stopLoop:
getEvents(MAIL_ADDRESS, mail_password, systray_proxy)
# Be more reactive on signal capture
for i in range(0, StockThread.SLEEP_TIME):
if self._stopLoop: break
time.sleep(1)
def signalHandler(signal_received, frame):
stockThread.stop()
mainLoop.quit()
stockThread.join()
encoder = json.JSONEncoder()
res = {'groups':['Mail', 'Pidgin']}
systray_proxy.deleteGroups(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
sys.exit(0)
PURPLE_CONV_UPDATE_UNSEEN = 4
PURPLE_MESSAGE_SEND = 0
class PidginConversation:
STATUS_SEEN = 0
STATUS_UNSEEN = 1
def __init__(self, _id):
self._id = _id
self.status = PidginConversation.STATUS_SEEN
self.nbMessages = 1
def updateNbMessages(self):
self.nbMessages += 1
# Status is seen until it changes
self.status = PidginConversation.STATUS_SEEN
def setSeen(self):
self.status = PidginConversation.STATUS_SEEN
def setUnseen(self):
self.nbMessages = 0
self.status = PidginConversation.STATUS_UNSEEN
def isSeen(self):
return (self.status == PidginConversation.STATUS_SEEN)
pidgin_conversations = {}
def pidginMessageReceived(account, sender, message, conversation, flags):
pidginConversation = pidgin_conversations.get(conversation, None)
if not pidginConversation:
pidgin_conversations[conversation] = PidginConversation(conversation)
else:
pidginConversation.updateNbMessages()
def pidginMessageWrote(account, sender, message, conversation, flags):
if not (flags & (1 << PURPLE_MESSAGE_SEND)):
return
pidginConversation = pidgin_conversations.get(conversation, None)
if not pidginConversation:
pidgin_conversations[conversation] = PidginConversation(conversation)
pidginConversation = pidgin_conversations[conversation]
pidginConversation.setSeen()
pidginConversation.nbMessages = 0
def pidginConversationUpdated(conversation, _type):
if _type != PURPLE_CONV_UPDATE_UNSEEN:
return
pidginConversation = pidgin_conversations.get(conversation, None)
if not pidginConversation:
return
if pidginConversation.isSeen():
if pidginConversation.nbMessages > 0:
pidginConversation.setUnseen()
else:
pidginConversation.setSeen()
encoder = json.JSONEncoder()
# Message not seen by user
if not pidginConversation.isSeen():
res = {'group':'Pidgin', 'items':[]}
res['items'] += DBUSItem('pidgin', icon='/usr/share/icons/hicolor/22x22/apps/pidgin.png', iconStyle='icon-size:22px').toMap()
systray_proxy.notify(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
else:
deleteIcon = True
# Are all messages seen ?
for pConv in pidgin_conversations.values():
if not pConv.isSeen():
deleteIcon = False
break
if deleteIcon:
res = {'items':['pidgin@Pidgin']}
systray_proxy.deleteItems(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
systray_proxy = bus.get_object('org.gnome.Shell', '/com/soutade/GenericMonitor')
bus.add_signal_receiver(pidginMessageReceived, 'ReceivedImMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginMessageReceived, 'ReceivedChatMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginMessageWrote, 'WroteImMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginMessageWrote, 'WroteChatMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginConversationUpdated, 'ConversationUpdated', 'im.pidgin.purple.PurpleInterface')
stockThread = StockThread()
stockThread.start()
signal(SIGINT, signalHandler)
mainLoop = GLib.MainLoop()
mainLoop.run()