Compare commits
4 commits
Author | SHA1 | Date | |
---|---|---|---|
|
506782977e | ||
|
1219b0c034 | ||
|
cc023d28ac | ||
|
c236ec763f |
1 changed files with 176 additions and 146 deletions
|
@ -11,17 +11,59 @@ from .interface import Plugin
|
|||
|
||||
DEFAULT_MARGIN = 60
|
||||
|
||||
# Helper functions {{{
|
||||
|
||||
async def get_client_props_by_address(addr: str):
|
||||
|
||||
def convert_coords(logger, coords, monitor):
|
||||
"""
|
||||
Converts a string like "X Y" to coordinates relative to monitor
|
||||
Supported formats for X, Y:
|
||||
- Percentage: "V%". V in [0; 100]
|
||||
|
||||
Example:
|
||||
"10% 20%", monitor 800x600 => 80, 120
|
||||
"""
|
||||
|
||||
assert coords, "coords must be non null"
|
||||
|
||||
def convert(s, dim):
|
||||
if s[-1] == "%":
|
||||
p = int(s[:-1])
|
||||
if p < 0 or p > 100:
|
||||
raise Exception(f"Percentage must be in range [0; 100], got {p}")
|
||||
scale = float(monitor["scale"])
|
||||
return int(monitor[dim] / scale * p / 100)
|
||||
else:
|
||||
raise Exception(f"Unsupported format for dimension {dim} size, got {s}")
|
||||
|
||||
try:
|
||||
x_str, y_str = coords.split()
|
||||
|
||||
return convert(x_str, "width"), convert(y_str, "height")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to read coordinates: {e}")
|
||||
raise e
|
||||
|
||||
|
||||
async def get_client_props(addr: str | None = None, pid: int | None = None):
|
||||
"Returns client properties given its address"
|
||||
assert addr or pid
|
||||
if addr:
|
||||
assert len(addr) > 2, "Client address is invalid"
|
||||
if pid:
|
||||
assert pid, "Client pid is invalid"
|
||||
prop_name = "address" if addr else "pid"
|
||||
prop_value = addr if addr else pid
|
||||
for client in await hyprctlJSON("clients"):
|
||||
assert isinstance(client, dict)
|
||||
if client.get("address") == addr:
|
||||
if client.get(prop_name) == prop_value:
|
||||
return client
|
||||
|
||||
|
||||
class Animations:
|
||||
# }}}
|
||||
|
||||
|
||||
class Animations: # {{{
|
||||
"Animation store"
|
||||
|
||||
@staticmethod
|
||||
|
@ -83,7 +125,10 @@ class Animations:
|
|||
)
|
||||
|
||||
|
||||
class Scratch:
|
||||
# }}}
|
||||
|
||||
|
||||
class Scratch: # {{{
|
||||
"A scratchpad state including configuration & client state"
|
||||
log = logging.getLogger("scratch")
|
||||
|
||||
|
@ -92,8 +137,33 @@ class Scratch:
|
|||
self.pid = 0
|
||||
self.conf = opts
|
||||
self.visible = False
|
||||
self.just_created = True
|
||||
self.client_info = {}
|
||||
self.should_hide = False
|
||||
self.initialized = False
|
||||
|
||||
async def initialize(self):
|
||||
if self.initialized:
|
||||
return
|
||||
self.initialized = True
|
||||
await self.updateClientInfo()
|
||||
await hyprctl(
|
||||
f"movetoworkspacesilent special:scratch_{self.uid},address:0x{self.address}"
|
||||
)
|
||||
|
||||
size = self.conf.get("size")
|
||||
position = self.conf.get("position")
|
||||
monitor = await get_focused_monitor_props()
|
||||
if position:
|
||||
x_pos, y_pos = convert_coords(self.log, position, monitor)
|
||||
x_pos_abs, y_pos_abs = x_pos + monitor["x"], y_pos + monitor["y"]
|
||||
await hyprctl(
|
||||
f"movewindowpixel exact {x_pos_abs} {y_pos_abs},address:0x{self.address}"
|
||||
)
|
||||
if size:
|
||||
x_size, y_size = convert_coords(self.log, size, monitor)
|
||||
await hyprctl(
|
||||
f"resizewindowpixel exact {x_size} {y_size},address:0x{self.address}"
|
||||
)
|
||||
|
||||
def isAlive(self) -> bool:
|
||||
"is the process running ?"
|
||||
|
@ -110,8 +180,8 @@ class Scratch:
|
|||
"clear the object"
|
||||
self.pid = pid
|
||||
self.visible = False
|
||||
self.just_created = True
|
||||
self.client_info = {}
|
||||
self.initialized = False
|
||||
|
||||
@property
|
||||
def address(self) -> str:
|
||||
|
@ -121,7 +191,7 @@ class Scratch:
|
|||
async def updateClientInfo(self, client_info=None) -> None:
|
||||
"update the internal client info property, if not provided, refresh based on the current address"
|
||||
if client_info is None:
|
||||
client_info = await get_client_props_by_address("0x" + self.address)
|
||||
client_info = await get_client_props(addr="0x" + self.address)
|
||||
try:
|
||||
assert isinstance(client_info, dict)
|
||||
except AssertionError as e:
|
||||
|
@ -136,11 +206,13 @@ class Scratch:
|
|||
return f"{self.uid} {self.address} : {self.client_info} / {self.conf}"
|
||||
|
||||
|
||||
class Extension(Plugin): # pylint: disable=missing-class-docstring
|
||||
# }}}
|
||||
|
||||
|
||||
class Extension(Plugin): # pylint: disable=missing-class-docstring {{{
|
||||
procs: dict[str, subprocess.Popen] = {}
|
||||
scratches: dict[str, Scratch] = {}
|
||||
transitioning_scratches: set[str] = set()
|
||||
_new_scratches: set[str] = set()
|
||||
_respawned_scratches: set[str] = set()
|
||||
scratches_by_address: dict[str, Scratch] = {}
|
||||
scratches_by_pid: dict[int, Scratch] = {}
|
||||
|
@ -169,24 +241,49 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
|
|||
my_config: dict[str, dict[str, Any]] = config["scratchpads"]
|
||||
scratches = {k: Scratch(k, v) for k, v in my_config.items()}
|
||||
|
||||
new_scratches = set()
|
||||
|
||||
scratches_to_spawn = set()
|
||||
for name in scratches:
|
||||
if name not in self.scratches:
|
||||
self.scratches[name] = scratches[name]
|
||||
new_scratches.add(name)
|
||||
is_lazy = scratches[name].conf.get("lazy", False)
|
||||
if not is_lazy:
|
||||
scratches_to_spawn.add(name)
|
||||
else:
|
||||
self.scratches[name].conf = scratches[name].conf
|
||||
|
||||
# not known yet
|
||||
for name in new_scratches:
|
||||
if not self.scratches[name].conf.get("lazy", False):
|
||||
await self.start_scratch_command(name, is_new=True)
|
||||
self.log.info(scratches_to_spawn)
|
||||
for name in scratches_to_spawn:
|
||||
await self.ensure_alive(name)
|
||||
self.scratches[name].should_hide = True
|
||||
|
||||
async def start_scratch_command(self, name: str, is_new=False) -> None:
|
||||
async def ensure_alive(self, uid, item=None):
|
||||
if item is None:
|
||||
item = self.scratches.get(uid)
|
||||
|
||||
if not item.isAlive():
|
||||
self.log.info("%s is not running, restarting...", uid)
|
||||
if uid in self.procs:
|
||||
self.procs[uid].kill()
|
||||
if item.pid in self.scratches_by_pid:
|
||||
del self.scratches_by_pid[item.pid]
|
||||
if item.address in self.scratches_by_address:
|
||||
del self.scratches_by_address[item.address]
|
||||
self.log.info(f"starting {uid}")
|
||||
await self.start_scratch_command(uid)
|
||||
self.log.info(f"==> Wait for {uid} spawning")
|
||||
loop_count = count()
|
||||
while next(loop_count) < 10:
|
||||
await asyncio.sleep(0.1)
|
||||
info = await get_client_props(pid=item.pid)
|
||||
if info:
|
||||
self.log.info(f"=> {uid} info received on time")
|
||||
await item.updateClientInfo(info)
|
||||
self._respawned_scratches.discard(uid)
|
||||
break
|
||||
self.log.info(f"=> spawned {uid} as proc {item.pid}")
|
||||
|
||||
async def start_scratch_command(self, name: str) -> None:
|
||||
"spawns a given scratchpad's process"
|
||||
if is_new:
|
||||
self._new_scratches.add(name)
|
||||
self._respawned_scratches.add(name)
|
||||
scratch = self.scratches[name]
|
||||
old_pid = self.procs[name].pid if name in self.procs else 0
|
||||
|
@ -206,19 +303,31 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
|
|||
if old_pid and old_pid in self.scratches_by_pid:
|
||||
del self.scratches_by_pid[old_pid]
|
||||
|
||||
# Events
|
||||
async def updateScratchInfo(self, orig_scratch: Scratch | None = None) -> None:
|
||||
"""Update every scratchpads information if no `scratch` given,
|
||||
else update a specific scratchpad info"""
|
||||
pid = orig_scratch.pid if orig_scratch else None
|
||||
for client in await hyprctlJSON("clients"):
|
||||
assert isinstance(client, dict)
|
||||
if pid and pid != client["pid"]:
|
||||
continue
|
||||
scratch = self.scratches_by_address.get(client["address"][2:])
|
||||
if not scratch:
|
||||
scratch = self.scratches_by_pid.get(client["pid"])
|
||||
if scratch:
|
||||
self.scratches_by_address[client["address"][2:]] = scratch
|
||||
await scratch.updateClientInfo(client)
|
||||
break
|
||||
else:
|
||||
self.log.info("Didn't update scratch info %s" % self)
|
||||
|
||||
# Events {{{
|
||||
async def event_activewindowv2(self, addr) -> None:
|
||||
"active windows hook"
|
||||
addr = addr.strip()
|
||||
scratch = self.scratches_by_address.get(addr)
|
||||
if scratch:
|
||||
if scratch.just_created:
|
||||
self.log.debug("Hiding just created scratch %s", scratch.uid)
|
||||
await self.run_hide(scratch.uid, force=True)
|
||||
scratch.just_created = False
|
||||
else:
|
||||
if not scratch:
|
||||
for uid, scratch in self.scratches.items():
|
||||
self.log.info((scratch.address, addr))
|
||||
if scratch.client_info and scratch.address != addr:
|
||||
if (
|
||||
scratch.visible
|
||||
|
@ -251,20 +360,21 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
|
|||
async def event_openwindow(self, params) -> None:
|
||||
"open windows hook"
|
||||
addr, wrkspc, _kls, _title = params.split(",", 3)
|
||||
if self._respawned_scratches:
|
||||
item = self.scratches_by_address.get(addr)
|
||||
if self._respawned_scratches:
|
||||
if not item and self._respawned_scratches:
|
||||
# hack for windows which aren't related to the process (see #8)
|
||||
if not await self._alternative_lookup():
|
||||
self.log.info("Updating Scratch info")
|
||||
await self.updateScratchInfo()
|
||||
item = self.scratches_by_address.get(addr)
|
||||
if item and item.just_created:
|
||||
if item.uid in self._new_scratches:
|
||||
if item and item.should_hide:
|
||||
await self.run_hide(item.uid, force=True)
|
||||
self._new_scratches.discard(item.uid)
|
||||
self._respawned_scratches.discard(item.uid)
|
||||
item.just_created = False
|
||||
if item:
|
||||
await item.initialize()
|
||||
|
||||
# }}}
|
||||
# Commands {{{
|
||||
async def run_toggle(self, uid: str) -> None:
|
||||
"""<name> toggles visibility of scratchpad "name" """
|
||||
uid = uid.strip()
|
||||
|
@ -273,7 +383,7 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
|
|||
self.log.warning("%s is not configured", uid)
|
||||
return
|
||||
self.log.debug("%s is visible = %s", uid, item.visible)
|
||||
if item.visible:
|
||||
if item.visible and item.isAlive():
|
||||
await self.run_hide(uid)
|
||||
else:
|
||||
await self.run_show(uid)
|
||||
|
@ -301,76 +411,7 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
|
|||
return # abort sequence
|
||||
await asyncio.sleep(0.2) # await for animation to finish
|
||||
|
||||
async def updateScratchInfo(self, orig_scratch: Scratch | None = None) -> None:
|
||||
"""Update every scratchpads information if no `scratch` given,
|
||||
else update a specific scratchpad info"""
|
||||
pid = orig_scratch.pid if orig_scratch else None
|
||||
for client in await hyprctlJSON("clients"):
|
||||
assert isinstance(client, dict)
|
||||
if pid and pid != client["pid"]:
|
||||
continue
|
||||
scratch = self.scratches_by_address.get(client["address"][2:])
|
||||
if not scratch:
|
||||
scratch = self.scratches_by_pid.get(client["pid"])
|
||||
if scratch:
|
||||
self.scratches_by_address[client["address"][2:]] = scratch
|
||||
if scratch:
|
||||
await scratch.updateClientInfo(client)
|
||||
|
||||
async def run_hide(self, uid: str, force=False, autohide=False) -> None:
|
||||
"""<name> hides scratchpad "name" """
|
||||
uid = uid.strip()
|
||||
scratch = self.scratches.get(uid)
|
||||
if not scratch:
|
||||
self.log.warning("%s is not configured", uid)
|
||||
return
|
||||
if not scratch.visible and not force:
|
||||
self.log.warning("%s is already hidden", uid)
|
||||
return
|
||||
scratch.visible = False
|
||||
if not scratch.isAlive():
|
||||
await self.run_show(uid, force=True)
|
||||
return
|
||||
self.log.info("Hiding %s", uid)
|
||||
addr = "address:0x" + scratch.address
|
||||
animation_type: str = scratch.conf.get("animation", "").lower()
|
||||
if animation_type:
|
||||
await self._anim_hide(animation_type, scratch)
|
||||
|
||||
if uid not in self.transitioning_scratches:
|
||||
await hyprctl(f"movetoworkspacesilent special:scratch_{uid},{addr}")
|
||||
|
||||
if (
|
||||
animation_type and uid in self.focused_window_tracking
|
||||
): # focus got lost when animating
|
||||
if not autohide and "address" in self.focused_window_tracking[uid]:
|
||||
await hyprctl(
|
||||
f"focuswindow address:{self.focused_window_tracking[uid]['address']}"
|
||||
)
|
||||
del self.focused_window_tracking[uid]
|
||||
|
||||
async def ensure_alive(self, uid, item=None):
|
||||
if item is None:
|
||||
item = self.scratches.get(uid)
|
||||
|
||||
if not item.isAlive():
|
||||
self.log.info("%s is not running, restarting...", uid)
|
||||
if uid in self.procs:
|
||||
self.procs[uid].kill()
|
||||
if item.pid in self.scratches_by_pid:
|
||||
del self.scratches_by_pid[item.pid]
|
||||
if item.address in self.scratches_by_address:
|
||||
del self.scratches_by_address[item.address]
|
||||
self.log.info(f"starting {uid}")
|
||||
await self.start_scratch_command(uid)
|
||||
self.log.info(f"{uid} started")
|
||||
self.log.info("==> Wait for spawning")
|
||||
loop_count = count()
|
||||
while uid in self._respawned_scratches and next(loop_count) < 10:
|
||||
await asyncio.sleep(0.05)
|
||||
self.log.info(f"=> spawned {uid} as proc {item.pid}")
|
||||
|
||||
async def run_show(self, uid, force=False) -> None:
|
||||
async def run_show(self, uid) -> None:
|
||||
"""<name> shows scratchpad "name" """
|
||||
uid = uid.strip()
|
||||
item = self.scratches.get(uid)
|
||||
|
@ -383,19 +424,15 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
|
|||
self.log.warning("%s is not configured", uid)
|
||||
return
|
||||
|
||||
if item.visible and not force:
|
||||
self.log.warning("%s is already visible", uid)
|
||||
return
|
||||
|
||||
self.log.info("Showing %s", uid)
|
||||
await self.ensure_alive(uid, item)
|
||||
await item.updateClientInfo()
|
||||
await item.initialize()
|
||||
|
||||
item.visible = True
|
||||
monitor = await get_focused_monitor_props()
|
||||
assert monitor
|
||||
|
||||
await self.updateScratchInfo(item)
|
||||
|
||||
assert item.address, "No address !"
|
||||
|
||||
addr = "address:0x" + item.address
|
||||
|
@ -414,46 +451,39 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
|
|||
|
||||
await hyprctl(f"focuswindow {addr}")
|
||||
|
||||
size = item.conf.get("size")
|
||||
if size:
|
||||
x_size, y_size = self._convert_coords(size, monitor)
|
||||
await hyprctl(f"resizewindowpixel exact {x_size} {y_size},{addr}")
|
||||
|
||||
position = item.conf.get("position")
|
||||
if position:
|
||||
x_pos, y_pos = self._convert_coords(position, monitor)
|
||||
x_pos_abs, y_pos_abs = x_pos + monitor["x"], y_pos + monitor["y"]
|
||||
await hyprctl(f"movewindowpixel exact {x_pos_abs} {y_pos_abs},{addr}")
|
||||
|
||||
await asyncio.sleep(0.2) # ensure some time for events to propagate
|
||||
self.transitioning_scratches.discard(uid)
|
||||
|
||||
def _convert_coords(self, coords, monitor):
|
||||
"""
|
||||
Converts a string like "X Y" to coordinates relative to monitor
|
||||
Supported formats for X, Y:
|
||||
- Percentage: "V%". V in [0; 100]
|
||||
async def run_hide(self, uid: str, force=False, autohide=False) -> None:
|
||||
"""<name> hides scratchpad "name" """
|
||||
uid = uid.strip()
|
||||
scratch = self.scratches.get(uid)
|
||||
if not scratch:
|
||||
self.log.warning("%s is not configured", uid)
|
||||
return
|
||||
if not scratch.visible and not force:
|
||||
self.log.warning("%s is already hidden", uid)
|
||||
return
|
||||
scratch.visible = False
|
||||
self.log.info("Hiding %s", uid)
|
||||
addr = "address:0x" + scratch.address
|
||||
animation_type: str = scratch.conf.get("animation", "").lower()
|
||||
if animation_type:
|
||||
await self._anim_hide(animation_type, scratch)
|
||||
|
||||
Example:
|
||||
"10% 20%", monitor 800x600 => 80, 120
|
||||
"""
|
||||
if uid not in self.transitioning_scratches:
|
||||
await hyprctl(f"movetoworkspacesilent special:scratch_{uid},{addr}")
|
||||
|
||||
assert coords, "coords must be non null"
|
||||
if (
|
||||
animation_type and uid in self.focused_window_tracking
|
||||
): # focus got lost when animating
|
||||
if not autohide and "address" in self.focused_window_tracking[uid]:
|
||||
await hyprctl(
|
||||
f"focuswindow address:{self.focused_window_tracking[uid]['address']}"
|
||||
)
|
||||
del self.focused_window_tracking[uid]
|
||||
|
||||
def convert(s, dim):
|
||||
if s[-1] == "%":
|
||||
p = int(s[:-1])
|
||||
if p < 0 or p > 100:
|
||||
raise Exception(f"Percentage must be in range [0; 100], got {p}")
|
||||
scale = float(monitor["scale"])
|
||||
return int(monitor[dim] / scale * p / 100)
|
||||
else:
|
||||
raise Exception(f"Unsupported format for dimension {dim} size, got {s}")
|
||||
# }}}
|
||||
|
||||
try:
|
||||
x_str, y_str = coords.split()
|
||||
|
||||
return convert(x_str, "width"), convert(y_str, "height")
|
||||
except Exception as e:
|
||||
self.log.error(f"Failed to read coordinates: {e}")
|
||||
raise e
|
||||
# }}}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue