Compare commits
10 Commits
19600d8e41
...
f3c539ad78
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f3c539ad78 | ||
|
|
c899ec1c5a | ||
|
|
5cbb06292d | ||
|
|
fd96fc6d2e | ||
|
|
28ed5c1c30 | ||
|
|
534822077a | ||
|
|
d4addb9e78 | ||
|
|
0820dd4e3d | ||
|
|
60ac28b678 | ||
|
|
d93c61c90b |
@ -1,6 +1,5 @@
|
||||
pytest>=2.5
|
||||
sphinx>=1.1,<1.5
|
||||
colour>=0.0.5
|
||||
mock>=1.0
|
||||
pycodestyle>=1.5.7
|
||||
i3ipc>=1.2.0
|
||||
|
||||
@ -17,7 +17,7 @@ sys.path.insert(0, os.path.abspath('.'))
|
||||
sys.path.insert(0, os.path.abspath('..'))
|
||||
|
||||
# requires PyPI mock
|
||||
import mock
|
||||
import unittest.mock as mock
|
||||
|
||||
MOCK_MODULES = [
|
||||
"alsaaudio",
|
||||
@ -367,4 +367,4 @@ epub_copyright = '2013, Author'
|
||||
#epub_use_index = True
|
||||
|
||||
def setup(app):
|
||||
app.add_stylesheet('i3pystatus.css')
|
||||
app.add_css_file('i3pystatus.css')
|
||||
|
||||
@ -6,7 +6,8 @@ from i3pystatus.core.settings import SettingsBase
|
||||
from i3pystatus.core.util import formatp, get_module
|
||||
|
||||
import argparse
|
||||
import imp
|
||||
import importlib.util
|
||||
import importlib.machinery
|
||||
import logging
|
||||
import os
|
||||
|
||||
@ -35,6 +36,19 @@ def clock_example():
|
||||
status.run()
|
||||
|
||||
|
||||
def load_source(modname, filename):
|
||||
"""
|
||||
From: https://docs.python.org/3/whatsnew/3.12.html?highlight=load_source#imp
|
||||
"""
|
||||
loader = importlib.machinery.SourceFileLoader(modname, filename)
|
||||
spec = importlib.util.spec_from_file_location(modname, filename, loader=loader)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
# The module is always executed and not cached in sys.modules.
|
||||
# Uncomment the following line to cache the module.
|
||||
# sys.modules[module.__name__] = module
|
||||
loader.exec_module(module)
|
||||
return module
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='''
|
||||
run i3pystatus configuration file. Starts i3pystatus clock example if no arguments were provided
|
||||
@ -44,6 +58,6 @@ def main():
|
||||
|
||||
if args.config:
|
||||
module_name = "i3pystatus-config"
|
||||
imp.load_source(module_name, args.config)
|
||||
load_source(module_name, args.config)
|
||||
else:
|
||||
clock_example()
|
||||
|
||||
237
i3pystatus/buds.py
Normal file
237
i3pystatus/buds.py
Normal file
@ -0,0 +1,237 @@
|
||||
from enum import Enum, IntEnum
|
||||
from json import JSONDecodeError, loads
|
||||
from i3pystatus import IntervalModule
|
||||
from i3pystatus.core.command import run_through_shell
|
||||
from i3pystatus.core.color import ColorRangeModule
|
||||
|
||||
|
||||
class BudsEqualizer(Enum):
|
||||
off = 0
|
||||
bass = 1
|
||||
soft = 2
|
||||
dynamic = 3
|
||||
clear = 4
|
||||
treble = 5
|
||||
|
||||
|
||||
class BudsPlacementStatus(IntEnum):
|
||||
wearing = 1
|
||||
idle = 2
|
||||
case = 3
|
||||
|
||||
|
||||
class Buds(IntervalModule, ColorRangeModule):
|
||||
earbuds_binary = "earbuds"
|
||||
|
||||
"""
|
||||
Displays information about Galaxy Buds devices
|
||||
|
||||
Requires the earbuds tool from https://github.com/JojiiOfficial/LiveBudsCli
|
||||
|
||||
.. rubric :: Available formatters
|
||||
* {amb} Displays the current ambient sound control status.
|
||||
* {anc} Displays the current active noise control status.
|
||||
* {battery} Displays combined battery level for left and right.
|
||||
If both are at the same level, it simply returns the battery level.
|
||||
If they have different levels and the drift threshold is enabled, provided
|
||||
they do not exceed the threshold, display the smaller level.
|
||||
If they have different battery levels, it returns both levels, if the threshold
|
||||
is exceeded.
|
||||
* `{left_battery}` Displays the left bud battery level.
|
||||
* `{right_battery}` Displays the right bud battery level.
|
||||
* `{battery_case} Displays the case battery level, if one of the buds is on the case.
|
||||
* `{device_model}` The model of the device.
|
||||
* `{equalizer} Displays current equalizer setting, only if the equalizer is on.
|
||||
* `{placement_left}` A placement indicator for the left bud, if it's on the (C)ase, (I)dle or being (W)ear.
|
||||
* `{placement_right}` A placement indicator for the right bud, if it's on the (C)ase, (I)dle or being (W)ear.
|
||||
* `{touchpad}` Displays if the touchpad is locked, and only if it is locked. A T(ouchpad)L(ocked) string indicates
|
||||
the touchpad is locked.
|
||||
"""
|
||||
|
||||
settings = (
|
||||
("format", "Format string used for output"),
|
||||
("interval", "Interval to run the module"),
|
||||
("hide_no_device", "Hide the output if no device is connected"),
|
||||
("battery_drift_threshold", "Drift threshold."),
|
||||
("use_battery_drift_threshold", "Whether to display combined or separate levels, based on drift"),
|
||||
("connected_color", "Output color for when the device is connected"),
|
||||
("disconnected_color", "Output color for when the device is disconnected"),
|
||||
("dynamic_color", "Output color based on battery level. Overrides connected_color"),
|
||||
("start_color", "Hex or English name for start of color range, eg '#00FF00' or 'green'"),
|
||||
("end_color", "Hex or English name for end of color range, eg '#FF0000' or 'red'")
|
||||
)
|
||||
|
||||
format = (
|
||||
"{device_model} "
|
||||
"L{placement_left}"
|
||||
"{battery}"
|
||||
"R{placement_right}"
|
||||
"{battery_case}"
|
||||
"{amb}"
|
||||
"{anc}"
|
||||
"{equalizer}"
|
||||
"{touchpad}"
|
||||
)
|
||||
hide_no_device = False
|
||||
battery_limit = 100
|
||||
battery_drift_threshold = 3
|
||||
use_battery_drift_threshold = True
|
||||
|
||||
connected_color = "#00FF00"
|
||||
disconnected_color = "#FFFFFF"
|
||||
dynamic_color = True
|
||||
colors = []
|
||||
|
||||
on_leftclick = 'toggle_anc'
|
||||
on_rightclick = 'toggle_amb'
|
||||
on_doubleleftclick = 'connect'
|
||||
on_doublerightclick = 'disconnect'
|
||||
on_middleclick = ['equalizer_set', BudsEqualizer.off]
|
||||
on_downscroll = ['equalizer_set', -1]
|
||||
on_upscroll = ['equalizer_set', +1]
|
||||
on_doublemiddleclick = 'restart_daemon'
|
||||
on_doubleupscroll = ['touchpad_set', 'true']
|
||||
on_doubledownscroll = ['touchpad_set', 'false']
|
||||
|
||||
def init(self):
|
||||
if not self.dynamic_color:
|
||||
self.end_color = self.start_color = self.connected_color
|
||||
# battery discharges from battery_limit to 0
|
||||
self.colors = self.get_hex_color_range(self.end_color, self.start_color, self.battery_limit)
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
status = loads(run_through_shell(f"{self.earbuds_binary} status -o json -q").out)
|
||||
except JSONDecodeError:
|
||||
self.output = None
|
||||
else:
|
||||
payload = status.get("payload")
|
||||
if payload:
|
||||
amb = payload.get("ambient_sound_enabled")
|
||||
anc = payload.get("noise_reduction")
|
||||
left_battery = payload.get("batt_left")
|
||||
right_battery = payload.get("batt_right")
|
||||
equalizer_type = BudsEqualizer(payload.get("equalizer_type", 0))
|
||||
placement_left = payload.get("placement_left")
|
||||
placement_right = payload.get("placement_right")
|
||||
tab_lock_status = payload.get("tab_lock_status")
|
||||
# determine touchpad lock status
|
||||
touchpad = ""
|
||||
if tab_lock_status:
|
||||
touch_an_hold_on = tab_lock_status.get("touch_an_hold_on")
|
||||
tap_on = tab_lock_status.get("tap_on")
|
||||
if not touch_an_hold_on and not tap_on:
|
||||
touchpad = " TL"
|
||||
|
||||
# determine battery level and color to display
|
||||
battery_display = f"{left_battery} {right_battery}"
|
||||
color = self.connected_color
|
||||
combined_level = min(left_battery, right_battery)
|
||||
# if one bud has battery depleted, invert the logic.
|
||||
if left_battery == 0 or right_battery == 0:
|
||||
combined_level = max(left_battery, right_battery)
|
||||
if self.use_battery_drift_threshold:
|
||||
drift = abs(left_battery - right_battery)
|
||||
# only use drift if buds aren't on case, otherwise show both.
|
||||
if drift <= self.battery_drift_threshold and not (
|
||||
placement_left == BudsPlacementStatus.case or
|
||||
placement_right == BudsPlacementStatus.case
|
||||
):
|
||||
|
||||
battery_display = f"{combined_level}"
|
||||
|
||||
if self.dynamic_color:
|
||||
color = self.get_gradient(
|
||||
combined_level,
|
||||
self.colors,
|
||||
self.battery_limit
|
||||
)
|
||||
|
||||
fdict = {
|
||||
"amb": " AMB" if amb else "",
|
||||
"anc": " ANC" if anc else "",
|
||||
"battery": battery_display,
|
||||
"left_battery": left_battery,
|
||||
"right_battery": right_battery,
|
||||
"battery_case":
|
||||
f' {payload.get("batt_case")}C' if placement_left == BudsPlacementStatus.case
|
||||
or placement_right == BudsPlacementStatus.case else "",
|
||||
"device_model": payload.get("model"),
|
||||
"equalizer": "" if equalizer_type == BudsEqualizer.off else f" {equalizer_type.name.capitalize()}",
|
||||
"placement_left": self.translate_placement(placement_left),
|
||||
"placement_right": self.translate_placement(placement_right),
|
||||
"touchpad": touchpad
|
||||
}
|
||||
|
||||
self.output = {
|
||||
"full_text": self.format.format(**fdict),
|
||||
"color": color
|
||||
}
|
||||
|
||||
return payload
|
||||
else:
|
||||
if not self.hide_no_device:
|
||||
self.output = {
|
||||
"full_text": "Disconnected",
|
||||
"color": self.disconnected_color
|
||||
}
|
||||
else:
|
||||
self.output = None
|
||||
|
||||
return
|
||||
|
||||
def connect(self):
|
||||
run_through_shell(f"{self.earbuds_binary} connect")
|
||||
|
||||
def disconnect(self):
|
||||
run_through_shell(f"{self.earbuds_binary} disconnect")
|
||||
|
||||
def equalizer_set(self, adjustment):
|
||||
payload = self.run()
|
||||
if payload:
|
||||
current_eq = int(payload.get("equalizer_type", 0)) # Default to 0 if not found
|
||||
|
||||
if isinstance(adjustment, BudsEqualizer):
|
||||
new_eq_value = adjustment.value
|
||||
else: # Adjustment is -1 or +1
|
||||
# Calculate new equalizer setting, ensuring it wraps correctly within bounds
|
||||
new_eq_value = (current_eq + adjustment) % len(BudsEqualizer)
|
||||
|
||||
# Find the enum member corresponding to the new equalizer value
|
||||
new_eq_setting = BudsEqualizer(new_eq_value)
|
||||
|
||||
# Execute the command with the new equalizer setting
|
||||
run_through_shell(f"{self.earbuds_binary} set equalizer {new_eq_setting.name}")
|
||||
|
||||
def restart_daemon(self):
|
||||
run_through_shell(f"{self.earbuds_binary} -kd")
|
||||
|
||||
def toggle_amb(self):
|
||||
payload = self.run()
|
||||
if payload:
|
||||
amb = payload.get("ambient_sound_enabled")
|
||||
if amb:
|
||||
run_through_shell(f"{self.earbuds_binary} set ambientsound 0")
|
||||
else:
|
||||
run_through_shell(f"{self.earbuds_binary} set ambientsound 1")
|
||||
|
||||
def toggle_anc(self):
|
||||
payload = self.run()
|
||||
if payload:
|
||||
anc = payload.get("noise_reduction")
|
||||
if anc:
|
||||
run_through_shell(f"{self.earbuds_binary} set anc false")
|
||||
else:
|
||||
run_through_shell(f"{self.earbuds_binary} set anc true")
|
||||
|
||||
def touchpad_set(self, setting):
|
||||
run_through_shell(f"{self.earbuds_binary} set touchpad {setting}")
|
||||
|
||||
@staticmethod
|
||||
def translate_placement(placement):
|
||||
mapping = {
|
||||
BudsPlacementStatus.wearing.value: "W",
|
||||
BudsPlacementStatus.idle.value: "I",
|
||||
BudsPlacementStatus.case.value: "C",
|
||||
}
|
||||
return mapping.get(placement, "?")
|
||||
@ -87,8 +87,8 @@ class Module(SettingsBase):
|
||||
if "name" not in self.output:
|
||||
self.output["name"] = self.__name__
|
||||
self.output["instance"] = str(id(self))
|
||||
if (self.output.get("color", "") or "").lower() == "#ffffff":
|
||||
del self.output["color"]
|
||||
if (self.output.get("color", "") or "").lower() in ("", "#ffffff"):
|
||||
self.output.pop("color", None)
|
||||
if self.hints:
|
||||
for key, val in self.hints.items():
|
||||
if key not in self.output:
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
|
||||
@ -46,7 +45,7 @@ class PulseAudio(Module, ColorRangeModule):
|
||||
("format_muted", "optional format string to use when muted"),
|
||||
("format_selected", "string used to mark this sink if selected"),
|
||||
"muted", "unmuted",
|
||||
"color_muted", "color_unmuted",
|
||||
"color_error", "color_muted", "color_unmuted",
|
||||
("step", "percentage to increment volume on scroll"),
|
||||
("sink", "sink name to use, None means pulseaudio default"),
|
||||
("move_sink_inputs", "Move all sink inputs when we change the default sink"),
|
||||
@ -64,6 +63,7 @@ class PulseAudio(Module, ColorRangeModule):
|
||||
format_selected = " 🗸"
|
||||
currently_muted = False
|
||||
has_amixer = False
|
||||
color_error = "#FF0000"
|
||||
color_muted = "#FF0000"
|
||||
color_unmuted = "#FFFFFF"
|
||||
vertical_bar_glyphs = None
|
||||
@ -120,8 +120,7 @@ class PulseAudio(Module, ColorRangeModule):
|
||||
if self.sink is not None:
|
||||
return self.sink
|
||||
|
||||
self.sinks = subprocess.check_output(['pactl', 'list', 'short', 'sinks'],
|
||||
universal_newlines=True).splitlines()
|
||||
self.sinks = self._call_pactl(["list", "short", "sinks"]).splitlines()
|
||||
bestsink = None
|
||||
state = 'DEFAULT'
|
||||
for sink in self.sinks:
|
||||
@ -191,19 +190,17 @@ class PulseAudio(Module, ColorRangeModule):
|
||||
output_format = self.format
|
||||
|
||||
if self.bar_type == 'vertical':
|
||||
volume_bar = make_vertical_bar(volume_percent, self.vertical_bar_width, glyphs=self.vertical_bar_glyphs)
|
||||
volume_bar = make_vertical_bar(volume_percent, self.vertical_bar_width,
|
||||
glyphs=self.vertical_bar_glyphs)
|
||||
elif self.bar_type == 'horizontal':
|
||||
volume_bar = make_bar(volume_percent)
|
||||
else:
|
||||
raise Exception("bar_type must be 'vertical' or 'horizontal'")
|
||||
|
||||
selected = ""
|
||||
dump = subprocess.check_output("pacmd dump".split(), universal_newlines=True)
|
||||
for line in dump.split("\n"):
|
||||
if line.startswith("set-default-sink"):
|
||||
default_sink = line.split()[1]
|
||||
if default_sink == self.current_sink:
|
||||
selected = self.format_selected
|
||||
default_sink = self._call_pactl(["get-default-sink"]).strip()
|
||||
if default_sink == self.current_sink:
|
||||
selected = self.format_selected
|
||||
|
||||
self.output = {
|
||||
"color": color,
|
||||
@ -223,28 +220,33 @@ class PulseAudio(Module, ColorRangeModule):
|
||||
def change_sink(self):
|
||||
sinks = list(s.split()[1] for s in self.sinks)
|
||||
if self.sink is None:
|
||||
next_sink = sinks[(sinks.index(self.current_sink) + 1) %
|
||||
len(sinks)]
|
||||
next_sink = sinks[(sinks.index(self.current_sink) + 1) % len(sinks)]
|
||||
else:
|
||||
next_sink = self.current_sink
|
||||
|
||||
if self.move_sink_inputs:
|
||||
sink_inputs = subprocess.check_output("pacmd list-sink-inputs".split(),
|
||||
universal_newlines=True)
|
||||
sink_inputs = self._call_pactl(["list-sink-inputs"])
|
||||
for input_index in re.findall(r'index:\s+(\d+)', sink_inputs):
|
||||
command = "pacmd move-sink-input {} {}".format(input_index, next_sink)
|
||||
pactl_args = ["move-sink-input", input_index, next_sink]
|
||||
self._call_pactl(pactl_args)
|
||||
self._call_pactl(["set-default-sink", next_sink])
|
||||
|
||||
# Not all applications can be moved and pulseaudio, and when
|
||||
# this fail pacmd print error messaging
|
||||
with open(os.devnull, 'w') as devnull:
|
||||
subprocess.call(command.split(), stdout=devnull)
|
||||
subprocess.call("pacmd set-default-sink {}".format(next_sink).split())
|
||||
def _call_pactl(self, pactl_arguments):
|
||||
try:
|
||||
output = subprocess.check_output(
|
||||
["pactl"] + [str(arg) for arg in pactl_arguments],
|
||||
universal_newlines=True)
|
||||
return output
|
||||
except Exception:
|
||||
self.logger.exception("Error while executing pactl")
|
||||
self.output = {"color": self.color_error, "full_text": "Error while executing pactl"}
|
||||
self.send_output()
|
||||
|
||||
def switch_mute(self):
|
||||
subprocess.call(['pactl', '--', 'set-sink-mute', self.current_sink, "toggle"])
|
||||
self._call_pactl(["--", "set-sink-mute", self.current_sink, "toggle"])
|
||||
|
||||
def increase_volume(self):
|
||||
subprocess.call(['pactl', '--', 'set-sink-volume', self.current_sink, "+%s%%" % self.step])
|
||||
self._call_pactl(["--", "set-sink-volume", self.current_sink, f"+{self.step}%"])
|
||||
|
||||
def decrease_volume(self):
|
||||
subprocess.call(['pactl', '--', 'set-sink-volume', self.current_sink, "-%s%%" % self.step])
|
||||
self._call_pactl(["--", "set-sink-volume", self.current_sink, f"-{self.step}%"])
|
||||
|
||||
@ -145,7 +145,7 @@ class MLB(ScoresBackend):
|
||||
required = ()
|
||||
|
||||
_default_colors = {
|
||||
'ARI': '#A71930',
|
||||
'AZ': '#A71930',
|
||||
'ATL': '#CE1141',
|
||||
'BAL': '#DF4601',
|
||||
'BOS': '#BD3039',
|
||||
|
||||
@ -284,21 +284,30 @@ class NHL(ScoresBackend):
|
||||
'score',
|
||||
callback=self.zero_fallback,
|
||||
default=0)
|
||||
ret[f'{team}_wins'] = self.get_nested(
|
||||
team_data,
|
||||
'leagueRecord:wins',
|
||||
callback=self.zero_fallback,
|
||||
default=0)
|
||||
ret[f'{team}_losses'] = self.get_nested(
|
||||
team_data,
|
||||
'leagueRecord:losses',
|
||||
callback=self.zero_fallback,
|
||||
default=0)
|
||||
ret[f'{team}_otl'] = self.get_nested(
|
||||
team_data,
|
||||
'leagueRecord:ot',
|
||||
callback=self.zero_fallback,
|
||||
default=0)
|
||||
ret[f'{team}_wins'] = int(
|
||||
self.get_nested(
|
||||
team_data,
|
||||
'leagueRecord:wins',
|
||||
callback=self.zero_fallback,
|
||||
default=0,
|
||||
)
|
||||
)
|
||||
ret[f'{team}_losses'] = int(
|
||||
self.get_nested(
|
||||
team_data,
|
||||
'leagueRecord:losses',
|
||||
callback=self.zero_fallback,
|
||||
default=0,
|
||||
)
|
||||
)
|
||||
ret[f'{team}_otl'] = int(
|
||||
self.get_nested(
|
||||
team_data,
|
||||
'leagueRecord:ot',
|
||||
callback=self.zero_fallback,
|
||||
default=0,
|
||||
)
|
||||
)
|
||||
|
||||
ret[f'{team}_city'] = self.get_nested(
|
||||
team_data,
|
||||
|
||||
@ -109,10 +109,15 @@ class Wunderground(WeatherBackend):
|
||||
except Exception as exc:
|
||||
self.logger.exception(f'Failed to load {url}')
|
||||
else:
|
||||
try:
|
||||
return re.search(r'apiKey=([0-9a-f]+)', page_source).group(1)
|
||||
except AttributeError:
|
||||
self.logger.debug('Scanning page source for embedded API keys')
|
||||
for key_match in re.finditer(r'apiKey=([0-9a-f]+)', page_source):
|
||||
self.api_key = key_match.group(1)
|
||||
self.logger.debug(f'Checking if {self.api_key} works')
|
||||
if self.api_request(self.observation_url.format(**vars(self))):
|
||||
break
|
||||
else:
|
||||
self.logger.error('Failed to find API key in mainpage source')
|
||||
self.api_key = None
|
||||
|
||||
@require(internet)
|
||||
def api_request(self, url, headers=None):
|
||||
@ -128,7 +133,7 @@ class Wunderground(WeatherBackend):
|
||||
Query the desired station and return the weather data
|
||||
'''
|
||||
# Get the API key from the page source
|
||||
self.api_key = self.get_api_key()
|
||||
self.get_api_key()
|
||||
if self.api_key is None:
|
||||
self.data['update_error'] = self.update_error
|
||||
return
|
||||
@ -136,7 +141,9 @@ class Wunderground(WeatherBackend):
|
||||
self.data['update_error'] = ''
|
||||
try:
|
||||
try:
|
||||
observation = self.api_request(self.observation_url.format(**vars(self)))['observations'][0]
|
||||
observation = self.api_request(
|
||||
self.observation_url.format(**vars(self))
|
||||
)['observations'][0]
|
||||
except (IndexError, KeyError):
|
||||
self.logger.error(
|
||||
'Failed to retrieve observation data from API response. '
|
||||
|
||||
222
i3pystatus/wifionice.py
Executable file
222
i3pystatus/wifionice.py
Executable file
@ -0,0 +1,222 @@
|
||||
from datetime import datetime, timedelta
|
||||
from json import loads
|
||||
from urllib.request import urlopen
|
||||
from threading import Condition, Thread
|
||||
|
||||
from i3pystatus import Module
|
||||
from i3pystatus.core.util import formatp, user_open
|
||||
|
||||
|
||||
class WifiOnIceAPI(Module):
|
||||
"""
|
||||
Displays information about your current trip on Deutsche Bahn trains.
|
||||
Allows you to open an url formatted using train information.
|
||||
|
||||
Requires the PyPI package `basiciw` if you want to use automatic
|
||||
detection. See below on how to disable automatic detection based on
|
||||
wifi adapter names.
|
||||
|
||||
.. rubric:: URL examples
|
||||
|
||||
* `https://travelynx.de/s/{last_station_no}?train={train_type}%20{train_no}` - Open travelynx check in page
|
||||
* `https://bahn.expert/details/{train_type}%20{train_no}/{trip_date}/?station={next_station_no}` - Show bahn.expert view for next station
|
||||
|
||||
.. rubric:: Available formatters
|
||||
|
||||
* `{arrival_in}` - Time until arrival (in form "1h 12m" or "53m")
|
||||
* `{arrival_time}` - Arrival time of train at the station (actual, if available, otherwise scheduled)
|
||||
* `{delay}` - delay of train in minutes
|
||||
* `{gps_lat}` - Current GPS latitude
|
||||
* `{gps_lon}` - Current GPS longitude
|
||||
* `{last_station_no}` - EVA number of the previous stop
|
||||
* `{net_current}` - current state of network quality
|
||||
* `{net_duration}` - how long until the next network quality change
|
||||
* `{net_expected}` - next state of network quality
|
||||
* `{next_platform}` - Platform number or name
|
||||
* `{next_station_no}` - EVA number of the next stop
|
||||
* `{next_station}` - Station name
|
||||
* `{speed}` - Train speed in km/h
|
||||
* `{train_no}` - Train number
|
||||
* `{train_type}` - Train Type (probably always `ICE`)
|
||||
"""
|
||||
|
||||
final_destination = 'Endstation'
|
||||
format_offtrain = None
|
||||
format_ontrain = '{speed}km/h > {next_station} ({arrival_in}[ | {delay}])'
|
||||
ice_status = {}
|
||||
off_train_interval = 10
|
||||
on_leftclick = 'open_url'
|
||||
on_train_interval = 2
|
||||
trip_info = {}
|
||||
url_on_click = ''
|
||||
wifi_adapters = ['wlan0']
|
||||
wifi_names = ['WiFi@DB', 'WIFIonICE']
|
||||
|
||||
settings = (
|
||||
("final_destination", "Information text for 'final destination has been reached'"),
|
||||
("format_offtrain", "Formatter for 'not on a train' (module hidden if `None` - no formatters available)"),
|
||||
("format_ontrain", "Formatter for 'on a train'"),
|
||||
("off_train_interval", "time between updates if no train is detected"),
|
||||
("on_train_interval", "time between updates while on a train"),
|
||||
("url_on_click", "URL to open when left-clicking the module"),
|
||||
("wifi_adapters", "List of wifi adapters the module should consider "
|
||||
"when detecting if you are in a train. Set to `None` "
|
||||
"to disable that functionality."),
|
||||
("wifi_names", "List of Wifi network names that should be considered 'on a train'."),
|
||||
)
|
||||
|
||||
def _format_time(self, seconds):
|
||||
if seconds is None:
|
||||
return "?"
|
||||
seconds = int(seconds)
|
||||
components = []
|
||||
if seconds >= 3600:
|
||||
hours = int(seconds / 3600)
|
||||
seconds -= hours * 3600
|
||||
components.append(f'{hours}h')
|
||||
if seconds >= 60:
|
||||
minutes = int(seconds / 60)
|
||||
seconds -= minutes * 60
|
||||
components.append(f'{minutes}m')
|
||||
if not components:
|
||||
components.append('now')
|
||||
return " ".join(components)
|
||||
|
||||
def _check_wifi(self):
|
||||
if self.wifi_adapters is None:
|
||||
self.logger.debug('Disabling automatic on-train detection because wifi_adapters is None')
|
||||
return True
|
||||
try:
|
||||
from basiciw import iwinfo
|
||||
except ModuleNotFoundError:
|
||||
self.logger.warning('Disabling automatic on-train detection because basiciw is not installed')
|
||||
return True
|
||||
for adapter in self.wifi_adapters:
|
||||
self.logger.info(f'Checking {adapter} for compatible wifi network')
|
||||
iwi = iwinfo(adapter)
|
||||
for wifi in self.wifi_names:
|
||||
if iwi['essid'].lower() == wifi.lower():
|
||||
self.logger.info(f'{adapter} uses {wifi} - success!')
|
||||
return True
|
||||
self.logger.info('No matching wifi connection found')
|
||||
return False
|
||||
|
||||
def _loop(self):
|
||||
self.logger.debug('begin of _loop()')
|
||||
while True:
|
||||
self.logger.debug('new _loop()')
|
||||
if self._check_wifi():
|
||||
self.logger.info('On a train :)')
|
||||
try:
|
||||
trip_info_req = urlopen('https://iceportal.de/api1/rs/tripInfo/trip')
|
||||
self.trip_info = loads(trip_info_req.read())['trip']
|
||||
|
||||
ice_status_req = urlopen('https://iceportal.de/api1/rs/status')
|
||||
self.ice_status = loads(ice_status_req.read())
|
||||
except Exception:
|
||||
self.trip_info = {}
|
||||
self.ice_status = {}
|
||||
|
||||
self.logger.debug(f'trip_info: {self.trip_info!r}')
|
||||
self.logger.debug(f'ice_status: {self.ice_status!r}')
|
||||
|
||||
self.update_bar()
|
||||
|
||||
with self.condition:
|
||||
self.condition.wait(self.on_train_interval)
|
||||
else:
|
||||
self.logger.info('Not on a train :(')
|
||||
|
||||
self.trip_info = {}
|
||||
self.ice_status = {}
|
||||
|
||||
self.logger.debug(f'trip_info: {self.trip_info!r}')
|
||||
self.logger.debug(f'ice_status: {self.ice_status!r}')
|
||||
|
||||
self.update_bar()
|
||||
|
||||
with self.condition:
|
||||
self.condition.wait(self.off_train_interval)
|
||||
|
||||
@property
|
||||
def _format_vars(self):
|
||||
format_vars = {
|
||||
'arrival_in': '?',
|
||||
'arrival_time': '',
|
||||
'gps_lat': self.ice_status['latitude'],
|
||||
'gps_lon': self.ice_status['longitude'],
|
||||
'last_station_no': self.trip_info['stopInfo']['actualLast'],
|
||||
'net_current': '',
|
||||
'net_duration': '?',
|
||||
'net_expected': '',
|
||||
'next_platform': '',
|
||||
'next_station': self.final_destination,
|
||||
'next_station_no': self.trip_info['stopInfo']['actualNext'],
|
||||
'speed': self.ice_status['speed'],
|
||||
'train_no': self.trip_info['vzn'],
|
||||
'train_type': self.trip_info['trainType'],
|
||||
'trip_date': self.trip_info['tripDate'],
|
||||
}
|
||||
|
||||
next_stop_id = self.trip_info['stopInfo']['actualNext']
|
||||
now = datetime.now()
|
||||
for stop in self.trip_info['stops']:
|
||||
if stop['station']['evaNr'] == next_stop_id:
|
||||
if stop['timetable']['departureDelay']:
|
||||
format_vars['delay'] = stop['timetable']['departureDelay']
|
||||
else:
|
||||
format_vars['delay'] = 0
|
||||
|
||||
if stop['timetable'].get('actualArrivalTime', 0):
|
||||
arrival = datetime.fromtimestamp(stop['timetable']['actualArrivalTime'] / 1000)
|
||||
arrival_in = arrival - now
|
||||
elif stop['timetable'].get('scheduledArrivalTime', 0):
|
||||
arrival = datetime.fromtimestamp(stop['timetable']['scheduledArrivalTime'] / 1000)
|
||||
arrival_in = arrival - now
|
||||
else:
|
||||
arrival = datetime.now()
|
||||
arrival_in = timedelta()
|
||||
|
||||
format_vars['next_station'] = stop['station']['name']
|
||||
format_vars['next_platform'] = stop['track']['actual']
|
||||
format_vars['arrival_time'] = arrival.strftime('%H:%M')
|
||||
format_vars['arrival_in'] = self._format_time(arrival_in.total_seconds())
|
||||
break
|
||||
|
||||
net_current = self.ice_status['connectivity']['currentState']
|
||||
net_future = self.ice_status['connectivity']['nextState']
|
||||
|
||||
if net_current not in (None, 'NO_INFO') or net_future not in (None, 'NO_INFO'):
|
||||
format_vars['net_current'] = net_current
|
||||
format_vars['net_expected'] = net_future
|
||||
format_vars['net_duration'] = self._format_time(self.ice_status['connectivity']['remainingTimeSeconds'])
|
||||
|
||||
self.logger.debug(f'format_vars: {format_vars!r}')
|
||||
return format_vars
|
||||
|
||||
def init(self):
|
||||
self.condition = Condition()
|
||||
self.thread = Thread(
|
||||
target=self._loop,
|
||||
daemon=True
|
||||
)
|
||||
self.thread.start()
|
||||
|
||||
def open_url(self):
|
||||
if not (self.trip_info and self.ice_status and self.url_on_click):
|
||||
return
|
||||
|
||||
user_open(self.url_on_click.format(**self._format_vars))
|
||||
|
||||
def update_bar(self):
|
||||
if self.trip_info and self.ice_status:
|
||||
self.output = {
|
||||
'full_text': formatp(self.format_ontrain, **self._format_vars).strip(),
|
||||
}
|
||||
else:
|
||||
if self.format_offtrain is not None:
|
||||
self.output = {
|
||||
'full_text': self.format_offtrain,
|
||||
}
|
||||
else:
|
||||
self.output = None
|
||||
48
tests/test_buds.json
Normal file
48
tests/test_buds.json
Normal file
@ -0,0 +1,48 @@
|
||||
{
|
||||
"connected_payload": {
|
||||
"status": "success",
|
||||
"device": "00:00:00:00:00:00",
|
||||
"status_message": null,
|
||||
"payload": {
|
||||
"address": "00:00:00:00:00:00",
|
||||
"ready": true,
|
||||
"batt_left": 53,
|
||||
"batt_right": 48,
|
||||
"batt_case": 88,
|
||||
"placement_left": 1,
|
||||
"placement_right": 1,
|
||||
"equalizer_type": 0,
|
||||
"touchpads_blocked": false,
|
||||
"noise_reduction": false,
|
||||
"did_battery_notify": false,
|
||||
"touchpad_option_left": 2,
|
||||
"touchpad_option_right": 2,
|
||||
"paused_music_earlier": false,
|
||||
"debug": {
|
||||
"voltage_left": 0.0,
|
||||
"voltage_right": 0.0,
|
||||
"temperature_left": 36.0,
|
||||
"temperature_right": 37.0,
|
||||
"current_left": 0.0,
|
||||
"current_right": 0.0
|
||||
},
|
||||
"model": "Buds2",
|
||||
"ambient_sound_enabled": false,
|
||||
"ambient_sound_volume": 0,
|
||||
"extra_high_ambient_volume": false,
|
||||
"tab_lock_status": {
|
||||
"touch_an_hold_on": true,
|
||||
"triple_tap_on": true,
|
||||
"double_tap_on": true,
|
||||
"tap_on": true,
|
||||
"touch_controls_on": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"disconnected_payload": {
|
||||
"status": "error",
|
||||
"device": "",
|
||||
"status_message": "No connected device found",
|
||||
"payload": null
|
||||
}
|
||||
}
|
||||
542
tests/test_buds.py
Normal file
542
tests/test_buds.py
Normal file
@ -0,0 +1,542 @@
|
||||
import json
|
||||
import unittest
|
||||
from copy import deepcopy
|
||||
from unittest.mock import patch
|
||||
from i3pystatus.core.color import ColorRangeModule
|
||||
from i3pystatus.buds import Buds, BudsEqualizer, BudsPlacementStatus
|
||||
|
||||
|
||||
class TestBuds(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.buds = Buds()
|
||||
with open('test_buds.json', 'rb') as file:
|
||||
self.payload = json.load(file)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_run_device_connected(self, mock_run):
|
||||
# Setup: Use json.dumps as we expect JSON
|
||||
payload = self.payload.get('connected_payload')
|
||||
mock_run.return_value.out = json.dumps(payload)
|
||||
|
||||
# Action: Call run() and save return for comparison
|
||||
buds_run_return = self.buds.run()
|
||||
|
||||
# Verify: Assert called with right params
|
||||
mock_run.assert_called_with(f"{self.buds.earbuds_binary} status -o json -q")
|
||||
|
||||
expected_output = {
|
||||
"full_text": "Buds2 LW53 48RW",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
# Verify: Assert correct output
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
# Verify: run() return is equal to payload
|
||||
self.assertDictEqual(payload.get('payload'), buds_run_return)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_run_device_disconnected(self, mock_run):
|
||||
# Setup: Use json.dumps as we expect JSON
|
||||
mock_run.return_value.out = json.dumps(self.payload.get('disconnected_payload'))
|
||||
|
||||
# Action: Call run() and save return for comparison
|
||||
buds_run_return = self.buds.run()
|
||||
|
||||
# Verify: Assert called with right params
|
||||
mock_run.assert_called_with(f"{self.buds.earbuds_binary} status -o json -q")
|
||||
|
||||
expected_output = {
|
||||
"full_text": "Disconnected",
|
||||
"color": self.buds.disconnected_color
|
||||
}
|
||||
|
||||
# Verify: Assert correct output
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
# Verify: run() return should be none
|
||||
self.assertIsNone(buds_run_return)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_toggle_amb(self, mock_run):
|
||||
# Setup: AMB is initially disabled
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
modified_payload['payload']['ambient_sound_enabled'] = False
|
||||
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Toggle AMB
|
||||
self.buds.toggle_amb()
|
||||
|
||||
# Verify: The correct command is sent to enable AMB
|
||||
mock_run.assert_called_with(f"{self.buds.earbuds_binary} set ambientsound 1")
|
||||
|
||||
# Setup: Change the payload again to update the AMB status
|
||||
modified_payload['payload']['ambient_sound_enabled'] = True
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run again to update output
|
||||
self.buds.run()
|
||||
|
||||
# Verify: The output correctly displays AMB is enabled
|
||||
expected_output = {
|
||||
"full_text": "Buds2 LW53 48RW AMB",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
# Action: Toggle AMB again
|
||||
self.buds.toggle_amb()
|
||||
|
||||
# Verify: The correct command is sent to disable AMB this time
|
||||
mock_run.assert_called_with(f"{self.buds.earbuds_binary} set ambientsound 0")
|
||||
|
||||
# Setup: Change the payload one last time to update the AMB status
|
||||
modified_payload['payload']['ambient_sound_enabled'] = False
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run again to update output
|
||||
self.buds.run()
|
||||
|
||||
# Verify: The output correctly displays AMB is disabled
|
||||
expected_output = {
|
||||
"full_text": "Buds2 LW53 48RW",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_toggle_anc(self, mock_run):
|
||||
# Setup: ANC is initially disabled
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
modified_payload['payload']['noise_reduction'] = False
|
||||
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Toggle ANC
|
||||
self.buds.toggle_anc()
|
||||
|
||||
# Verify: The correct command is sent to enable ANC
|
||||
mock_run.assert_called_with(f"{self.buds.earbuds_binary} set anc true")
|
||||
|
||||
# Setup: Change the payload again to update the ANC status
|
||||
modified_payload['payload']['noise_reduction'] = True
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run again to update output
|
||||
self.buds.run()
|
||||
|
||||
# Verify: The output correctly displays ANC is enabled
|
||||
expected_output = {
|
||||
"full_text": "Buds2 LW53 48RW ANC",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
# Action: Toggle ANC again
|
||||
self.buds.toggle_anc()
|
||||
|
||||
# Verify: The correct command is sent to disable ANC this time
|
||||
mock_run.assert_called_with(f"{self.buds.earbuds_binary} set anc false")
|
||||
|
||||
# Setup: Change the payload one last time to update the ANC status
|
||||
modified_payload['payload']['noise_reduction'] = False
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run again to update output
|
||||
self.buds.run()
|
||||
|
||||
# Verify: The output correctly displays ANC is disabled
|
||||
expected_output = {
|
||||
"full_text": "Buds2 LW53 48RW",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_combined_battery(self, mock_run):
|
||||
# Setup: Equal left and right battery value
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
modified_payload['payload']['batt_left'] = modified_payload['payload']['batt_right']
|
||||
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run() to update the output
|
||||
self.buds.run()
|
||||
|
||||
expected_output = {
|
||||
"full_text": "Buds2 LW48RW",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
# Verify: The output correctly displays combined battery status
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
# Setup: Different left and right battery value
|
||||
mock_run.return_value.out = json.dumps(self.payload.get('connected_payload'))
|
||||
|
||||
# Action: Call run() again to update the output
|
||||
self.buds.run()
|
||||
|
||||
expected_output = {
|
||||
"full_text": "Buds2 LW53 48RW",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
# Verify: The output correctly displays combined battery status
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_combined_battery_drift(self, mock_run):
|
||||
# Setup: Different battery level, should show smaller
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
modified_payload['payload']['batt_left'] = modified_payload['payload']['batt_right']
|
||||
modified_payload['payload']['batt_left'] -= self.buds.battery_drift_threshold
|
||||
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run() to update the output
|
||||
self.buds.run()
|
||||
|
||||
expected_level = min(modified_payload['payload']['batt_left'], modified_payload['payload']['batt_right'])
|
||||
expected_output = {
|
||||
# Verify: The level should be the smallest one
|
||||
"full_text":
|
||||
f"Buds2 LW{expected_level}RW",
|
||||
"color": self.buds.get_gradient(
|
||||
expected_level,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
# Verify: The output correctly displays combined battery status
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
# Setup: One battery is at level 0, should show the other
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
modified_payload['payload']['batt_left'] = 0
|
||||
modified_payload['payload']['batt_right'] = 0 + self.buds.battery_drift_threshold
|
||||
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run() again to update the output
|
||||
self.buds.run()
|
||||
|
||||
expected_level = max(modified_payload['payload']['batt_left'], modified_payload['payload']['batt_right'])
|
||||
expected_output = {
|
||||
# Verify: The level should be the biggest one
|
||||
"full_text":
|
||||
f"Buds2 LW{expected_level}RW",
|
||||
"color": self.buds.get_gradient(
|
||||
expected_level,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
# Verify: The output correctly displays combined battery status
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_combined_battery_drift_case(self, mock_run):
|
||||
# Setup: Change status of one buds to be on the case
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
modified_payload['payload']['placement_left'] = BudsPlacementStatus.case
|
||||
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run() to update the output
|
||||
self.buds.run()
|
||||
|
||||
expected_output = {
|
||||
"full_text": f"Buds2 LC53 48RW 88C",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
# Verify: The output correctly displays combined battery status
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_connect(self, mock_run):
|
||||
# Action: Call connect
|
||||
self.buds.connect()
|
||||
|
||||
# Verify: The correct command is sent to connect
|
||||
mock_run.assert_called_with(f"{self.buds.earbuds_binary} connect")
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_disconnect(self, mock_run):
|
||||
# Action: Call disconnect
|
||||
self.buds.disconnect()
|
||||
|
||||
# Verify: The correct command is sent to disconnect
|
||||
mock_run.assert_called_with(f"{self.buds.earbuds_binary} disconnect")
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_restart_daemin(self, mock_run):
|
||||
# Action: Call restart_daemon
|
||||
self.buds.restart_daemon()
|
||||
|
||||
# Verify: The correct command is sent to restart the daemon
|
||||
mock_run.assert_called_with(f"{self.buds.earbuds_binary} -kd")
|
||||
|
||||
def run_placement_helper(self, mock_run, placement_left, placement_right, case_battery, expected_display):
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
modified_payload['payload']['placement_left'] = placement_left
|
||||
modified_payload['payload']['placement_right'] = placement_right
|
||||
if case_battery is not None:
|
||||
modified_payload['payload']['batt_case'] = case_battery
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
self.buds.run()
|
||||
|
||||
expected_output = {
|
||||
"full_text": expected_display,
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_placement_wearing(self, mock_run):
|
||||
self.run_placement_helper(
|
||||
mock_run,
|
||||
BudsPlacementStatus.wearing.value,
|
||||
BudsPlacementStatus.wearing.value,
|
||||
None,
|
||||
"Buds2 LW53 48RW"
|
||||
)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_placement_idle(self, mock_run):
|
||||
self.run_placement_helper(
|
||||
mock_run,
|
||||
BudsPlacementStatus.idle.value,
|
||||
BudsPlacementStatus.idle.value,
|
||||
None,
|
||||
"Buds2 LI53 48RI"
|
||||
)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_placement_case_with_battery(self, mock_run):
|
||||
# Verify: Case battery is returned if a bud is on the case
|
||||
self.run_placement_helper(
|
||||
mock_run,
|
||||
BudsPlacementStatus.case.value,
|
||||
BudsPlacementStatus.case.value,
|
||||
88,
|
||||
"Buds2 LC53 48RC 88C"
|
||||
)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_battery_level_dynamic_color(self, mock_run):
|
||||
# Setup: Build the colors array independently of our class
|
||||
colors = ColorRangeModule.get_hex_color_range(
|
||||
self.buds.end_color,
|
||||
self.buds.start_color,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
|
||||
for battery_level in range(0, self.buds.battery_limit + 1):
|
||||
# Setup: Make both levels equal
|
||||
modified_payload['payload']['batt_left'] = battery_level
|
||||
modified_payload['payload']['batt_right'] = battery_level
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run() again to update the output
|
||||
self.buds.run()
|
||||
|
||||
expected_output = {
|
||||
"full_text": f"Buds2 LW{battery_level}RW",
|
||||
"color": self.buds.get_gradient(
|
||||
battery_level,
|
||||
colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_set_equalizer_direct(self, mock_run):
|
||||
for eq_setting in BudsEqualizer:
|
||||
with self.subTest(msg=f"Failed testing equalizer {eq_setting.name}", eq_setting=eq_setting):
|
||||
# Setup: Create a copy of the payload
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call the set function with each equalizer setting
|
||||
self.buds.equalizer_set(eq_setting)
|
||||
|
||||
expected_command = f"{self.buds.earbuds_binary} set equalizer {eq_setting.name}"
|
||||
|
||||
# Verify: Correct equalizer command is used
|
||||
mock_run.assert_called_with(expected_command)
|
||||
|
||||
# Setup: Modify payload to verify output
|
||||
modified_payload['payload']['equalizer_type'] = eq_setting.value
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run() again to update the output
|
||||
self.buds.run()
|
||||
|
||||
expected_equalizer = f" {eq_setting.name.capitalize()}" if eq_setting.name != "off" else ""
|
||||
expected_output = {
|
||||
"full_text": f"Buds2 LW53 48RW{expected_equalizer}",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
# Verify: Output was updated with equalizer
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_increment_equalizer(self, mock_run):
|
||||
# Setup: Create a copy of the payload
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call the set to increment by one the equalizer setting
|
||||
self.buds.equalizer_set(+1)
|
||||
|
||||
# Verify: Correct equalizer command is used
|
||||
expected_command = f"{self.buds.earbuds_binary} set equalizer {BudsEqualizer.bass.name}"
|
||||
mock_run.assert_called_with(expected_command)
|
||||
|
||||
# Setup: Modify payload to verify output
|
||||
modified_payload['payload']['equalizer_type'] = BudsEqualizer.bass.value
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run() again to update the output
|
||||
self.buds.run()
|
||||
|
||||
expected_equalizer = f" {BudsEqualizer.bass.name.capitalize()}"
|
||||
expected_output = {
|
||||
"full_text": f"Buds2 LW53 48RW{expected_equalizer}",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
# Verify: Output was updated with equalizer
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_decrement_equalizer_from_off(self, mock_run):
|
||||
# Setup: Create a copy of the payload
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call the set to decrement by one the equalizer setting
|
||||
self.buds.equalizer_set(-1)
|
||||
|
||||
# Verify: Correct equalizer command is used
|
||||
expected_command = f"{self.buds.earbuds_binary} set equalizer {BudsEqualizer.treble.name}"
|
||||
mock_run.assert_called_with(expected_command)
|
||||
|
||||
# Setup: Modify payload to verify output
|
||||
modified_payload['payload']['equalizer_type'] = BudsEqualizer.treble.value
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run() again to update the output
|
||||
self.buds.run()
|
||||
|
||||
expected_equalizer = f" {BudsEqualizer.treble.name.capitalize()}"
|
||||
expected_output = {
|
||||
"full_text": f"Buds2 LW53 48RW{expected_equalizer}",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
# Verify: Output was updated with equalizer
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
def run_touchpad_set(self, mock_run, setting_value):
|
||||
# Setup: Create a copy of the payload
|
||||
modified_payload = deepcopy(self.payload.get('connected_payload'))
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call the set with the appropriate setting
|
||||
self.buds.touchpad_set(f'{setting_value}')
|
||||
|
||||
# Verify: Correct command to disable the touchpad is called
|
||||
expected_command = f"{self.buds.earbuds_binary} set touchpad {setting_value}"
|
||||
mock_run.assert_called_with(expected_command)
|
||||
|
||||
# Setup: Modify the payload if we are disabling the touchpad
|
||||
if setting_value == 'false':
|
||||
modified_payload['payload']['tab_lock_status']['touch_an_hold_on'] = False
|
||||
modified_payload['payload']['tab_lock_status']['tap_on'] = False
|
||||
mock_run.return_value.out = json.dumps(modified_payload)
|
||||
|
||||
# Action: Call run() again to update the output
|
||||
self.buds.run()
|
||||
|
||||
# Setup:
|
||||
expected_output = {
|
||||
"full_text": f"Buds2 LW53 48RW{' TL' if setting_value == 'false' else ''}",
|
||||
"color": self.buds.get_gradient(
|
||||
48,
|
||||
self.buds.colors,
|
||||
self.buds.battery_limit
|
||||
)
|
||||
}
|
||||
|
||||
# Verify: Output was updated with equalizer
|
||||
self.assertEqual(expected_output, self.buds.output)
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_touchpad_disable(self, mock_run):
|
||||
self.run_touchpad_set(mock_run, "false")
|
||||
|
||||
@patch('i3pystatus.buds.run_through_shell')
|
||||
def test_touchpad_enable(self, mock_run):
|
||||
self.run_touchpad_set(mock_run, "true")
|
||||
@ -4,7 +4,7 @@ Requires the PyPI package `requests`
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from mock import patch
|
||||
from unittest.mock import patch
|
||||
from requests import get
|
||||
from i3pystatus import hassio
|
||||
import json
|
||||
|
||||
@ -3,8 +3,7 @@ Basic test for the plexstatus module
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from mock import patch
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import patch, MagicMock
|
||||
from urllib.request import urlopen
|
||||
from i3pystatus import lastfm
|
||||
|
||||
|
||||
@ -3,8 +3,7 @@ Basic test for the plexstatus module
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from mock import patch
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import patch, MagicMock
|
||||
from urllib.request import urlopen
|
||||
from i3pystatus import plexstatus
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user