diff --git a/pyprland/command.py b/pyprland/command.py index e36a61a..621d259 100755 --- a/pyprland/command.py +++ b/pyprland/command.py @@ -1,4 +1,5 @@ #!/bin/env python +" Pyprland - an Hyprland companion app " import asyncio import json import sys @@ -17,25 +18,29 @@ CONFIG_FILE = "~/.config/hypr/pyprland.json" class Pyprland: + "Main app object" server: asyncio.Server event_reader: asyncio.StreamReader stopped = False name = "builtin" + config: dict[str, dict] = None def __init__(self): self.plugins: dict[str, Plugin] = {} self.log = get_logger() async def load_config(self, init=True): + """Loads the configuration + + if `init` is true, also initializes the plugins""" try: - self.config = json.loads( - open(os.path.expanduser(CONFIG_FILE), encoding="utf-8").read() - ) + with open(os.path.expanduser(CONFIG_FILE), encoding="utf-8") as f: + self.config = json.loads(f.read()) except FileNotFoundError as e: self.log.critical( "No config file found, create one at ~/.config/hypr/pyprland.json with a valid pyprland.plugins list" ) - raise PyprError() + raise PyprError() from e for name in self.config["pyprland"]["plugins"]: if name not in self.plugins: @@ -46,27 +51,31 @@ class Pyprland: await plug.init() self.plugins[name] = plug except Exception as e: - self.log.error(f"Error loading plugin {name}:", exc_info=True) - raise PyprError() + self.log.error("Error loading plugin %s:", name, exc_info=True) + raise PyprError() from e if init: try: await self.plugins[name].load_config(self.config) except PyprError: raise except Exception as e: - self.log.error(f"Error initializing plugin {name}:", exc_info=True) - raise PyprError() + self.log.error("Error initializing plugin %s:", name, exc_info=True) + raise PyprError() from e async def _callHandler(self, full_name, *params): + "Call an event handler with params" for plugin in [self] + list(self.plugins.values()): if hasattr(plugin, full_name): try: await getattr(plugin, full_name)(*params) - except Exception as e: - self.log.warn(f"{plugin.name}::{full_name}({params}) failed:") + except Exception as e: # pylint: disable=W0718 + self.log.warning( + "%s::%s(%s) failed:", plugin.name, full_name, params + ) self.log.exception(e) async def read_events_loop(self): + "Consumes the event loop and calls corresponding handlers" while not self.stopped: data = (await self.event_reader.readline()).decode() if not data: @@ -75,10 +84,11 @@ class Pyprland: cmd, params = data.split(">>") full_name = f"event_{cmd}" - self.log.debug(f"EVT {full_name}({params.strip()})") + self.log.debug("EVT %s(%s)", full_name, params.strip()) await self._callHandler(full_name, params) async def read_command(self, reader, writer) -> None: + "Receives a socket command" data = (await reader.readline()).decode() if not data: self.log.critical("Server starved") @@ -102,11 +112,12 @@ class Pyprland: # run mako for notifications & uncomment this # os.system(f"notify-send '{data}'") - self.log.debug(f"CMD: {full_name}({args})") + self.log.debug("CMD: %s(%s)", full_name, args) await self._callHandler(full_name, *args) async def serve(self): + "Runs the server" try: async with self.server: await self.server.serve_forever() @@ -114,6 +125,7 @@ class Pyprland: await asyncio.gather(*(plugin.exit() for plugin in self.plugins.values())) async def run(self): + "Runs the server and the event listener" await asyncio.gather( asyncio.create_task(self.serve()), asyncio.create_task(self.read_events_loop()), @@ -123,6 +135,7 @@ class Pyprland: async def run_daemon(): + "Runs the server / daemon" manager = Pyprland() err_count = itertools.count() manager.server = await asyncio.start_unix_server(manager.read_command, CONTROL) @@ -131,14 +144,13 @@ async def run_daemon(): attempt = next(err_count) try: events_reader, events_writer = await get_event_stream() - except Exception as e: + except Exception as e: # pylint: disable=W0718 if attempt > max_retry: - manager.log.critical(f"Failed to open hyprland event stream: {e}.") - raise PyprError() - else: - manager.log.warn( - f"Failed to get event stream: {e}, retry {attempt}/{max_retry}..." - ) + manager.log.critical("Failed to open hyprland event stream: %s.", e) + raise PyprError() from e + manager.log.warning( + "Failed to get event stream: %s}, retry %s/%s...", e, attempt, max_retry + ) await asyncio.sleep(1) else: break @@ -147,11 +159,11 @@ async def run_daemon(): try: await manager.load_config() # ensure sockets are connected first - except PyprError: - raise SystemExit(1) + except PyprError as e: + raise SystemExit(1) from e except Exception as e: - manager.log.critical(f"Failed to load config.") - raise SystemExit(1) + manager.log.critical("Failed to load config.") + raise SystemExit(1) from e try: await manager.run() @@ -167,6 +179,7 @@ async def run_daemon(): async def run_client(): + "Runs the client (CLI)" manager = Pyprland() if sys.argv[1] in ("--help", "-h", "help"): await manager.load_config(init=False) @@ -192,17 +205,18 @@ Commands: try: _, writer = await asyncio.open_unix_connection(CONTROL) - except FileNotFoundError: + except FileNotFoundError as e: manager.log.critical("Failed to open control socket, is pypr daemon running ?") - raise PyprError() - else: - writer.write((" ".join(sys.argv[1:])).encode()) - await writer.drain() - writer.close() - await writer.wait_closed() + raise PyprError() from e + + writer.write((" ".join(sys.argv[1:])).encode()) + await writer.drain() + writer.close() + await writer.wait_closed() def main(): + "runs the command" if "--debug" in sys.argv: i = sys.argv.index("--debug") init_logger(filename=sys.argv[i + 1], force_debug=True) @@ -215,11 +229,11 @@ def main(): asyncio.run(run_daemon() if len(sys.argv) <= 1 else run_client()) except KeyboardInterrupt: pass - except PyprError as e: - log.critical(f"Command failed.") + except PyprError: + log.critical("Command failed.") except json.decoder.JSONDecodeError as e: - log.critical(f"Invalid JSON syntax in the config file: {e.args[0]}") - except Exception as e: + log.critical("Invalid JSON syntax in the config file: %s", e.args[0]) + except Exception: # pylint: disable=W0718 log.critical("Unhandled exception:", exc_info=True) diff --git a/pyprland/common.py b/pyprland/common.py index 319ae6e..af2546e 100644 --- a/pyprland/common.py +++ b/pyprland/common.py @@ -1,3 +1,4 @@ +""" Shared utilities: logging """ import os import logging @@ -7,19 +8,23 @@ DEBUG = os.environ.get("DEBUG", False) class PyprError(Exception): - pass + """Used for errors which already triggered logging""" class LogObjects: + """Reusable objects for loggers""" + handlers: list[logging.Handler] = [] def init_logger(filename=None, force_debug=False): + """initializes the logging system""" global DEBUG if force_debug: DEBUG = True class ScreenLogFormatter(logging.Formatter): + "A custom formatter, adding colors" LOG_FORMAT = ( r"%(levelname)s:%(name)s - %(message)s // %(filename)s:%(lineno)d" if DEBUG @@ -53,6 +58,7 @@ def init_logger(filename=None, force_debug=False): def get_logger(name="pypr", level=None): + "Returns a logger for `name`" logger = logging.getLogger(name) if level is None: logger.setLevel(logging.DEBUG if DEBUG else logging.INFO) @@ -61,5 +67,5 @@ def get_logger(name="pypr", level=None): logger.propagate = False for handler in LogObjects.handlers: logger.addHandler(handler) - logger.debug(f"Logger initialized for {name}") + logger.debug("Logger initialized for %s", name) return logger diff --git a/pyprland/ipc.py b/pyprland/ipc.py index 46eed0a..78ef791 100644 --- a/pyprland/ipc.py +++ b/pyprland/ipc.py @@ -1,4 +1,5 @@ #!/bin/env python +""" Interact with hyprland using sockets """ import asyncio from logging import Logger from typing import Any @@ -14,17 +15,18 @@ EVENTS = f'/tmp/hypr/{ os.environ["HYPRLAND_INSTANCE_SIGNATURE"] }/.socket2.sock async def get_event_stream(): + "Returns a new event socket connection" return await asyncio.open_unix_connection(EVENTS) async def hyprctlJSON(command) -> list[dict[str, Any]] | dict[str, Any]: """Run an IPC command and return the JSON output.""" - log.debug(f"JS>> {command}") + log.debug("JS>> %s", command) try: ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL) - except FileNotFoundError: + except FileNotFoundError as e: log.critical("hyprctl socket not found! is it running ?") - raise PyprError() + raise PyprError() from e ctl_writer.write(f"-j/{command}".encode()) await ctl_writer.drain() resp = await ctl_reader.read() @@ -36,6 +38,7 @@ async def hyprctlJSON(command) -> list[dict[str, Any]] | dict[str, Any]: def _format_command(command_list, default_base_command): + "helper function to format BATCH commands" for command in command_list: if isinstance(command, str): yield f"{default_base_command} {command}" @@ -45,12 +48,12 @@ def _format_command(command_list, default_base_command): async def hyprctl(command, base_command="dispatch") -> bool: """Run an IPC command. Returns success value.""" - log.debug(f"JS>> {command}") + log.debug("JS>> %s", command) try: ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL) - except FileNotFoundError: + except FileNotFoundError as e: log.critical("hyprctl socket not found! is it running ?") - raise PyprError() + raise PyprError() from e if isinstance(command, list): ctl_writer.write( @@ -62,21 +65,23 @@ async def hyprctl(command, base_command="dispatch") -> bool: resp = await ctl_reader.read(100) ctl_writer.close() await ctl_writer.wait_closed() - log.debug(f"< dict[str, Any]: + "Returns focused monitor data" for monitor in await hyprctlJSON("monitors"): assert isinstance(monitor, dict) - if monitor.get("focused") == True: + if monitor.get("focused"): return monitor raise RuntimeError("no focused monitor") def init(): + "initialize logging" global log log = get_logger("ipc") diff --git a/pyprland/plugins/experimental.py b/pyprland/plugins/experimental.py index 02d6f39..b7cd2ab 100644 --- a/pyprland/plugins/experimental.py +++ b/pyprland/plugins/experimental.py @@ -1,7 +1,8 @@ +" Plugin template " from .interface import Plugin -from ..ipc import hyprctlJSON, hyprctl +# from ..ipc import hyprctlJSON, hyprctl class Extension(Plugin): - pass + "Sample plugin template" diff --git a/pyprland/plugins/expose.py b/pyprland/plugins/expose.py index 52897a7..b4c086f 100644 --- a/pyprland/plugins/expose.py +++ b/pyprland/plugins/expose.py @@ -5,8 +5,7 @@ from ..ipc import hyprctlJSON, hyprctl class Extension(Plugin): - async def init(self) -> None: - self.exposed = False + exposed = False async def run_toggle_minimized(self, special_workspace="minimized"): """[name] Toggles switching the focused window to the special workspace "name" (default: minimized)""" @@ -30,7 +29,7 @@ class Extension(Plugin): else: return [c for c in self.exposed if c["workspace"]["id"] > 0] - async def run_expose(self, arg=""): + async def run_expose(self): """Expose every client on the active workspace. If expose is active restores everything and move to the focused window""" if self.exposed: aw: dict[str, Any] = await hyprctlJSON("activewindow") diff --git a/pyprland/plugins/interface.py b/pyprland/plugins/interface.py index e7e1d60..b48dc19 100644 --- a/pyprland/plugins/interface.py +++ b/pyprland/plugins/interface.py @@ -1,19 +1,25 @@ +" Common plugin interface " from typing import Any from ..common import get_logger class Plugin: + "Base plugin class, handles logger and config" + def __init__(self, name: str): + "create a new plugin `name` and the matching logger" self.name = name self.log = get_logger(name) + self.config: dict[str, Any] = {} async def init(self): - pass + "empty init function" async def exit(self): - return + "empty exit function" async def load_config(self, config: dict[str, Any]): + "Loads the configuration section from the passed `config`" try: self.config = config[self.name] except KeyError: diff --git a/pyprland/plugins/lost_windows.py b/pyprland/plugins/lost_windows.py index 3f115f3..d6c5a54 100644 --- a/pyprland/plugins/lost_windows.py +++ b/pyprland/plugins/lost_windows.py @@ -18,7 +18,7 @@ def contains(monitor, window): class Extension(Plugin): - async def run_attract_lost(self, *args): + async def run_attract_lost(self): """Brings lost floating windows to the current workspace""" monitors = await hyprctlJSON("monitors") windows = await hyprctlJSON("clients") diff --git a/pyprland/plugins/magnify.py b/pyprland/plugins/magnify.py index f8052aa..2589614 100644 --- a/pyprland/plugins/magnify.py +++ b/pyprland/plugins/magnify.py @@ -1,11 +1,10 @@ from .interface import Plugin -from ..ipc import hyprctlJSON, hyprctl +from ..ipc import hyprctl class Extension(Plugin): - async def init(self): - self.zoomed = False + zoomed = False async def run_zoom(self, *args): """[factor] zooms to "factor" or toggles zoom level ommited""" diff --git a/pyprland/plugins/monitors.py b/pyprland/plugins/monitors.py index f23a84f..fa6cc86 100644 --- a/pyprland/plugins/monitors.py +++ b/pyprland/plugins/monitors.py @@ -1,11 +1,13 @@ +" The monitors plugin " +import subprocess from typing import Any from .interface import Plugin -import subprocess from ..ipc import hyprctlJSON def configure_monitors(monitors, screenid: str, x: int, y: int) -> None: + "Apply the configuration change" x_offset = -x if x < 0 else 0 y_offset = -y if y < 0 else 0 @@ -33,18 +35,19 @@ def configure_monitors(monitors, screenid: str, x: int, y: int) -> None: subprocess.call(command) -class Extension(Plugin): +class Extension(Plugin): # pylint: disable=missing-class-docstring async def load_config(self, config) -> None: await super().load_config(config) monitors = await hyprctlJSON("monitors") for monitor in monitors: await self.event_monitoradded( - monitor["name"], noDefault=True, monitors=monitors + monitor["name"], no_default=True, monitors=monitors ) async def event_monitoradded( - self, screenid, noDefault=False, monitors: list | None = None + self, screenid, no_default=False, monitors: list | None = None ) -> None: + "Triggers when a monitor is plugged" screenid = screenid.strip() if not monitors: @@ -55,7 +58,7 @@ class Extension(Plugin): mon_name = mon["description"] break else: - self.log.info(f"Monitor {screenid} not found") + self.log.info("Monitor %s not found", screenid) return mon_by_name = {m["name"]: m for m in monitors} @@ -83,7 +86,7 @@ class Extension(Plugin): configure_monitors(monitors, screenid, x, y) return - if not noDefault: + if not no_default: default_command = self.config.get("unknown") if default_command: subprocess.call(default_command, shell=True) diff --git a/pyprland/plugins/scratchpads.py b/pyprland/plugins/scratchpads.py index 3022d58..c93f689 100644 --- a/pyprland/plugins/scratchpads.py +++ b/pyprland/plugins/scratchpads.py @@ -1,12 +1,14 @@ +" Scratchpads addon " +import os +import asyncio import subprocess from typing import Any -import asyncio + from ..ipc import ( hyprctl, hyprctlJSON, get_focused_monitor_props, ) -import os from .interface import Plugin @@ -14,6 +16,7 @@ DEFAULT_MARGIN = 60 async def get_client_props_by_address(addr: str): + "Returns client properties given its address" for client in await hyprctlJSON("clients"): assert isinstance(client, dict) if client.get("address") == addr: @@ -21,8 +24,11 @@ async def get_client_props_by_address(addr: str): class Animations: + "Animation store" + @classmethod async def fromtop(cls, monitor, client, client_uid, margin): + "Slide from/to top" scale = float(monitor["scale"]) mon_x = monitor["x"] mon_y = monitor["y"] @@ -35,6 +41,7 @@ class Animations: @classmethod async def frombottom(cls, monitor, client, client_uid, margin): + "Slide from/to bottom" scale = float(monitor["scale"]) mon_x = monitor["x"] mon_y = monitor["y"] @@ -50,6 +57,7 @@ class Animations: @classmethod async def fromleft(cls, monitor, client, client_uid, margin): + "Slide from/to left" scale = float(monitor["scale"]) mon_x = monitor["x"] mon_y = monitor["y"] @@ -62,6 +70,7 @@ class Animations: @classmethod async def fromright(cls, monitor, client, client_uid, margin): + "Slide from/to right" scale = float(monitor["scale"]) mon_x = monitor["x"] mon_y = monitor["y"] @@ -77,54 +86,62 @@ class Animations: class Scratch: + "A scratchpad state including configuration & client state" + def __init__(self, uid, opts): self.uid = uid self.pid = 0 self.conf = opts self.visible = False self.just_created = True - self.clientInfo = {} + self.client_info = {} def isAlive(self) -> bool: + "is the process running ?" path = f"/proc/{self.pid}" if os.path.exists(path): - for line in open(os.path.join(path, "status"), "r").readlines(): - if line.startswith("State"): - state = line.split()[1] - return state in "RSDTt" # not "Z (zombie)"or "X (dead)" + with open(os.path.join(path, "status"), "r", encoding="utf-8") as f: + for line in f.readlines(): + if line.startswith("State"): + state = line.split()[1] + return state in "RSDTt" # not "Z (zombie)"or "X (dead)" return False def reset(self, pid: int) -> None: + "clear the object" self.pid = pid self.visible = False self.just_created = True - self.clientInfo = {} + self.client_info = {} @property def address(self) -> str: - return str(self.clientInfo.get("address", ""))[2:] + "Returns the client address" + return str(self.client_info.get("address", ""))[2:] async def updateClientInfo(self, clientInfo=None) -> None: + "update the internal client info property, if not provided, refresh based on the current address" if clientInfo is None: clientInfo = await get_client_props_by_address("0x" + self.address) assert isinstance(clientInfo, dict) - self.clientInfo.update(clientInfo) + self.client_info.update(clientInfo) def __str__(self): - return f"{self.uid} {self.address} : {self.clientInfo} / {self.conf}" + return f"{self.uid} {self.address} : {self.client_info} / {self.conf}" class Extension(Plugin): - async def init(self) -> None: - self.procs: dict[str, subprocess.Popen] = {} - self.scratches: dict[str, Scratch] = {} - self.transitioning_scratches: set[str] = set() - self._respawned_scratches: set[str] = set() - self.scratches_by_address: dict[str, Scratch] = {} - self.scratches_by_pid: dict[int, Scratch] = {} - self.focused_window_tracking = dict() + procs: dict[str, subprocess.Popen] = {} + scratches: dict[str, Scratch] = {} + transitioning_scratches: set[str] = set() + _respawned_scratches: set[str] = set() + scratches_by_address: dict[str, Scratch] = {} + scratches_by_pid: dict[int, Scratch] = {} + focused_window_tracking: dict[str, dict] = {} async def exit(self) -> None: + "exit hook" + async def die_in_piece(scratch: Scratch): proc = self.procs[scratch.uid] proc.terminate() @@ -141,6 +158,7 @@ class Extension(Plugin): ) async def load_config(self, config) -> None: + "config loader" config: dict[str, dict[str, Any]] = config["scratchpads"] scratches = {k: Scratch(k, v) for k, v in config.items()} @@ -159,6 +177,7 @@ class Extension(Plugin): await self.start_scratch_command(name) async def start_scratch_command(self, name: str) -> None: + "spawns a given scratchpad's process" self._respawned_scratches.add(name) scratch = self.scratches[name] old_pid = self.procs[name].pid if name in self.procs else 0 @@ -177,6 +196,7 @@ class Extension(Plugin): # Events async def event_activewindowv2(self, addr) -> None: + "active windows hook" addr = addr.strip() scratch = self.scratches_by_address.get(addr) if scratch: @@ -185,7 +205,7 @@ class Extension(Plugin): scratch.just_created = False else: for uid, scratch in self.scratches.items(): - if scratch.clientInfo and scratch.address != addr: + if scratch.client_info and scratch.address != addr: if ( scratch.visible and scratch.conf.get("unfocus") == "hide" @@ -194,7 +214,8 @@ class Extension(Plugin): await self.run_hide(uid, autohide=True) async def event_openwindow(self, params) -> None: - addr, wrkspc, kls, title = params.split(",", 3) + "open windows hook" + addr, wrkspc, _kls, _title = params.split(",", 3) if wrkspc.startswith("special"): item = self.scratches_by_address.get(addr) if not item and self._respawned_scratches: @@ -213,7 +234,7 @@ class Extension(Plugin): self.scratches_by_address[ client["address"][2:] ] = pending_scratch - self.log.debug(f"client class found: {client}") + self.log.debug("client class found: %s", client) await pending_scratch.updateClientInfo(client) else: await self.updateScratchInfo() @@ -228,7 +249,7 @@ class Extension(Plugin): uid = uid.strip() item = self.scratches.get(uid) if not item: - self.log.warn(f"{uid} is not configured") + self.log.warning("%s is not configured", uid) return if item.visible: await self.run_hide(uid) @@ -236,6 +257,8 @@ class Extension(Plugin): await self.run_show(uid) async def updateScratchInfo(self, scratch: Scratch | None = None) -> None: + """Update every scratchpads information if no `scratch` given, + else update a specific scratchpad info""" if scratch is None: for client in await hyprctlJSON("clients"): assert isinstance(client, dict) @@ -247,34 +270,34 @@ class Extension(Plugin): if scratch: await scratch.updateClientInfo(client) else: - add_to_address_book = ("address" not in scratch.clientInfo) or ( + add_to_address_book = ("address" not in scratch.client_info) or ( scratch.address not in self.scratches_by_address ) await scratch.updateClientInfo() if add_to_address_book: - self.scratches_by_address[scratch.clientInfo["address"][2:]] = scratch + self.scratches_by_address[scratch.client_info["address"][2:]] = scratch async def run_hide(self, uid: str, force=False, autohide=False) -> None: """ hides scratchpad "name" """ uid = uid.strip() item = self.scratches.get(uid) if not item: - self.log.warn(f"{uid} is not configured") + self.log.warning("%s is not configured", uid) return if not item.visible and not force: - self.log.warn(f"{uid} is already hidden") + self.log.warning("%s is already hidden", uid) return - self.log.info(f"Hiding {uid}") + self.log.info("Hiding %s", uid) item.visible = False addr = "address:0x" + item.address animation_type: str = item.conf.get("animation", "").lower() if animation_type: offset = item.conf.get("offset") if offset is None: - if "size" not in item.clientInfo: + if "size" not in item.client_info: await self.updateScratchInfo(item) - offset = int(1.3 * item.clientInfo["size"][1]) + offset = int(1.3 * item.client_info["size"][1]) if animation_type == "fromtop": await hyprctl(f"movewindowpixel 0 -{offset},{addr}") @@ -309,17 +332,17 @@ class Extension(Plugin): self.focused_window_tracking[uid] = await hyprctlJSON("activewindow") if not item: - self.log.warn(f"{uid} is not configured") + self.log.warning("%s is not configured", uid) return if item.visible and not force: - self.log.warn(f"{uid} is already visible") + self.log.warning("%s is already visible", uid) return - self.log.info(f"Showing {uid}") + self.log.info("Showing %s", uid) if not item.isAlive(): - self.log.info(f"{uid} is not running, restarting...") + self.log.info("%s is not running, restarting...", uid) if uid in self.procs: self.procs[uid].kill() if item.pid in self.scratches_by_pid: @@ -348,7 +371,7 @@ class Extension(Plugin): if animation_type: margin = item.conf.get("margin", DEFAULT_MARGIN) fn = getattr(Animations, animation_type) - await fn(monitor, item.clientInfo, addr, margin) + await fn(monitor, item.client_info, addr, margin) await hyprctl(f"focuswindow {addr}") await asyncio.sleep(0.2) # ensure some time for events to propagate diff --git a/pyprland/plugins/shift_monitors.py b/pyprland/plugins/shift_monitors.py index f047da3..8205bf3 100644 --- a/pyprland/plugins/shift_monitors.py +++ b/pyprland/plugins/shift_monitors.py @@ -4,6 +4,8 @@ from ..ipc import hyprctlJSON, hyprctl class Extension(Plugin): + monitors: list[str] = [] + async def init(self): self.monitors = [mon["name"] for mon in await hyprctlJSON("monitors")] diff --git a/pyprland/plugins/workspaces_follow_focus.py b/pyprland/plugins/workspaces_follow_focus.py index d9f840a..7435ce9 100644 --- a/pyprland/plugins/workspaces_follow_focus.py +++ b/pyprland/plugins/workspaces_follow_focus.py @@ -1,15 +1,19 @@ -import asyncio +""" Force workspaces to follow the focus / mouse """ from .interface import Plugin from ..ipc import hyprctlJSON, hyprctl -class Extension(Plugin): +class Extension(Plugin): # pylint: disable=missing-class-docstring + workspace_list: list[int] = [] + async def load_config(self, config): + "loads the config" await super().load_config(config) self.workspace_list = list(range(1, self.config.get("max_workspaces", 10) + 1)) async def event_focusedmon(self, screenid_index): + "reacts to monitor changes" monitor_id, workspace_id = screenid_index.split(",") workspace_id = int(workspace_id) # move every free workspace to the currently focused desktop