#!/usr/bin/python3
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

'''
Display the number of unread mails of a Gmail account, and a Pidgin icon
when there is at least one unread Pidgin message.
'''

import json
import time
import requests
from requests.auth import HTTPBasicAuth
import xml.dom.minidom
import getpass
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, GLib, Gst
from threading import Thread
from signal import signal, SIGINT
import sys

from dbusitem import DBUSItem

MAIL_ADDRESS='XXX@gmail.com'


def getMail(user, password, timeout=10):
    '''
    Query the Gmail atom feed and build the 'mail' monitor item.

    user     -- account name used for HTTP basic authentication
    password -- password used for HTTP basic authentication
    timeout  -- seconds to wait for the HTTP request before giving up

    Returns the value of DBUSItem.toMap() for the item; getEvents()
    extends its 'items' list with it.

    Network failures, HTTP errors and malformed feeds are reported in the
    item text instead of being raised, so that the polling thread keeps
    running.
    '''
    res = DBUSItem('mail')
    address = "https://mail.google.com/mail/feed/atom"
    auth = HTTPBasicAuth(user, password)

    try:
        # Without a timeout a stalled connection would block the polling
        # thread forever
        req = requests.get(address, auth=auth, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(str(e))
        res.text = 'Mail error'
        return res.toMap()

    if req.status_code != requests.codes.ok:
        res.text = 'Mail error %d' % (req.status_code)
        return res.toMap()

    try:
        # Parsing is inside the try block: a malformed answer must not
        # kill the polling thread
        dom = xml.dom.minidom.parseString(req.text)
        fullcount = dom.getElementsByTagName('fullcount')[0]
        nb_messages = int(fullcount.firstChild.nodeValue)
        if nb_messages == 1:
            res.text = '1 msg'
        elif nb_messages > 1:
            res.text = '%d msgs' % (nb_messages)
        # NOTE(review): the original indentation of this statement was
        # lost; it is applied after the if/elif chain -- confirm
        res.style = 'color:white'
    except Exception as e:
        res.text = str(e)

    return res.toMap()


def getEvents(mail_user, mail_password, proxy):
    '''
    Fetch mail status and send it to the monitor as group 'Mail'.

    mail_user, mail_password -- credentials forwarded to getMail()
    proxy                    -- D-Bus proxy object exposing notify()

    D-Bus errors are printed and otherwise ignored.
    '''
    res = {'group':'Mail', 'items':[]}
    res['items'] += getMail(mail_user, mail_password)
    try:
        proxy.notify(json.dumps(res),
                     dbus_interface='com.soutade.GenericMonitor')
    except dbus.exceptions.DBusException as e:
        print(str(e))


class EventThread(Thread):
    '''
    Background thread that asks for the mail password once, then polls
    the mail account every SLEEP_TIME seconds until stop() is called.
    '''

    SLEEP_TIME = 30

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set here (and not in run()) so that a stop() request issued
        # before the thread body starts is not lost
        self._stopLoop = False

    def stop(self):
        '''Ask the polling loop to end; takes effect within about 1s.'''
        self._stopLoop = True

    def run(self):
        mail_password = getpass.getpass('Enter password for address %s: ' % (MAIL_ADDRESS))
        while not self._stopLoop:
            getEvents(MAIL_ADDRESS, mail_password, systray_proxy)
            # Be more reactive on signal capture: sleep second by second
            for _ in range(EventThread.SLEEP_TIME):
                if self._stopLoop:
                    break
                time.sleep(1)


def signalHandler(signal_received, frame):
    '''
    SIGINT handler: stop the polling thread and the main loop, remove the
    'Mail' and 'Pidgin' groups from the monitor, then exit.
    '''
    eventThread.stop()
    mainLoop.quit()
    eventThread.join()
    res = {'groups':['Mail', 'Pidgin']}
    try:
        systray_proxy.deleteGroups(json.dumps(res),
                                   dbus_interface='com.soutade.GenericMonitor')
    except Exception as e:
        # Best effort cleanup: report the problem but exit anyway
        print(str(e))
    sys.exit(0)


PURPLE_CONV_UPDATE_UNSEEN = 4
PURPLE_MESSAGE_SEND = 0


class PidginConversation:
    '''
    Seen/unseen state and received message counter of one Pidgin
    conversation.
    '''

    STATUS_SEEN = 0
    STATUS_UNSEEN = 1

    def __init__(self, _id):
        self._id = _id
        self.status = PidginConversation.STATUS_SEEN
        # Created on first received (or written) message
        self.nbMessages = 1

    def updateNbMessages(self):
        '''Count one more received message.'''
        self.nbMessages += 1
        # Status is seen until it changes
        self.status = PidginConversation.STATUS_SEEN

    def setSeen(self):
        self.status = PidginConversation.STATUS_SEEN

    def setUnseen(self):
        '''Mark the conversation as unseen and reset the counter.'''
        self.nbMessages = 0
        self.status = PidginConversation.STATUS_UNSEEN

    def isSeen(self):
        return (self.status == PidginConversation.STATUS_SEEN)


# Known conversations, indexed by the conversation value carried by the
# Pidgin D-Bus signals
pidgin_conversations = {}


def pidginMessageReceived(account, sender, message, conversation, flags):
    '''
    Receiver for 'ReceivedImMsg' and 'ReceivedChatMsg': register the
    conversation or count one more message for it.
    '''
    pidginConversation = pidgin_conversations.get(conversation, None)
    if pidginConversation is None:
        pidgin_conversations[conversation] = PidginConversation(conversation)
    else:
        pidginConversation.updateNbMessages()


def pidginMessageWrote(account, sender, message, conversation, flags):
    '''
    Receiver for 'WroteImMsg' and 'WroteChatMsg': when the flags have the
    PURPLE_MESSAGE_SEND bit set, mark the conversation as seen and reset
    its message counter.
    '''
    if not (flags & (1 << PURPLE_MESSAGE_SEND)):
        return
    pidginConversation = pidgin_conversations.get(conversation, None)
    if pidginConversation is None:
        pidginConversation = PidginConversation(conversation)
        pidgin_conversations[conversation] = pidginConversation
    pidginConversation.setSeen()
    pidginConversation.nbMessages = 0


def pidginConversationUpdated(conversation, _type):
    '''
    Receiver for 'ConversationUpdated': on PURPLE_CONV_UPDATE_UNSEEN
    updates of a known conversation, switch its seen/unseen state, then
    show the Pidgin icon if it is unseen or remove the icon once every
    known conversation is seen.

    D-Bus errors are printed and otherwise ignored.
    '''
    if _type != PURPLE_CONV_UPDATE_UNSEEN:
        return

    pidginConversation = pidgin_conversations.get(conversation, None)
    if pidginConversation is None:
        return

    # NOTE(review): the original indentation was lost; this 'else' is
    # attached to the outer test (an unseen conversation becomes seen on
    # the next update) -- confirm
    if pidginConversation.isSeen():
        if pidginConversation.nbMessages > 0:
            pidginConversation.setUnseen()
    else:
        pidginConversation.setSeen()

    if not pidginConversation.isSeen():
        # Message not seen by user
        res = {'group':'Pidgin', 'items':[]}
        res['items'] += DBUSItem('pidgin', icon='/usr/share/icons/hicolor/22x22/apps/pidgin.png', iconStyle='icon-size:22px').toMap()
        try:
            systray_proxy.notify(json.dumps(res),
                                 dbus_interface='com.soutade.GenericMonitor')
        except dbus.exceptions.DBusException as e:
            print(str(e))
    elif all(pConv.isSeen() for pConv in pidgin_conversations.values()):
        # All messages are seen: remove the icon
        res = {'items':['pidgin@Pidgin']}
        try:
            systray_proxy.deleteItems(json.dumps(res),
                                      dbus_interface='com.soutade.GenericMonitor')
        except dbus.exceptions.DBusException as e:
            print(str(e))


DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
systray_proxy = bus.get_object('org.gnome.Shell', '/com/soutade/GenericMonitor')

bus.add_signal_receiver(pidginMessageReceived, 'ReceivedImMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginMessageReceived, 'ReceivedChatMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginMessageWrote, 'WroteImMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginMessageWrote, 'WroteChatMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginConversationUpdated, 'ConversationUpdated', 'im.pidgin.purple.PurpleInterface')

# Create everything signalHandler() relies on before installing it, so
# that an early Ctrl-C cannot hit an undefined name
eventThread = EventThread()
mainLoop = GLib.MainLoop()
signal(SIGINT, signalHandler)

eventThread.start()
mainLoop.run()