Compare commits

...
Sign in to create a new pull request.

4 commits

Author SHA1 Message Date
fdev31
506782977e fix regression (updating client info)
Allows auto-recentering of resized client windows
2023-10-28 16:01:00 +02:00
fdev31
1219b0c034 refactor initialization code 2023-10-25 23:13:42 +02:00
fdev31
cc023d28ac Code cleanup + re-introduce non-lazy 2023-10-25 21:52:00 +02:00
fdev31
c236ec763f Remove support for non lazy windows 2023-10-25 21:04:18 +02:00

View file

@ -11,17 +11,59 @@ from .interface import Plugin
DEFAULT_MARGIN = 60 DEFAULT_MARGIN = 60
# Helper functions {{{
async def get_client_props_by_address(addr: str):
def convert_coords(logger, coords, monitor):
"""
Converts a string like "X Y" to coordinates relative to monitor
Supported formats for X, Y:
- Percentage: "V%". V in [0; 100]
Example:
"10% 20%", monitor 800x600 => 80, 120
"""
assert coords, "coords must be non null"
def convert(s, dim):
if s[-1] == "%":
p = int(s[:-1])
if p < 0 or p > 100:
raise Exception(f"Percentage must be in range [0; 100], got {p}")
scale = float(monitor["scale"])
return int(monitor[dim] / scale * p / 100)
else:
raise Exception(f"Unsupported format for dimension {dim} size, got {s}")
try:
x_str, y_str = coords.split()
return convert(x_str, "width"), convert(y_str, "height")
except Exception as e:
logger.error(f"Failed to read coordinates: {e}")
raise e
async def get_client_props(addr: str | None = None, pid: int | None = None):
"Returns client properties given its address" "Returns client properties given its address"
assert len(addr) > 2, "Client address is invalid" assert addr or pid
if addr:
assert len(addr) > 2, "Client address is invalid"
if pid:
assert pid, "Client pid is invalid"
prop_name = "address" if addr else "pid"
prop_value = addr if addr else pid
for client in await hyprctlJSON("clients"): for client in await hyprctlJSON("clients"):
assert isinstance(client, dict) assert isinstance(client, dict)
if client.get("address") == addr: if client.get(prop_name) == prop_value:
return client return client
class Animations: # }}}
class Animations: # {{{
"Animation store" "Animation store"
@staticmethod @staticmethod
@ -83,7 +125,10 @@ class Animations:
) )
class Scratch: # }}}
class Scratch: # {{{
"A scratchpad state including configuration & client state" "A scratchpad state including configuration & client state"
log = logging.getLogger("scratch") log = logging.getLogger("scratch")
@ -92,8 +137,33 @@ class Scratch:
self.pid = 0 self.pid = 0
self.conf = opts self.conf = opts
self.visible = False self.visible = False
self.just_created = True
self.client_info = {} self.client_info = {}
self.should_hide = False
self.initialized = False
async def initialize(self):
if self.initialized:
return
self.initialized = True
await self.updateClientInfo()
await hyprctl(
f"movetoworkspacesilent special:scratch_{self.uid},address:0x{self.address}"
)
size = self.conf.get("size")
position = self.conf.get("position")
monitor = await get_focused_monitor_props()
if position:
x_pos, y_pos = convert_coords(self.log, position, monitor)
x_pos_abs, y_pos_abs = x_pos + monitor["x"], y_pos + monitor["y"]
await hyprctl(
f"movewindowpixel exact {x_pos_abs} {y_pos_abs},address:0x{self.address}"
)
if size:
x_size, y_size = convert_coords(self.log, size, monitor)
await hyprctl(
f"resizewindowpixel exact {x_size} {y_size},address:0x{self.address}"
)
def isAlive(self) -> bool: def isAlive(self) -> bool:
"is the process running ?" "is the process running ?"
@ -110,8 +180,8 @@ class Scratch:
"clear the object" "clear the object"
self.pid = pid self.pid = pid
self.visible = False self.visible = False
self.just_created = True
self.client_info = {} self.client_info = {}
self.initialized = False
@property @property
def address(self) -> str: def address(self) -> str:
@ -121,7 +191,7 @@ class Scratch:
async def updateClientInfo(self, client_info=None) -> None: async def updateClientInfo(self, client_info=None) -> None:
"update the internal client info property, if not provided, refresh based on the current address" "update the internal client info property, if not provided, refresh based on the current address"
if client_info is None: if client_info is None:
client_info = await get_client_props_by_address("0x" + self.address) client_info = await get_client_props(addr="0x" + self.address)
try: try:
assert isinstance(client_info, dict) assert isinstance(client_info, dict)
except AssertionError as e: except AssertionError as e:
@ -136,11 +206,13 @@ class Scratch:
return f"{self.uid} {self.address} : {self.client_info} / {self.conf}" return f"{self.uid} {self.address} : {self.client_info} / {self.conf}"
class Extension(Plugin): # pylint: disable=missing-class-docstring # }}}
class Extension(Plugin): # pylint: disable=missing-class-docstring {{{
procs: dict[str, subprocess.Popen] = {} procs: dict[str, subprocess.Popen] = {}
scratches: dict[str, Scratch] = {} scratches: dict[str, Scratch] = {}
transitioning_scratches: set[str] = set() transitioning_scratches: set[str] = set()
_new_scratches: set[str] = set()
_respawned_scratches: set[str] = set() _respawned_scratches: set[str] = set()
scratches_by_address: dict[str, Scratch] = {} scratches_by_address: dict[str, Scratch] = {}
scratches_by_pid: dict[int, Scratch] = {} scratches_by_pid: dict[int, Scratch] = {}
@ -169,24 +241,49 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
my_config: dict[str, dict[str, Any]] = config["scratchpads"] my_config: dict[str, dict[str, Any]] = config["scratchpads"]
scratches = {k: Scratch(k, v) for k, v in my_config.items()} scratches = {k: Scratch(k, v) for k, v in my_config.items()}
new_scratches = set() scratches_to_spawn = set()
for name in scratches: for name in scratches:
if name not in self.scratches: if name not in self.scratches:
self.scratches[name] = scratches[name] self.scratches[name] = scratches[name]
new_scratches.add(name) is_lazy = scratches[name].conf.get("lazy", False)
if not is_lazy:
scratches_to_spawn.add(name)
else: else:
self.scratches[name].conf = scratches[name].conf self.scratches[name].conf = scratches[name].conf
# not known yet self.log.info(scratches_to_spawn)
for name in new_scratches: for name in scratches_to_spawn:
if not self.scratches[name].conf.get("lazy", False): await self.ensure_alive(name)
await self.start_scratch_command(name, is_new=True) self.scratches[name].should_hide = True
async def start_scratch_command(self, name: str, is_new=False) -> None: async def ensure_alive(self, uid, item=None):
if item is None:
item = self.scratches.get(uid)
if not item.isAlive():
self.log.info("%s is not running, restarting...", uid)
if uid in self.procs:
self.procs[uid].kill()
if item.pid in self.scratches_by_pid:
del self.scratches_by_pid[item.pid]
if item.address in self.scratches_by_address:
del self.scratches_by_address[item.address]
self.log.info(f"starting {uid}")
await self.start_scratch_command(uid)
self.log.info(f"==> Wait for {uid} spawning")
loop_count = count()
while next(loop_count) < 10:
await asyncio.sleep(0.1)
info = await get_client_props(pid=item.pid)
if info:
self.log.info(f"=> {uid} info received on time")
await item.updateClientInfo(info)
self._respawned_scratches.discard(uid)
break
self.log.info(f"=> spawned {uid} as proc {item.pid}")
async def start_scratch_command(self, name: str) -> None:
"spawns a given scratchpad's process" "spawns a given scratchpad's process"
if is_new:
self._new_scratches.add(name)
self._respawned_scratches.add(name) self._respawned_scratches.add(name)
scratch = self.scratches[name] scratch = self.scratches[name]
old_pid = self.procs[name].pid if name in self.procs else 0 old_pid = self.procs[name].pid if name in self.procs else 0
@ -206,19 +303,31 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
if old_pid and old_pid in self.scratches_by_pid: if old_pid and old_pid in self.scratches_by_pid:
del self.scratches_by_pid[old_pid] del self.scratches_by_pid[old_pid]
# Events async def updateScratchInfo(self, orig_scratch: Scratch | None = None) -> None:
"""Update every scratchpads information if no `scratch` given,
else update a specific scratchpad info"""
pid = orig_scratch.pid if orig_scratch else None
for client in await hyprctlJSON("clients"):
assert isinstance(client, dict)
if pid and pid != client["pid"]:
continue
scratch = self.scratches_by_address.get(client["address"][2:])
if not scratch:
scratch = self.scratches_by_pid.get(client["pid"])
if scratch:
self.scratches_by_address[client["address"][2:]] = scratch
await scratch.updateClientInfo(client)
break
else:
self.log.info("Didn't update scratch info %s" % self)
# Events {{{
async def event_activewindowv2(self, addr) -> None: async def event_activewindowv2(self, addr) -> None:
"active windows hook" "active windows hook"
addr = addr.strip() addr = addr.strip()
scratch = self.scratches_by_address.get(addr) scratch = self.scratches_by_address.get(addr)
if scratch: if not scratch:
if scratch.just_created:
self.log.debug("Hiding just created scratch %s", scratch.uid)
await self.run_hide(scratch.uid, force=True)
scratch.just_created = False
else:
for uid, scratch in self.scratches.items(): for uid, scratch in self.scratches.items():
self.log.info((scratch.address, addr))
if scratch.client_info and scratch.address != addr: if scratch.client_info and scratch.address != addr:
if ( if (
scratch.visible scratch.visible
@ -251,20 +360,21 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
async def event_openwindow(self, params) -> None: async def event_openwindow(self, params) -> None:
"open windows hook" "open windows hook"
addr, wrkspc, _kls, _title = params.split(",", 3) addr, wrkspc, _kls, _title = params.split(",", 3)
item = self.scratches_by_address.get(addr)
if self._respawned_scratches: if self._respawned_scratches:
item = self.scratches_by_address.get(addr)
if not item and self._respawned_scratches: if not item and self._respawned_scratches:
# hack for windows which aren't related to the process (see #8) # hack for windows which aren't related to the process (see #8)
if not await self._alternative_lookup(): if not await self._alternative_lookup():
self.log.info("Updating Scratch info")
await self.updateScratchInfo() await self.updateScratchInfo()
item = self.scratches_by_address.get(addr) item = self.scratches_by_address.get(addr)
if item and item.just_created: if item and item.should_hide:
if item.uid in self._new_scratches:
await self.run_hide(item.uid, force=True) await self.run_hide(item.uid, force=True)
self._new_scratches.discard(item.uid) if item:
self._respawned_scratches.discard(item.uid) await item.initialize()
item.just_created = False
# }}}
# Commands {{{
async def run_toggle(self, uid: str) -> None: async def run_toggle(self, uid: str) -> None:
"""<name> toggles visibility of scratchpad "name" """ """<name> toggles visibility of scratchpad "name" """
uid = uid.strip() uid = uid.strip()
@ -273,7 +383,7 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
self.log.warning("%s is not configured", uid) self.log.warning("%s is not configured", uid)
return return
self.log.debug("%s is visible = %s", uid, item.visible) self.log.debug("%s is visible = %s", uid, item.visible)
if item.visible: if item.visible and item.isAlive():
await self.run_hide(uid) await self.run_hide(uid)
else: else:
await self.run_show(uid) await self.run_show(uid)
@ -301,76 +411,7 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
return # abort sequence return # abort sequence
await asyncio.sleep(0.2) # await for animation to finish await asyncio.sleep(0.2) # await for animation to finish
async def updateScratchInfo(self, orig_scratch: Scratch | None = None) -> None: async def run_show(self, uid) -> None:
"""Update every scratchpads information if no `scratch` given,
else update a specific scratchpad info"""
pid = orig_scratch.pid if orig_scratch else None
for client in await hyprctlJSON("clients"):
assert isinstance(client, dict)
if pid and pid != client["pid"]:
continue
scratch = self.scratches_by_address.get(client["address"][2:])
if not scratch:
scratch = self.scratches_by_pid.get(client["pid"])
if scratch:
self.scratches_by_address[client["address"][2:]] = scratch
if scratch:
await scratch.updateClientInfo(client)
async def run_hide(self, uid: str, force=False, autohide=False) -> None:
"""<name> hides scratchpad "name" """
uid = uid.strip()
scratch = self.scratches.get(uid)
if not scratch:
self.log.warning("%s is not configured", uid)
return
if not scratch.visible and not force:
self.log.warning("%s is already hidden", uid)
return
scratch.visible = False
if not scratch.isAlive():
await self.run_show(uid, force=True)
return
self.log.info("Hiding %s", uid)
addr = "address:0x" + scratch.address
animation_type: str = scratch.conf.get("animation", "").lower()
if animation_type:
await self._anim_hide(animation_type, scratch)
if uid not in self.transitioning_scratches:
await hyprctl(f"movetoworkspacesilent special:scratch_{uid},{addr}")
if (
animation_type and uid in self.focused_window_tracking
): # focus got lost when animating
if not autohide and "address" in self.focused_window_tracking[uid]:
await hyprctl(
f"focuswindow address:{self.focused_window_tracking[uid]['address']}"
)
del self.focused_window_tracking[uid]
async def ensure_alive(self, uid, item=None):
if item is None:
item = self.scratches.get(uid)
if not item.isAlive():
self.log.info("%s is not running, restarting...", uid)
if uid in self.procs:
self.procs[uid].kill()
if item.pid in self.scratches_by_pid:
del self.scratches_by_pid[item.pid]
if item.address in self.scratches_by_address:
del self.scratches_by_address[item.address]
self.log.info(f"starting {uid}")
await self.start_scratch_command(uid)
self.log.info(f"{uid} started")
self.log.info("==> Wait for spawning")
loop_count = count()
while uid in self._respawned_scratches and next(loop_count) < 10:
await asyncio.sleep(0.05)
self.log.info(f"=> spawned {uid} as proc {item.pid}")
async def run_show(self, uid, force=False) -> None:
"""<name> shows scratchpad "name" """ """<name> shows scratchpad "name" """
uid = uid.strip() uid = uid.strip()
item = self.scratches.get(uid) item = self.scratches.get(uid)
@ -383,19 +424,15 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
self.log.warning("%s is not configured", uid) self.log.warning("%s is not configured", uid)
return return
if item.visible and not force:
self.log.warning("%s is already visible", uid)
return
self.log.info("Showing %s", uid) self.log.info("Showing %s", uid)
await self.ensure_alive(uid, item) await self.ensure_alive(uid, item)
await item.updateClientInfo()
await item.initialize()
item.visible = True item.visible = True
monitor = await get_focused_monitor_props() monitor = await get_focused_monitor_props()
assert monitor assert monitor
await self.updateScratchInfo(item)
assert item.address, "No address !" assert item.address, "No address !"
addr = "address:0x" + item.address addr = "address:0x" + item.address
@ -414,46 +451,39 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
await hyprctl(f"focuswindow {addr}") await hyprctl(f"focuswindow {addr}")
size = item.conf.get("size")
if size:
x_size, y_size = self._convert_coords(size, monitor)
await hyprctl(f"resizewindowpixel exact {x_size} {y_size},{addr}")
position = item.conf.get("position")
if position:
x_pos, y_pos = self._convert_coords(position, monitor)
x_pos_abs, y_pos_abs = x_pos + monitor["x"], y_pos + monitor["y"]
await hyprctl(f"movewindowpixel exact {x_pos_abs} {y_pos_abs},{addr}")
await asyncio.sleep(0.2) # ensure some time for events to propagate await asyncio.sleep(0.2) # ensure some time for events to propagate
self.transitioning_scratches.discard(uid) self.transitioning_scratches.discard(uid)
def _convert_coords(self, coords, monitor): async def run_hide(self, uid: str, force=False, autohide=False) -> None:
""" """<name> hides scratchpad "name" """
Converts a string like "X Y" to coordinates relative to monitor uid = uid.strip()
Supported formats for X, Y: scratch = self.scratches.get(uid)
- Percentage: "V%". V in [0; 100] if not scratch:
self.log.warning("%s is not configured", uid)
return
if not scratch.visible and not force:
self.log.warning("%s is already hidden", uid)
return
scratch.visible = False
self.log.info("Hiding %s", uid)
addr = "address:0x" + scratch.address
animation_type: str = scratch.conf.get("animation", "").lower()
if animation_type:
await self._anim_hide(animation_type, scratch)
Example: if uid not in self.transitioning_scratches:
"10% 20%", monitor 800x600 => 80, 120 await hyprctl(f"movetoworkspacesilent special:scratch_{uid},{addr}")
"""
assert coords, "coords must be non null" if (
animation_type and uid in self.focused_window_tracking
): # focus got lost when animating
if not autohide and "address" in self.focused_window_tracking[uid]:
await hyprctl(
f"focuswindow address:{self.focused_window_tracking[uid]['address']}"
)
del self.focused_window_tracking[uid]
def convert(s, dim): # }}}
if s[-1] == "%":
p = int(s[:-1])
if p < 0 or p > 100:
raise Exception(f"Percentage must be in range [0; 100], got {p}")
scale = float(monitor["scale"])
return int(monitor[dim] / scale * p / 100)
else:
raise Exception(f"Unsupported format for dimension {dim} size, got {s}")
try:
x_str, y_str = coords.split()
return convert(x_str, "width"), convert(y_str, "height") # }}}
except Exception as e:
self.log.error(f"Failed to read coordinates: {e}")
raise e