Compare commits

..

1 commit

2 changed files with 147 additions and 177 deletions

View file

@ -42,7 +42,7 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
await hyprctl( await hyprctl(
f"movetoworkspacesilent {client['workspace']['id']},address:{client['address']}" f"movetoworkspacesilent {client['workspace']['id']},address:{client['address']}"
) )
await hyprctl("togglespecialworkspace exposed") # await hyprctl("togglespecialworkspace exposed")
await hyprctl(f"focuswindow address:{focused_addr}") await hyprctl(f"focuswindow address:{focused_addr}")
self.exposed = [] self.exposed = []
else: else:

View file

@ -11,59 +11,17 @@ from .interface import Plugin
DEFAULT_MARGIN = 60 DEFAULT_MARGIN = 60
# Helper functions {{{
async def get_client_props_by_address(addr: str):
def convert_coords(logger, coords, monitor):
"""
Converts a string like "X Y" to coordinates relative to monitor
Supported formats for X, Y:
- Percentage: "V%". V in [0; 100]
Example:
"10% 20%", monitor 800x600 => 80, 120
"""
assert coords, "coords must be non null"
def convert(s, dim):
if s[-1] == "%":
p = int(s[:-1])
if p < 0 or p > 100:
raise Exception(f"Percentage must be in range [0; 100], got {p}")
scale = float(monitor["scale"])
return int(monitor[dim] / scale * p / 100)
else:
raise Exception(f"Unsupported format for dimension {dim} size, got {s}")
try:
x_str, y_str = coords.split()
return convert(x_str, "width"), convert(y_str, "height")
except Exception as e:
logger.error(f"Failed to read coordinates: {e}")
raise e
async def get_client_props(addr: str | None = None, pid: int | None = None):
"Returns client properties given its address" "Returns client properties given its address"
assert addr or pid assert len(addr) > 2, "Client address is invalid"
if addr:
assert len(addr) > 2, "Client address is invalid"
if pid:
assert pid, "Client pid is invalid"
prop_name = "address" if addr else "pid"
prop_value = addr if addr else pid
for client in await hyprctlJSON("clients"): for client in await hyprctlJSON("clients"):
assert isinstance(client, dict) assert isinstance(client, dict)
if client.get(prop_name) == prop_value: if client.get("address") == addr:
return client return client
# }}} class Animations:
class Animations: # {{{
"Animation store" "Animation store"
@staticmethod @staticmethod
@ -125,10 +83,7 @@ class Animations: # {{{
) )
# }}} class Scratch:
class Scratch: # {{{
"A scratchpad state including configuration & client state" "A scratchpad state including configuration & client state"
log = logging.getLogger("scratch") log = logging.getLogger("scratch")
@ -137,33 +92,8 @@ class Scratch: # {{{
self.pid = 0 self.pid = 0
self.conf = opts self.conf = opts
self.visible = False self.visible = False
self.just_created = True
self.client_info = {} self.client_info = {}
self.should_hide = False
self.initialized = False
async def initialize(self):
if self.initialized:
return
self.initialized = True
await self.updateClientInfo()
await hyprctl(
f"movetoworkspacesilent special:scratch_{self.uid},address:0x{self.address}"
)
size = self.conf.get("size")
position = self.conf.get("position")
monitor = await get_focused_monitor_props()
if position:
x_pos, y_pos = convert_coords(self.log, position, monitor)
x_pos_abs, y_pos_abs = x_pos + monitor["x"], y_pos + monitor["y"]
await hyprctl(
f"movewindowpixel exact {x_pos_abs} {y_pos_abs},address:0x{self.address}"
)
if size:
x_size, y_size = convert_coords(self.log, size, monitor)
await hyprctl(
f"resizewindowpixel exact {x_size} {y_size},address:0x{self.address}"
)
def isAlive(self) -> bool: def isAlive(self) -> bool:
"is the process running ?" "is the process running ?"
@ -180,8 +110,8 @@ class Scratch: # {{{
"clear the object" "clear the object"
self.pid = pid self.pid = pid
self.visible = False self.visible = False
self.just_created = True
self.client_info = {} self.client_info = {}
self.initialized = False
@property @property
def address(self) -> str: def address(self) -> str:
@ -191,7 +121,7 @@ class Scratch: # {{{
async def updateClientInfo(self, client_info=None) -> None: async def updateClientInfo(self, client_info=None) -> None:
"update the internal client info property, if not provided, refresh based on the current address" "update the internal client info property, if not provided, refresh based on the current address"
if client_info is None: if client_info is None:
client_info = await get_client_props(addr="0x" + self.address) client_info = await get_client_props_by_address("0x" + self.address)
try: try:
assert isinstance(client_info, dict) assert isinstance(client_info, dict)
except AssertionError as e: except AssertionError as e:
@ -206,13 +136,11 @@ class Scratch: # {{{
return f"{self.uid} {self.address} : {self.client_info} / {self.conf}" return f"{self.uid} {self.address} : {self.client_info} / {self.conf}"
# }}} class Extension(Plugin): # pylint: disable=missing-class-docstring
class Extension(Plugin): # pylint: disable=missing-class-docstring {{{
procs: dict[str, subprocess.Popen] = {} procs: dict[str, subprocess.Popen] = {}
scratches: dict[str, Scratch] = {} scratches: dict[str, Scratch] = {}
transitioning_scratches: set[str] = set() transitioning_scratches: set[str] = set()
_new_scratches: set[str] = set()
_respawned_scratches: set[str] = set() _respawned_scratches: set[str] = set()
scratches_by_address: dict[str, Scratch] = {} scratches_by_address: dict[str, Scratch] = {}
scratches_by_pid: dict[int, Scratch] = {} scratches_by_pid: dict[int, Scratch] = {}
@ -241,49 +169,24 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{
my_config: dict[str, dict[str, Any]] = config["scratchpads"] my_config: dict[str, dict[str, Any]] = config["scratchpads"]
scratches = {k: Scratch(k, v) for k, v in my_config.items()} scratches = {k: Scratch(k, v) for k, v in my_config.items()}
scratches_to_spawn = set() new_scratches = set()
for name in scratches: for name in scratches:
if name not in self.scratches: if name not in self.scratches:
self.scratches[name] = scratches[name] self.scratches[name] = scratches[name]
is_lazy = scratches[name].conf.get("lazy", False) new_scratches.add(name)
if not is_lazy:
scratches_to_spawn.add(name)
else: else:
self.scratches[name].conf = scratches[name].conf self.scratches[name].conf = scratches[name].conf
self.log.info(scratches_to_spawn) # not known yet
for name in scratches_to_spawn: for name in new_scratches:
await self.ensure_alive(name) if not self.scratches[name].conf.get("lazy", False):
self.scratches[name].should_hide = True await self.start_scratch_command(name, is_new=True)
async def ensure_alive(self, uid, item=None): async def start_scratch_command(self, name: str, is_new=False) -> None:
if item is None:
item = self.scratches.get(uid)
if not item.isAlive():
self.log.info("%s is not running, restarting...", uid)
if uid in self.procs:
self.procs[uid].kill()
if item.pid in self.scratches_by_pid:
del self.scratches_by_pid[item.pid]
if item.address in self.scratches_by_address:
del self.scratches_by_address[item.address]
self.log.info(f"starting {uid}")
await self.start_scratch_command(uid)
self.log.info(f"==> Wait for {uid} spawning")
loop_count = count()
while next(loop_count) < 10:
await asyncio.sleep(0.1)
info = await get_client_props(pid=item.pid)
if info:
self.log.info(f"=> {uid} info received on time")
await item.updateClientInfo(info)
self._respawned_scratches.discard(uid)
break
self.log.info(f"=> spawned {uid} as proc {item.pid}")
async def start_scratch_command(self, name: str) -> None:
"spawns a given scratchpad's process" "spawns a given scratchpad's process"
if is_new:
self._new_scratches.add(name)
self._respawned_scratches.add(name) self._respawned_scratches.add(name)
scratch = self.scratches[name] scratch = self.scratches[name]
old_pid = self.procs[name].pid if name in self.procs else 0 old_pid = self.procs[name].pid if name in self.procs else 0
@ -303,31 +206,19 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{
if old_pid and old_pid in self.scratches_by_pid: if old_pid and old_pid in self.scratches_by_pid:
del self.scratches_by_pid[old_pid] del self.scratches_by_pid[old_pid]
async def updateScratchInfo(self, orig_scratch: Scratch | None = None) -> None: # Events
"""Update every scratchpads information if no `scratch` given,
else update a specific scratchpad info"""
pid = orig_scratch.pid if orig_scratch else None
for client in await hyprctlJSON("clients"):
assert isinstance(client, dict)
if pid and pid != client["pid"]:
continue
scratch = self.scratches_by_address.get(client["address"][2:])
if not scratch:
scratch = self.scratches_by_pid.get(client["pid"])
if scratch:
self.scratches_by_address[client["address"][2:]] = scratch
await scratch.updateClientInfo(client)
break
else:
self.log.info("Didn't update scratch info %s" % self)
# Events {{{
async def event_activewindowv2(self, addr) -> None: async def event_activewindowv2(self, addr) -> None:
"active windows hook" "active windows hook"
addr = addr.strip() addr = addr.strip()
scratch = self.scratches_by_address.get(addr) scratch = self.scratches_by_address.get(addr)
if not scratch: if scratch:
if scratch.just_created:
self.log.debug("Hiding just created scratch %s", scratch.uid)
await self.run_hide(scratch.uid, force=True)
scratch.just_created = False
else:
for uid, scratch in self.scratches.items(): for uid, scratch in self.scratches.items():
self.log.info((scratch.address, addr))
if scratch.client_info and scratch.address != addr: if scratch.client_info and scratch.address != addr:
if ( if (
scratch.visible scratch.visible
@ -360,21 +251,20 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{
async def event_openwindow(self, params) -> None: async def event_openwindow(self, params) -> None:
"open windows hook" "open windows hook"
addr, wrkspc, _kls, _title = params.split(",", 3) addr, wrkspc, _kls, _title = params.split(",", 3)
item = self.scratches_by_address.get(addr)
if self._respawned_scratches: if self._respawned_scratches:
item = self.scratches_by_address.get(addr)
if not item and self._respawned_scratches: if not item and self._respawned_scratches:
# hack for windows which aren't related to the process (see #8) # hack for windows which aren't related to the process (see #8)
if not await self._alternative_lookup(): if not await self._alternative_lookup():
self.log.info("Updating Scratch info")
await self.updateScratchInfo() await self.updateScratchInfo()
item = self.scratches_by_address.get(addr) item = self.scratches_by_address.get(addr)
if item and item.should_hide: if item and item.just_created:
if item.uid in self._new_scratches:
await self.run_hide(item.uid, force=True) await self.run_hide(item.uid, force=True)
if item: self._new_scratches.discard(item.uid)
await item.initialize() self._respawned_scratches.discard(item.uid)
item.just_created = False
# }}}
# Commands {{{
async def run_toggle(self, uid: str) -> None: async def run_toggle(self, uid: str) -> None:
"""<name> toggles visibility of scratchpad "name" """ """<name> toggles visibility of scratchpad "name" """
uid = uid.strip() uid = uid.strip()
@ -383,7 +273,7 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{
self.log.warning("%s is not configured", uid) self.log.warning("%s is not configured", uid)
return return
self.log.debug("%s is visible = %s", uid, item.visible) self.log.debug("%s is visible = %s", uid, item.visible)
if item.visible and item.isAlive(): if item.visible:
await self.run_hide(uid) await self.run_hide(uid)
else: else:
await self.run_show(uid) await self.run_show(uid)
@ -411,7 +301,76 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{
return # abort sequence return # abort sequence
await asyncio.sleep(0.2) # await for animation to finish await asyncio.sleep(0.2) # await for animation to finish
async def run_show(self, uid) -> None: async def updateScratchInfo(self, orig_scratch: Scratch | None = None) -> None:
"""Update every scratchpads information if no `scratch` given,
else update a specific scratchpad info"""
pid = orig_scratch.pid if orig_scratch else None
for client in await hyprctlJSON("clients"):
assert isinstance(client, dict)
if pid and pid != client["pid"]:
continue
scratch = self.scratches_by_address.get(client["address"][2:])
if not scratch:
scratch = self.scratches_by_pid.get(client["pid"])
if scratch:
self.scratches_by_address[client["address"][2:]] = scratch
if scratch:
await scratch.updateClientInfo(client)
async def run_hide(self, uid: str, force=False, autohide=False) -> None:
"""<name> hides scratchpad "name" """
uid = uid.strip()
scratch = self.scratches.get(uid)
if not scratch:
self.log.warning("%s is not configured", uid)
return
if not scratch.visible and not force:
self.log.warning("%s is already hidden", uid)
return
scratch.visible = False
if not scratch.isAlive():
await self.run_show(uid, force=True)
return
self.log.info("Hiding %s", uid)
addr = "address:0x" + scratch.address
animation_type: str = scratch.conf.get("animation", "").lower()
if animation_type:
await self._anim_hide(animation_type, scratch)
if uid not in self.transitioning_scratches:
await hyprctl(f"movetoworkspacesilent special:scratch_{uid},{addr}")
if (
animation_type and uid in self.focused_window_tracking
): # focus got lost when animating
if not autohide and "address" in self.focused_window_tracking[uid]:
await hyprctl(
f"focuswindow address:{self.focused_window_tracking[uid]['address']}"
)
del self.focused_window_tracking[uid]
async def ensure_alive(self, uid, item=None):
if item is None:
item = self.scratches.get(uid)
if not item.isAlive():
self.log.info("%s is not running, restarting...", uid)
if uid in self.procs:
self.procs[uid].kill()
if item.pid in self.scratches_by_pid:
del self.scratches_by_pid[item.pid]
if item.address in self.scratches_by_address:
del self.scratches_by_address[item.address]
self.log.info(f"starting {uid}")
await self.start_scratch_command(uid)
self.log.info(f"{uid} started")
self.log.info("==> Wait for spawning")
loop_count = count()
while uid in self._respawned_scratches and next(loop_count) < 10:
await asyncio.sleep(0.05)
self.log.info(f"=> spawned {uid} as proc {item.pid}")
async def run_show(self, uid, force=False) -> None:
"""<name> shows scratchpad "name" """ """<name> shows scratchpad "name" """
uid = uid.strip() uid = uid.strip()
item = self.scratches.get(uid) item = self.scratches.get(uid)
@ -424,15 +383,19 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{
self.log.warning("%s is not configured", uid) self.log.warning("%s is not configured", uid)
return return
if item.visible and not force:
self.log.warning("%s is already visible", uid)
return
self.log.info("Showing %s", uid) self.log.info("Showing %s", uid)
await self.ensure_alive(uid, item) await self.ensure_alive(uid, item)
await item.updateClientInfo()
await item.initialize()
item.visible = True item.visible = True
monitor = await get_focused_monitor_props() monitor = await get_focused_monitor_props()
assert monitor assert monitor
await self.updateScratchInfo(item)
assert item.address, "No address !" assert item.address, "No address !"
addr = "address:0x" + item.address addr = "address:0x" + item.address
@ -451,39 +414,46 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring {{{
await hyprctl(f"focuswindow {addr}") await hyprctl(f"focuswindow {addr}")
size = item.conf.get("size")
if size:
x_size, y_size = self._convert_coords(size, monitor)
await hyprctl(f"resizewindowpixel exact {x_size} {y_size},{addr}")
position = item.conf.get("position")
if position:
x_pos, y_pos = self._convert_coords(position, monitor)
x_pos_abs, y_pos_abs = x_pos + monitor["x"], y_pos + monitor["y"]
await hyprctl(f"movewindowpixel exact {x_pos_abs} {y_pos_abs},{addr}")
await asyncio.sleep(0.2) # ensure some time for events to propagate await asyncio.sleep(0.2) # ensure some time for events to propagate
self.transitioning_scratches.discard(uid) self.transitioning_scratches.discard(uid)
async def run_hide(self, uid: str, force=False, autohide=False) -> None: def _convert_coords(self, coords, monitor):
"""<name> hides scratchpad "name" """ """
uid = uid.strip() Converts a string like "X Y" to coordinates relative to monitor
scratch = self.scratches.get(uid) Supported formats for X, Y:
if not scratch: - Percentage: "V%". V in [0; 100]
self.log.warning("%s is not configured", uid)
return
if not scratch.visible and not force:
self.log.warning("%s is already hidden", uid)
return
scratch.visible = False
self.log.info("Hiding %s", uid)
addr = "address:0x" + scratch.address
animation_type: str = scratch.conf.get("animation", "").lower()
if animation_type:
await self._anim_hide(animation_type, scratch)
if uid not in self.transitioning_scratches: Example:
await hyprctl(f"movetoworkspacesilent special:scratch_{uid},{addr}") "10% 20%", monitor 800x600 => 80, 120
"""
if ( assert coords, "coords must be non null"
animation_type and uid in self.focused_window_tracking
): # focus got lost when animating
if not autohide and "address" in self.focused_window_tracking[uid]:
await hyprctl(
f"focuswindow address:{self.focused_window_tracking[uid]['address']}"
)
del self.focused_window_tracking[uid]
# }}} def convert(s, dim):
if s[-1] == "%":
p = int(s[:-1])
if p < 0 or p > 100:
raise Exception(f"Percentage must be in range [0; 100], got {p}")
scale = float(monitor["scale"])
return int(monitor[dim] / scale * p / 100)
else:
raise Exception(f"Unsupported format for dimension {dim} size, got {s}")
try:
x_str, y_str = coords.split()
# }}} return convert(x_str, "width"), convert(y_str, "height")
except Exception as e:
self.log.error(f"Failed to read coordinates: {e}")
raise e