diff --git a/pyprland/plugins/expose.py b/pyprland/plugins/expose.py index 78f8e81..ed01925 100644 --- a/pyprland/plugins/expose.py +++ b/pyprland/plugins/expose.py @@ -42,7 +42,7 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring await hyprctl( f"movetoworkspacesilent {client['workspace']['id']},address:{client['address']}" ) - await hyprctl("togglespecialworkspace exposed") + # await hyprctl("togglespecialworkspace exposed") await hyprctl(f"focuswindow address:{focused_addr}") self.exposed = [] else: diff --git a/pyprland/plugins/scratchpads.py b/pyprland/plugins/scratchpads.py index f3b413d..df1d466 100644 --- a/pyprland/plugins/scratchpads.py +++ b/pyprland/plugins/scratchpads.py @@ -11,59 +11,17 @@ from .interface import Plugin DEFAULT_MARGIN = 60 -# Helper functions {{{ - -def convert_coords(logger, coords, monitor): - """ - Converts a string like "X Y" to coordinates relative to monitor - Supported formats for X, Y: - - Percentage: "V%". V in [0; 100] - - Example: - "10% 20%", monitor 800x600 => 80, 120 - """ - - assert coords, "coords must be non null" - - def convert(s, dim): - if s[-1] == "%": - p = int(s[:-1]) - if p < 0 or p > 100: - raise Exception(f"Percentage must be in range [0; 100], got {p}") - scale = float(monitor["scale"]) - return int(monitor[dim] / scale * p / 100) - else: - raise Exception(f"Unsupported format for dimension {dim} size, got {s}") - - try: - x_str, y_str = coords.split() - - return convert(x_str, "width"), convert(y_str, "height") - except Exception as e: - logger.error(f"Failed to read coordinates: {e}") - raise e - - -async def get_client_props(addr: str | None = None, pid: int | None = None): +async def get_client_props_by_address(addr: str): "Returns client properties given its address" - assert addr or pid - if addr: - assert len(addr) > 2, "Client address is invalid" - if pid: - assert pid, "Client pid is invalid" - prop_name = "address" if addr else "pid" - prop_value = addr if addr else pid + assert len(addr) > 2, "Client address is invalid" for client in await hyprctlJSON("clients"): assert isinstance(client, dict) - if client.get(prop_name) == prop_value: + if client.get("address") == addr: return client -# }}} - - -class Animations: # {{{ +class Animations: "Animation store" @staticmethod @@ -125,10 +83,7 @@ class Animations: # {{{ ) -# }}} - - -class Scratch: # {{{ +class Scratch: "A scratchpad state including configuration & client state" log = logging.getLogger("scratch") @@ -137,33 +92,8 @@ class Scratch: # {{{ self.pid = 0 self.conf = opts self.visible = False + self.just_created = True self.client_info = {} - self.should_hide = False - self.initialized = False - - async def initialize(self): - if self.initialized: - return - self.initialized = True - await self.updateClientInfo() - await hyprctl( - f"movetoworkspacesilent special:scratch_{self.uid},address:0x{self.address}" - ) - - size = self.conf.get("size") - position = self.conf.get("position") - monitor = await get_focused_monitor_props() - if position: - x_pos, y_pos = convert_coords(self.log, position, monitor) - x_pos_abs, y_pos_abs = x_pos + monitor["x"], y_pos + monitor["y"] - await hyprctl( - f"movewindowpixel exact {x_pos_abs} {y_pos_abs},address:0x{self.address}" - ) - if size: - x_size, y_size = convert_coords(self.log, size, monitor) - await hyprctl( - f"resizewindowpixel exact {x_size} {y_size},address:0x{self.address}" - ) def isAlive(self) -> bool: "is the process running ?" @@ -180,8 +110,8 @@ class Scratch: # {{{ "clear the object" self.pid = pid self.visible = False + self.just_created = True self.client_info = {} - self.initialized = False @property def address(self) -> str: @@ -191,7 +121,7 @@ class Scratch: # {{{ async def updateClientInfo(self, client_info=None) -> None: "update the internal client info property, if not provided, refresh based on the current address" if client_info is None: - client_info = await get_client_props(addr="0x" + self.address) + client_info = await get_client_props_by_address("0x" + self.address) try: assert isinstance(client_info, dict) except AssertionError as e: @@ -206,13 +136,11 @@ class Scratch: # {{{ return f"{self.uid} {self.address} : {self.client_info} / {self.conf}" -# }}} - - -class Extension(Plugin): # pylint: disable=missing-class-docstring {{{ +class Extension(Plugin): # pylint: disable=missing-class-docstring procs: dict[str, subprocess.Popen] = {} scratches: dict[str, Scratch] = {} transitioning_scratches: set[str] = set() + _new_scratches: set[str] = set() _respawned_scratches: set[str] = set() scratches_by_address: dict[str, Scratch] = {} scratches_by_pid: dict[int, Scratch] = {} @@ -241,49 +169,24 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{ my_config: dict[str, dict[str, Any]] = config["scratchpads"] scratches = {k: Scratch(k, v) for k, v in my_config.items()} - scratches_to_spawn = set() + new_scratches = set() + for name in scratches: if name not in self.scratches: self.scratches[name] = scratches[name] - is_lazy = scratches[name].conf.get("lazy", False) - if not is_lazy: - scratches_to_spawn.add(name) + new_scratches.add(name) else: self.scratches[name].conf = scratches[name].conf - self.log.info(scratches_to_spawn) - for name in scratches_to_spawn: - await self.ensure_alive(name) - self.scratches[name].should_hide = True + # not known yet + for name in new_scratches: + if not self.scratches[name].conf.get("lazy", False): + await self.start_scratch_command(name, is_new=True) - async def ensure_alive(self, uid, item=None): - if item is None: - item = self.scratches.get(uid) - - if not item.isAlive(): - self.log.info("%s is not running, restarting...", uid) - if uid in self.procs: - self.procs[uid].kill() - if item.pid in self.scratches_by_pid: - del self.scratches_by_pid[item.pid] - if item.address in self.scratches_by_address: - del self.scratches_by_address[item.address] - self.log.info(f"starting {uid}") - await self.start_scratch_command(uid) - self.log.info(f"==> Wait for {uid} spawning") - loop_count = count() - while next(loop_count) < 10: - await asyncio.sleep(0.1) - info = await get_client_props(pid=item.pid) - if info: - self.log.info(f"=> {uid} info received on time") - await item.updateClientInfo(info) - self._respawned_scratches.discard(uid) - break - self.log.info(f"=> spawned {uid} as proc {item.pid}") - - async def start_scratch_command(self, name: str) -> None: + async def start_scratch_command(self, name: str, is_new=False) -> None: "spawns a given scratchpad's process" + if is_new: + self._new_scratches.add(name) self._respawned_scratches.add(name) scratch = self.scratches[name] old_pid = self.procs[name].pid if name in self.procs else 0 @@ -303,31 +206,19 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{ if old_pid and old_pid in self.scratches_by_pid: del self.scratches_by_pid[old_pid] - async def updateScratchInfo(self, orig_scratch: Scratch | None = None) -> None: - """Update every scratchpads information if no `scratch` given, - else update a specific scratchpad info""" - pid = orig_scratch.pid if orig_scratch else None - for client in await hyprctlJSON("clients"): - assert isinstance(client, dict) - if pid and pid != client["pid"]: - continue - scratch = self.scratches_by_address.get(client["address"][2:]) - if not scratch: - scratch = self.scratches_by_pid.get(client["pid"]) - if scratch: - self.scratches_by_address[client["address"][2:]] = scratch - await scratch.updateClientInfo(client) - break - else: - self.log.info("Didn't update scratch info %s" % self) - - # Events {{{ + # Events async def event_activewindowv2(self, addr) -> None: "active windows hook" addr = addr.strip() scratch = self.scratches_by_address.get(addr) - if not scratch: + if scratch: + if scratch.just_created: + self.log.debug("Hiding just created scratch %s", scratch.uid) + await self.run_hide(scratch.uid, force=True) + scratch.just_created = False + else: for uid, scratch in self.scratches.items(): + self.log.info((scratch.address, addr)) if scratch.client_info and scratch.address != addr: if ( scratch.visible @@ -360,21 +251,20 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{ async def event_openwindow(self, params) -> None: "open windows hook" addr, wrkspc, _kls, _title = params.split(",", 3) - item = self.scratches_by_address.get(addr) if self._respawned_scratches: + item = self.scratches_by_address.get(addr) if not item and self._respawned_scratches: # hack for windows which aren't related to the process (see #8) if not await self._alternative_lookup(): - self.log.info("Updating Scratch info") await self.updateScratchInfo() item = self.scratches_by_address.get(addr) - if item and item.should_hide: + if item and item.just_created: + if item.uid in self._new_scratches: await self.run_hide(item.uid, force=True) - if item: - await item.initialize() + self._new_scratches.discard(item.uid) + self._respawned_scratches.discard(item.uid) + item.just_created = False - # }}} - # Commands {{{ async def run_toggle(self, uid: str) -> None: """ toggles visibility of scratchpad "name" """ uid = uid.strip() @@ -383,7 +273,7 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{ self.log.warning("%s is not configured", uid) return self.log.debug("%s is visible = %s", uid, item.visible) - if item.visible and item.isAlive(): + if item.visible: await self.run_hide(uid) else: await self.run_show(uid) @@ -411,7 +301,76 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{ return # abort sequence await asyncio.sleep(0.2) # await for animation to finish - async def run_show(self, uid) -> None: + async def updateScratchInfo(self, orig_scratch: Scratch | None = None) -> None: + """Update every scratchpads information if no `scratch` given, + else update a specific scratchpad info""" + pid = orig_scratch.pid if orig_scratch else None + for client in await hyprctlJSON("clients"): + assert isinstance(client, dict) + if pid and pid != client["pid"]: + continue + scratch = self.scratches_by_address.get(client["address"][2:]) + if not scratch: + scratch = self.scratches_by_pid.get(client["pid"]) + if scratch: + self.scratches_by_address[client["address"][2:]] = scratch + if scratch: + await scratch.updateClientInfo(client) + + async def run_hide(self, uid: str, force=False, autohide=False) -> None: + """ hides scratchpad "name" """ + uid = uid.strip() + scratch = self.scratches.get(uid) + if not scratch: + self.log.warning("%s is not configured", uid) + return + if not scratch.visible and not force: + self.log.warning("%s is already hidden", uid) + return + scratch.visible = False + if not scratch.isAlive(): + await self.run_show(uid, force=True) + return + self.log.info("Hiding %s", uid) + addr = "address:0x" + scratch.address + animation_type: str = scratch.conf.get("animation", "").lower() + if animation_type: + await self._anim_hide(animation_type, scratch) + + if uid not in self.transitioning_scratches: + await hyprctl(f"movetoworkspacesilent special:scratch_{uid},{addr}") + + if ( + animation_type and uid in self.focused_window_tracking + ): # focus got lost when animating + if not autohide and "address" in self.focused_window_tracking[uid]: + await hyprctl( + f"focuswindow address:{self.focused_window_tracking[uid]['address']}" + ) + del self.focused_window_tracking[uid] + + async def ensure_alive(self, uid, item=None): + if item is None: + item = self.scratches.get(uid) + + if not item.isAlive(): + self.log.info("%s is not running, restarting...", uid) + if uid in self.procs: + self.procs[uid].kill() + if item.pid in self.scratches_by_pid: + del self.scratches_by_pid[item.pid] + if item.address in self.scratches_by_address: + del self.scratches_by_address[item.address] + self.log.info(f"starting {uid}") + await self.start_scratch_command(uid) + self.log.info(f"{uid} started") + self.log.info("==> Wait for spawning") + loop_count = count() + while uid in self._respawned_scratches and next(loop_count) < 10: + await asyncio.sleep(0.05) + self.log.info(f"=> spawned {uid} as proc {item.pid}") + + async def run_show(self, uid, force=False) -> None: """ shows scratchpad "name" """ uid = uid.strip() item = self.scratches.get(uid) @@ -424,15 +383,19 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{ self.log.warning("%s is not configured", uid) return + if item.visible and not force: + self.log.warning("%s is already visible", uid) + return + self.log.info("Showing %s", uid) await self.ensure_alive(uid, item) - await item.updateClientInfo() - await item.initialize() item.visible = True monitor = await get_focused_monitor_props() assert monitor + await self.updateScratchInfo(item) + assert item.address, "No address !" addr = "address:0x" + item.address @@ -451,39 +414,46 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{ await hyprctl(f"focuswindow {addr}") + size = item.conf.get("size") + if size: + x_size, y_size = self._convert_coords(size, monitor) + await hyprctl(f"resizewindowpixel exact {x_size} {y_size},{addr}") + + position = item.conf.get("position") + if position: + x_pos, y_pos = self._convert_coords(position, monitor) + x_pos_abs, y_pos_abs = x_pos + monitor["x"], y_pos + monitor["y"] + await hyprctl(f"movewindowpixel exact {x_pos_abs} {y_pos_abs},{addr}") + await asyncio.sleep(0.2) # ensure some time for events to propagate self.transitioning_scratches.discard(uid) - async def run_hide(self, uid: str, force=False, autohide=False) -> None: - """ hides scratchpad "name" """ - uid = uid.strip() - scratch = self.scratches.get(uid) - if not scratch: - self.log.warning("%s is not configured", uid) - return - if not scratch.visible and not force: - self.log.warning("%s is already hidden", uid) - return - scratch.visible = False - self.log.info("Hiding %s", uid) - addr = "address:0x" + scratch.address - animation_type: str = scratch.conf.get("animation", "").lower() - if animation_type: - await self._anim_hide(animation_type, scratch) + def _convert_coords(self, coords, monitor): + """ + Converts a string like "X Y" to coordinates relative to monitor + Supported formats for X, Y: + - Percentage: "V%". V in [0; 100] - if uid not in self.transitioning_scratches: - await hyprctl(f"movetoworkspacesilent special:scratch_{uid},{addr}") + Example: + "10% 20%", monitor 800x600 => 80, 120 + """ - if ( - animation_type and uid in self.focused_window_tracking - ): # focus got lost when animating - if not autohide and "address" in self.focused_window_tracking[uid]: - await hyprctl( - f"focuswindow address:{self.focused_window_tracking[uid]['address']}" - ) - del self.focused_window_tracking[uid] + assert coords, "coords must be non null" - # }}} + def convert(s, dim): + if s[-1] == "%": + p = int(s[:-1]) + if p < 0 or p > 100: + raise Exception(f"Percentage must be in range [0; 100], got {p}") + scale = float(monitor["scale"]) + return int(monitor[dim] / scale * p / 100) + else: + raise Exception(f"Unsupported format for dimension {dim} size, got {s}") + try: + x_str, y_str = coords.split() -# }}} + return convert(x_str, "width"), convert(y_str, "height") + except Exception as e: + self.log.error(f"Failed to read coordinates: {e}") + raise e