Update examples:

* Move examples in examples directory
       * Create separate DBUSItem class
       * Add timer example
This commit is contained in:
Grégory Soutadé 2020-05-04 10:09:59 +02:00
parent c7462662f4
commit 506660f1da
3 changed files with 178 additions and 23 deletions

26
examples/dbusitem.py Normal file
View File

@ -0,0 +1,26 @@
class DBUSItem:
def __init__(self, name, text='', style='', icon='', iconStyle='', onClick=''):
self.name = name
self.text = text
self.style= style
self.icon = icon
self.iconStyle = iconStyle
self.onClick = onClick
self._checkValues()
def _checkValues(self):
if self.onClick and not self.onClick in ('signal', 'delete'):
raise ValueError('Invalid onClick value')
def toMap(self):
myMap = {"name":self.name}
for p in ('text', 'style', 'icon'):
if self.__dict__[p]:
myMap[p] = self.__dict__[p]
if self.iconStyle:
myMap['icon-style'] = self.iconStyle
if self.onClick:
myMap['on-click'] = self.onClick
return [myMap]

View File

@ -33,26 +33,10 @@ from gi.repository import GObject, GLib, Gst
from threading import Thread from threading import Thread
from signal import signal, SIGINT from signal import signal, SIGINT
import sys import sys
from dbusitem import DBUSItem
MAIL_ADDRESS='XXX@gmail.com' MAIL_ADDRESS='XXX@gmail.com'
class DBUSItem:
def __init__(self, name, text='', style='', icon='', iconStyle=''):
self.name = name
self.text = text
self.style= style
self.icon = icon
self.iconStyle = iconStyle
def toMap(self):
myMap = {"name":self.name}
for p in ('text', 'style', 'icon'):
if self.__dict__[p]:
myMap[p] = self.__dict__[p]
if self.iconStyle:
myMap['icon-style'] = self.iconStyle
return [myMap]
def getMail(user, password): def getMail(user, password):
res = DBUSItem('mail') res = DBUSItem('mail')
@ -85,7 +69,7 @@ def getEvents(mail_user, mail_password, proxy):
print(str(e)) print(str(e))
pass pass
class StockThread(Thread): class EventThread(Thread):
SLEEP_TIME = 30 SLEEP_TIME = 30
def stop(self): def stop(self):
@ -97,14 +81,14 @@ class StockThread(Thread):
while not self._stopLoop: while not self._stopLoop:
getEvents(MAIL_ADDRESS, mail_password, systray_proxy) getEvents(MAIL_ADDRESS, mail_password, systray_proxy)
# Be more reactive on signal capture # Be more reactive on signal capture
for i in range(0, StockThread.SLEEP_TIME): for i in range(0, EventThread.SLEEP_TIME):
if self._stopLoop: break if self._stopLoop: break
time.sleep(1) time.sleep(1)
def signalHandler(signal_received, frame): def signalHandler(signal_received, frame):
stockThread.stop() eventThread.stop()
mainLoop.quit() mainLoop.quit()
stockThread.join() eventThread.join()
encoder = json.JSONEncoder() encoder = json.JSONEncoder()
res = {'groups':['Mail', 'Pidgin']} res = {'groups':['Mail', 'Pidgin']}
try: try:
@ -209,8 +193,8 @@ bus.add_signal_receiver(pidginMessageWrote, 'WroteImMsg', 'im.pi
bus.add_signal_receiver(pidginMessageWrote, 'WroteChatMsg', 'im.pidgin.purple.PurpleInterface') bus.add_signal_receiver(pidginMessageWrote, 'WroteChatMsg', 'im.pidgin.purple.PurpleInterface')
bus.add_signal_receiver(pidginConversationUpdated, 'ConversationUpdated', 'im.pidgin.purple.PurpleInterface') bus.add_signal_receiver(pidginConversationUpdated, 'ConversationUpdated', 'im.pidgin.purple.PurpleInterface')
stockThread = StockThread() eventThread = EventThread()
stockThread.start() eventThread.start()
signal(SIGINT, signalHandler) signal(SIGINT, signalHandler)

145
examples/timer.py Executable file
View File

@ -0,0 +1,145 @@
#!/usr/bin/python3
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
'''
Display two timers with interactive behaviour :
* One timer is white and background become red after one hour
* Second timer is blue
* Click : start/stop timer
* Double click : reset timer
* Right click : switch timer
'''
import json
import time
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, GLib, Gst
from threading import Thread
from signal import signal, SIGINT
import sys
from dbusitem import DBUSItem
class TimerThread(Thread):
def stop(self):
self._stopLoop = True
def _displayTimerValue(self):
encoder = json.JSONEncoder()
res = {'group':'Timer', 'items':[]}
item = DBUSItem('timer', onClick='signal')
curValue = self.timers[self.curTimer]
item.text = '%02d:%02d' % (int(curValue/60)%60, curValue%60)
if curValue >= (60*60):
item.text = '%02d:%s' % (int(curValue/(60*60)), item.text)
if self.curTimer == 0:
style = 'color:white'
if curValue > (60*60):
style += ';background-color:red'
else:
style = 'color:#215D9C'
item.style = style
res['items'] += item.toMap()
try:
systray_proxy.notify(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
except:
pass
def run(self):
self.timers = [0, 0]
self.curTimer = 0
self.timerPaused = False
self._stopLoop = False
while not self._stopLoop:
time.sleep(1)
if not self.timerPaused:
self.timers[self.curTimer] += 1
self._displayTimerValue()
def _forMe(self, sender):
return sender == 'timer@Timer'
def onClick(self, sender):
if not self._forMe(sender): return
self.timerPaused = not self.timerPaused
self._displayTimerValue()
def onRightClick(self, sender):
if not self._forMe(sender): return
if self.curTimer == 0:
self.curTimer = 1
else:
self.curTimer = 0
self.timers[self.curTimer] = 0
self.timerPaused = False
self._displayTimerValue()
def onDblClick(self, sender):
if not self._forMe(sender): return
self.timers[self.curTimer] = 0
self.timerPaused = True
self._displayTimerValue()
def onDblRightClick(self, sender):
pass
def onClick(sender):
timerThread.onClick(sender)
def onRightClick(sender):
timerThread.onRightClick(sender)
def onDblClick(sender):
timerThread.onDblClick(sender)
def onDblRightClick(sender):
timerThread.onDblRightClick(sender)
def signalHandler(signal_received, frame):
timerThread.stop()
mainLoop.quit()
timerThread.join()
encoder = json.JSONEncoder()
res = {'groups':['Timer']}
try:
systray_proxy.deleteGroups(encoder.encode(res), dbus_interface='com.soutade.GenericMonitor')
except Exception as e:
pass
sys.exit(0)
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
systray_proxy = bus.get_object('org.gnome.Shell', '/com/soutade/GenericMonitor')
bus.add_signal_receiver(onClick, 'onClick', 'com.soutade.GenericMonitor')
bus.add_signal_receiver(onDblClick, 'onDblClick', 'com.soutade.GenericMonitor')
bus.add_signal_receiver(onRightClick, 'onRightClick', 'com.soutade.GenericMonitor')
bus.add_signal_receiver(onDblRightClick, 'onDblRightClick', 'com.soutade.GenericMonitor')
timerThread = TimerThread()
timerThread.start()
signal(SIGINT, signalHandler)
mainLoop = GLib.MainLoop()
mainLoop.run()