From cc023d28acb076f47b9824862fd1d43c8ff1e9c2 Mon Sep 17 00:00:00 2001 From: fdev31 Date: Wed, 25 Oct 2023 21:52:00 +0200 Subject: [PATCH] Code cleanup + re-introduce non-lazy --- pyprland/plugins/scratchpads.py | 290 ++++++++++++++++++-------------- 1 file changed, 160 insertions(+), 130 deletions(-) diff --git a/pyprland/plugins/scratchpads.py b/pyprland/plugins/scratchpads.py index 74e56f0..1024fbf 100644 --- a/pyprland/plugins/scratchpads.py +++ b/pyprland/plugins/scratchpads.py @@ -11,6 +11,39 @@ from .interface import Plugin DEFAULT_MARGIN = 60 +# Helper functions {{{ + + +def convert_coords(logger, coords, monitor): + """ + Converts a string like "X Y" to coordinates relative to monitor + Supported formats for X, Y: + - Percentage: "V%". V in [0; 100] + + Example: + "10% 20%", monitor 800x600 => 80, 120 + """ + + assert coords, "coords must be non null" + + def convert(s, dim): + if s[-1] == "%": + p = int(s[:-1]) + if p < 0 or p > 100: + raise Exception(f"Percentage must be in range [0; 100], got {p}") + scale = float(monitor["scale"]) + return int(monitor[dim] / scale * p / 100) + else: + raise Exception(f"Unsupported format for dimension {dim} size, got {s}") + + try: + x_str, y_str = coords.split() + + return convert(x_str, "width"), convert(y_str, "height") + except Exception as e: + logger.error(f"Failed to read coordinates: {e}") + raise e + async def get_client_props(addr: str | None = None, pid: int | None = None): "Returns client properties given its address" @@ -27,7 +60,10 @@ async def get_client_props(addr: str | None = None, pid: int | None = None): return client -class Animations: +# }}} + + +class Animations: # {{{ "Animation store" @staticmethod @@ -89,7 +125,10 @@ class Animations: ) -class Scratch: +# }}} + + +class Scratch: # {{{ "A scratchpad state including configuration & client state" log = logging.getLogger("scratch") @@ -99,6 +138,7 @@ class Scratch: self.conf = opts self.visible = False self.client_info = {} + self.should_hide = False def isAlive(self) -> bool: "is the process running ?" @@ -140,7 +180,10 @@ class Scratch: return f"{self.uid} {self.address} : {self.client_info} / {self.conf}" -class Extension(Plugin): # pylint: disable=missing-class-docstring +# }}} + + +class Extension(Plugin): # pylint: disable=missing-class-docstring {{{ procs: dict[str, subprocess.Popen] = {} scratches: dict[str, Scratch] = {} transitioning_scratches: set[str] = set() @@ -172,12 +215,50 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring my_config: dict[str, dict[str, Any]] = config["scratchpads"] scratches = {k: Scratch(k, v) for k, v in my_config.items()} + scratches_to_spawn = set() for name in scratches: if name not in self.scratches: self.scratches[name] = scratches[name] + is_lazy = scratches[name].conf.get("lazy", False) + if not is_lazy: + scratches_to_spawn.add(name) else: self.scratches[name].conf = scratches[name].conf + self.log.info(scratches_to_spawn) + for name in scratches_to_spawn: + await self.ensure_alive(name) + self.scratches[name].should_hide = True + + async def ensure_alive(self, uid, item=None): + if item is None: + item = self.scratches.get(uid) + + if not item.isAlive(): + self.log.info("%s is not running, restarting...", uid) + if uid in self.procs: + self.procs[uid].kill() + if item.pid in self.scratches_by_pid: + del self.scratches_by_pid[item.pid] + if item.address in self.scratches_by_address: + del self.scratches_by_address[item.address] + self.log.info(f"starting {uid}") + await self.start_scratch_command(uid) + self.log.info(f"==> Wait for {uid} spawning") + loop_count = count() + while next(loop_count) < 10: + await asyncio.sleep(0.1) + info = await get_client_props(pid=item.pid) + if info: + self.log.info(f"=> {uid} info received on time") + await item.updateClientInfo(info) + self._respawned_scratches.discard(uid) + await hyprctl( + f"movetoworkspacesilent special:scratch_{uid},address:0x{item.address}" + ) + break + self.log.info(f"=> spawned {uid} as proc {item.pid}") + async def start_scratch_command(self, name: str) -> None: "spawns a given scratchpad's process" self._respawned_scratches.add(name) @@ -199,7 +280,25 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring if old_pid and old_pid in self.scratches_by_pid: del self.scratches_by_pid[old_pid] - # Events + async def updateScratchInfo(self, orig_scratch: Scratch | None = None) -> None: + """Update every scratchpads information if no `scratch` given, + else update a specific scratchpad info""" + pid = orig_scratch.pid if orig_scratch else None + for client in await hyprctlJSON("clients"): + assert isinstance(client, dict) + if pid and pid != client["pid"]: + continue + scratch = self.scratches_by_address.get(client["address"][2:]) + if not scratch: + scratch = self.scratches_by_pid.get(client["pid"]) + if scratch: + self.scratches_by_address[client["address"][2:]] = scratch + await scratch.updateClientInfo(client) + break + else: + self.log.info("Didn't update scratch info %s" % self) + + # Events {{{ async def event_activewindowv2(self, addr) -> None: "active windows hook" addr = addr.strip() @@ -246,7 +345,11 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring self.log.info("Updating Scratch info") await self.updateScratchInfo() item = self.scratches_by_address.get(addr) + if item and item.should_hide: + await self.run_hide(item.uid, force=True) + # }}} + # Commands {{{ async def run_toggle(self, uid: str) -> None: """ toggles visibility of scratchpad "name" """ uid = uid.strip() @@ -283,23 +386,57 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring return # abort sequence await asyncio.sleep(0.2) # await for animation to finish - async def updateScratchInfo(self, orig_scratch: Scratch | None = None) -> None: - """Update every scratchpads information if no `scratch` given, - else update a specific scratchpad info""" - pid = orig_scratch.pid if orig_scratch else None - for client in await hyprctlJSON("clients"): - assert isinstance(client, dict) - if pid and pid != client["pid"]: - continue - scratch = self.scratches_by_address.get(client["address"][2:]) - if not scratch: - scratch = self.scratches_by_pid.get(client["pid"]) - if scratch: - self.scratches_by_address[client["address"][2:]] = scratch - await scratch.updateClientInfo(client) - break - else: - self.log.info("Didn't update scratch info %s" % self) + async def run_show(self, uid) -> None: + """ shows scratchpad "name" """ + uid = uid.strip() + item = self.scratches.get(uid) + + self.focused_window_tracking[uid] = cast( + dict[str, Any], await hyprctlJSON("activewindow") + ) + + if not item: + self.log.warning("%s is not configured", uid) + return + + self.log.info("Showing %s", uid) + await self.ensure_alive(uid, item) + + item.visible = True + monitor = await get_focused_monitor_props() + assert monitor + + assert item.address, "No address !" + + addr = "address:0x" + item.address + + animation_type = item.conf.get("animation", "").lower() + + wrkspc = monitor["activeWorkspace"]["id"] + + self.transitioning_scratches.add(uid) + await hyprctl(f"moveworkspacetomonitor special:scratch_{uid} {monitor['name']}") + await hyprctl(f"movetoworkspacesilent {wrkspc},{addr}") + if animation_type: + margin = item.conf.get("margin", DEFAULT_MARGIN) + fn = getattr(Animations, animation_type) + await fn(monitor, item.client_info, addr, margin) + + await hyprctl(f"focuswindow {addr}") + + size = item.conf.get("size") + if size: + x_size, y_size = convert_coords(self.log, size, monitor) + await hyprctl(f"resizewindowpixel exact {x_size} {y_size},{addr}") + + position = item.conf.get("position") + if position: + x_pos, y_pos = convert_coords(self.log, position, monitor) + x_pos_abs, y_pos_abs = x_pos + monitor["x"], y_pos + monitor["y"] + await hyprctl(f"movewindowpixel exact {x_pos_abs} {y_pos_abs},{addr}") + + await asyncio.sleep(0.2) # ensure some time for events to propagate + self.transitioning_scratches.discard(uid) async def run_hide(self, uid: str, force=False, autohide=False) -> None: """ hides scratchpad "name" """ @@ -330,114 +467,7 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring ) del self.focused_window_tracking[uid] - async def ensure_alive(self, uid, item=None): - if item is None: - item = self.scratches.get(uid) + # }}} - if not item.isAlive(): - self.log.info("%s is not running, restarting...", uid) - if uid in self.procs: - self.procs[uid].kill() - if item.pid in self.scratches_by_pid: - del self.scratches_by_pid[item.pid] - if item.address in self.scratches_by_address: - del self.scratches_by_address[item.address] - self.log.info(f"starting {uid}") - await self.start_scratch_command(uid) - self.log.info(f"{uid} started") - self.log.info("==> Wait for spawning") - loop_count = count() - while next(loop_count) < 10: - await asyncio.sleep(0.1) - info = await get_client_props(pid=item.pid) - if info: - item.updateClientInfo(info) - break - self.log.info(f"=> spawned {uid} as proc {item.pid}") - await hyprctl(f"movewindowpixel exact {0} {-100},{item.address}") - # await hyprctl(f"movetoworkspacesilent special:scratch_{uid},{item.address}") - - async def run_show(self, uid) -> None: - """ shows scratchpad "name" """ - uid = uid.strip() - item = self.scratches.get(uid) - - self.focused_window_tracking[uid] = cast( - dict[str, Any], await hyprctlJSON("activewindow") - ) - - if not item: - self.log.warning("%s is not configured", uid) - return - - self.log.info("Showing %s", uid) - await self.ensure_alive(uid, item) - - item.visible = True - monitor = await get_focused_monitor_props() - assert monitor - - await self.updateScratchInfo(item) - - assert item.address, "No address !" - - addr = "address:0x" + item.address - - animation_type = item.conf.get("animation", "").lower() - - wrkspc = monitor["activeWorkspace"]["id"] - - self.transitioning_scratches.add(uid) - await hyprctl(f"moveworkspacetomonitor special:scratch_{uid} {monitor['name']}") - await hyprctl(f"movetoworkspacesilent {wrkspc},{addr}") - if animation_type: - margin = item.conf.get("margin", DEFAULT_MARGIN) - fn = getattr(Animations, animation_type) - await fn(monitor, item.client_info, addr, margin) - - await hyprctl(f"focuswindow {addr}") - - size = item.conf.get("size") - if size: - x_size, y_size = self._convert_coords(size, monitor) - await hyprctl(f"resizewindowpixel exact {x_size} {y_size},{addr}") - - position = item.conf.get("position") - if position: - x_pos, y_pos = self._convert_coords(position, monitor) - x_pos_abs, y_pos_abs = x_pos + monitor["x"], y_pos + monitor["y"] - await hyprctl(f"movewindowpixel exact {x_pos_abs} {y_pos_abs},{addr}") - - await asyncio.sleep(0.2) # ensure some time for events to propagate - self.transitioning_scratches.discard(uid) - - def _convert_coords(self, coords, monitor): - """ - Converts a string like "X Y" to coordinates relative to monitor - Supported formats for X, Y: - - Percentage: "V%". V in [0; 100] - - Example: - "10% 20%", monitor 800x600 => 80, 120 - """ - - assert coords, "coords must be non null" - - def convert(s, dim): - if s[-1] == "%": - p = int(s[:-1]) - if p < 0 or p > 100: - raise Exception(f"Percentage must be in range [0; 100], got {p}") - scale = float(monitor["scale"]) - return int(monitor[dim] / scale * p / 100) - else: - raise Exception(f"Unsupported format for dimension {dim} size, got {s}") - - try: - x_str, y_str = coords.split() - - return convert(x_str, "width"), convert(y_str, "height") - except Exception as e: - self.log.error(f"Failed to read coordinates: {e}") - raise e +# }}}