219 lines
7.1 KiB
Python
Executable File
219 lines
7.1 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
|
|
|
|
'''
Display the number of mails reported by a Gmail account and show the Pidgin
icon when there are unread Pidgin messages.
'''
|
|
|
|
import json
|
|
import time
|
|
import requests
|
|
from requests.auth import HTTPBasicAuth
|
|
import xml.dom.minidom
|
|
import getpass
|
|
import dbus
|
|
from dbus.mainloop.glib import DBusGMainLoop
|
|
import gi
|
|
gi.require_version('Gst', '1.0')
|
|
from gi.repository import GObject, GLib, Gst
|
|
from threading import Thread
|
|
from signal import signal, SIGINT
|
|
import sys
|
|
|
|
# Gmail account polled by the mail thread; also shown in the password prompt.
MAIL_ADDRESS='XXX@gmail.com'
|
|
|
|
class DBUSItem:
    '''
    Description of one monitor item, convertible to a JSON-friendly map.

    name      -- item identifier (always serialized)
    text      -- text to display (optional)
    style     -- style applied to the text (optional)
    icon      -- path of an icon (optional)
    iconStyle -- style applied to the icon (optional, serialized as
                 'icon-style')
    '''

    def __init__(self, name, text='', style='', icon='', iconStyle=''):
        self.name = name
        self.text = text
        self.style = style
        self.icon = icon
        self.iconStyle = iconStyle

    def toMap(self):
        '''
        Return a one-element list holding the dict form of this item.

        Empty attributes are omitted so that only 'name' is guaranteed to be
        present. A list is returned so that callers can extend an 'items'
        list with the result directly.
        '''
        myMap = {"name": self.name}
        for p in ('text', 'style', 'icon'):
            value = getattr(self, p)
            if value:
                myMap[p] = value
        # The serialized key differs from the attribute name
        if self.iconStyle:
            myMap['icon-style'] = self.iconStyle
        return [myMap]
|
|
|
|
def getMail(user, password, timeout=10):
    '''
    Query the Gmail Atom feed and build the 'mail' item from its answer.

    user, password -- credentials sent with HTTP basic authentication
    timeout        -- seconds to wait for the server before giving up

    Returns the list produced by DBUSItem.toMap(). The item text is
    '1 msg' or '<n> msgs' when the feed reports at least one message,
    'Mail error <status>' on a non-OK HTTP status, 'Mail error' when the
    request itself fails, and the exception message when the feed cannot be
    parsed. The text is left empty when the feed reports no message.
    '''
    res = DBUSItem('mail')

    address = "https://mail.google.com/mail/feed/atom"
    auth = HTTPBasicAuth(user, password)
    try:
        # Without a timeout a stalled connection would block the polling
        # thread forever; network failures must not kill that thread either.
        req = requests.get(address, auth=auth, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(str(e))
        res.text = 'Mail error'
        return res.toMap()

    if req.status_code != requests.codes.ok:
        res.text = 'Mail error %d' % (req.status_code)
        return res.toMap()

    try:
        # Parsing is inside the try block so that a malformed answer is
        # reported in the item text instead of raising
        dom = xml.dom.minidom.parseString(req.text)
        nb_messages = int(dom.getElementsByTagName('fullcount')[0].firstChild.nodeValue)
        if nb_messages == 1:
            res.text = '1 msg'
        elif nb_messages > 1:
            res.text = '%d msgs' % (nb_messages)
        # NOTE(review): style is applied for every successfully parsed
        # count -- confirm this matches the intended scope
        res.style = 'color:white'
    except Exception as e:
        # Display boundary: whatever went wrong is shown to the user
        res.text = str(e)

    return res.toMap()
|
|
|
|
def getEvents(mail_user, mail_password, proxy):
    '''
    Fetch the mail item and send it to the monitor as group 'Mail'.

    mail_user, mail_password -- credentials forwarded to getMail()
    proxy                    -- D-Bus proxy object whose notify() method
                                receives the JSON-encoded group

    D-Bus errors are printed and otherwise ignored so that the caller can
    keep polling.
    '''
    res = {'group': 'Mail', 'items': []}
    res['items'] += getMail(mail_user, mail_password)
    try:
        proxy.notify(json.dumps(res), dbus_interface='com.soutade.GenericMonitor')
    except dbus.exceptions.DBusException as e:
        print(str(e))
|
|
|
|
class StockThread(Thread):
    '''
    Background thread that asks for the mail password, then refreshes the
    mail item every SLEEP_TIME seconds until stop() is called.
    '''

    # Seconds between two refreshes
    SLEEP_TIME = 30

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set here rather than in run() so that a stop() issued before the
        # thread body starts is not lost
        self._stopLoop = False

    def stop(self):
        '''Ask the thread to leave its loop; takes effect within a second.'''
        self._stopLoop = True

    def run(self):
        '''Prompt for the password, then poll until stop() is requested.'''
        mail_password = getpass.getpass('Enter password for address %s: ' % (MAIL_ADDRESS))
        while not self._stopLoop:
            getEvents(MAIL_ADDRESS, mail_password, systray_proxy)
            # Be more reactive on signal capture: sleep one second at a
            # time and re-check the stop flag in between
            for _ in range(StockThread.SLEEP_TIME):
                if self._stopLoop:
                    break
                time.sleep(1)
|
|
|
|
def signalHandler(signal_received, frame):
    '''
    Signal handler: stop the polling thread and the main loop, remove the
    'Mail' and 'Pidgin' groups from the monitor, then exit with status 0.

    signal_received, frame -- standard signal handler arguments (unused)
    '''
    stockThread.stop()
    mainLoop.quit()
    stockThread.join()
    res = {'groups': ['Mail', 'Pidgin']}
    try:
        systray_proxy.deleteGroups(json.dumps(res), dbus_interface='com.soutade.GenericMonitor')
    except Exception as e:
        # Best effort: cleanup failure must not prevent the exit below, but
        # it is reported like the other D-Bus errors of this script
        print(str(e))
    sys.exit(0)
|
|
|
|
# Value of the 'ConversationUpdated' type argument this script reacts to
PURPLE_CONV_UPDATE_UNSEEN = 4
# Bit index tested in the flags of 'Wrote*Msg' signals to detect messages
# sent by the user
PURPLE_MESSAGE_SEND = 0
|
|
|
|
class PidginConversation:
    '''
    Seen/unseen state and message counter of one Pidgin conversation.

    A new object starts as seen with a counter of 1.
    '''

    STATUS_SEEN = 0
    STATUS_UNSEEN = 1

    def __init__(self, _id):
        self._id = _id
        self.status = PidginConversation.STATUS_SEEN
        self.nbMessages = 1

    def updateNbMessages(self):
        '''Count one more message and mark the conversation as seen.'''
        self.nbMessages += 1
        # Status is seen until it changes
        self.status = PidginConversation.STATUS_SEEN

    def setSeen(self):
        '''Mark the conversation as seen; the counter is left untouched.'''
        self.status = PidginConversation.STATUS_SEEN

    def setUnseen(self):
        '''Mark the conversation as unseen and reset the counter to 0.'''
        self.nbMessages = 0
        self.status = PidginConversation.STATUS_UNSEEN

    def isSeen(self):
        '''Return True when the conversation is currently marked as seen.'''
        return self.status == PidginConversation.STATUS_SEEN
|
|
|
|
# PidginConversation objects, keyed by the conversation argument received
# in the Pidgin signal handlers below
pidgin_conversations = {}
|
|
|
|
def pidginMessageReceived(account, sender, message, conversation, flags):
    '''
    Handler for 'ReceivedImMsg' / 'ReceivedChatMsg' signals.

    Registers the conversation the first time a message is received for it,
    otherwise counts one more message. Only the conversation argument is
    used.
    '''
    pidginConversation = pidgin_conversations.get(conversation)
    if pidginConversation is None:
        pidgin_conversations[conversation] = PidginConversation(conversation)
    else:
        pidginConversation.updateNbMessages()
|
|
|
|
def pidginMessageWrote(account, sender, message, conversation, flags):
    '''
    Handler for 'WroteImMsg' / 'WroteChatMsg' signals.

    Signals whose flags do not have the PURPLE_MESSAGE_SEND bit set are
    ignored. Otherwise the conversation (registered on the fly when
    unknown) is marked as seen and its message counter is reset.
    '''
    if not (flags & (1 << PURPLE_MESSAGE_SEND)):
        return
    pidginConversation = pidgin_conversations.get(conversation)
    if pidginConversation is None:
        pidginConversation = PidginConversation(conversation)
        pidgin_conversations[conversation] = pidginConversation
    pidginConversation.setSeen()
    pidginConversation.nbMessages = 0
|
|
|
|
def pidginConversationUpdated(conversation, _type):
    '''
    Handler for the 'ConversationUpdated' signal.

    Only updates of type PURPLE_CONV_UPDATE_UNSEEN on a known conversation
    are handled. The conversation state is toggled, then the Pidgin icon is
    shown when the conversation is unseen, or removed when every known
    conversation is seen. D-Bus errors are printed and otherwise ignored.
    '''
    if _type != PURPLE_CONV_UPDATE_UNSEEN:
        return

    pidginConversation = pidgin_conversations.get(conversation)
    if pidginConversation is None:
        return

    # NOTE(review): the else branch is attached to the outer test (seen ->
    # unseen only when messages were counted, unseen -> seen otherwise) --
    # confirm against the intended behavior
    if pidginConversation.isSeen():
        if pidginConversation.nbMessages > 0:
            pidginConversation.setUnseen()
    else:
        pidginConversation.setSeen()

    # Message not seen by user
    if not pidginConversation.isSeen():
        res = {'group': 'Pidgin', 'items': []}
        res['items'] += DBUSItem('pidgin', icon='/usr/share/icons/hicolor/22x22/apps/pidgin.png', iconStyle='icon-size:22px').toMap()
        try:
            systray_proxy.notify(json.dumps(res), dbus_interface='com.soutade.GenericMonitor')
        except dbus.exceptions.DBusException as e:
            print(str(e))
        return

    # Are all messages seen ? Keep the icon while any conversation is unseen
    if all(pConv.isSeen() for pConv in pidgin_conversations.values()):
        res = {'items': ['pidgin@Pidgin']}
        try:
            systray_proxy.deleteItems(json.dumps(res), dbus_interface='com.soutade.GenericMonitor')
        except dbus.exceptions.DBusException as e:
            print(str(e))
|
|
|
|
# A default GLib main loop must be attached to D-Bus before connecting to
# the bus in order to receive signals
DBusGMainLoop(set_as_default=True)

bus = dbus.SessionBus()
systray_proxy = bus.get_object('org.gnome.Shell', '/com/soutade/GenericMonitor')

# Track Pidgin conversations through its D-Bus signals
bus.add_signal_receiver(pidginMessageReceived, 'ReceivedImMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginMessageReceived, 'ReceivedChatMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginMessageWrote, 'WroteImMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginMessageWrote, 'WroteChatMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginConversationUpdated, 'ConversationUpdated', 'im.pidgin.purple.PurpleInterface')

# signalHandler() uses both mainLoop and stockThread: create them before the
# handler is installed so that an early SIGINT cannot hit an undefined name
mainLoop = GLib.MainLoop()

stockThread = StockThread()
stockThread.start()

signal(SIGINT, signalHandler)

mainLoop.run()
|