This commit is contained in:
fdev31 2023-07-29 17:47:52 +02:00
parent 258f2e0988
commit 14d8fb449f
12 changed files with 161 additions and 99 deletions

View file

@ -1,4 +1,5 @@
#!/bin/env python #!/bin/env python
" Pyprland - an Hyprland companion app "
import asyncio import asyncio
import json import json
import sys import sys
@ -17,25 +18,29 @@ CONFIG_FILE = "~/.config/hypr/pyprland.json"
class Pyprland: class Pyprland:
"Main app object"
server: asyncio.Server server: asyncio.Server
event_reader: asyncio.StreamReader event_reader: asyncio.StreamReader
stopped = False stopped = False
name = "builtin" name = "builtin"
config: dict[str, dict] = None
def __init__(self): def __init__(self):
self.plugins: dict[str, Plugin] = {} self.plugins: dict[str, Plugin] = {}
self.log = get_logger() self.log = get_logger()
async def load_config(self, init=True): async def load_config(self, init=True):
"""Loads the configuration
if `init` is true, also initializes the plugins"""
try: try:
self.config = json.loads( with open(os.path.expanduser(CONFIG_FILE), encoding="utf-8") as f:
open(os.path.expanduser(CONFIG_FILE), encoding="utf-8").read() self.config = json.loads(f.read())
)
except FileNotFoundError as e: except FileNotFoundError as e:
self.log.critical( self.log.critical(
"No config file found, create one at ~/.config/hypr/pyprland.json with a valid pyprland.plugins list" "No config file found, create one at ~/.config/hypr/pyprland.json with a valid pyprland.plugins list"
) )
raise PyprError() raise PyprError() from e
for name in self.config["pyprland"]["plugins"]: for name in self.config["pyprland"]["plugins"]:
if name not in self.plugins: if name not in self.plugins:
@ -46,27 +51,31 @@ class Pyprland:
await plug.init() await plug.init()
self.plugins[name] = plug self.plugins[name] = plug
except Exception as e: except Exception as e:
self.log.error(f"Error loading plugin {name}:", exc_info=True) self.log.error("Error loading plugin %s:", name, exc_info=True)
raise PyprError() raise PyprError() from e
if init: if init:
try: try:
await self.plugins[name].load_config(self.config) await self.plugins[name].load_config(self.config)
except PyprError: except PyprError:
raise raise
except Exception as e: except Exception as e:
self.log.error(f"Error initializing plugin {name}:", exc_info=True) self.log.error("Error initializing plugin %s:", name, exc_info=True)
raise PyprError() raise PyprError() from e
async def _callHandler(self, full_name, *params): async def _callHandler(self, full_name, *params):
"Call an event handler with params"
for plugin in [self] + list(self.plugins.values()): for plugin in [self] + list(self.plugins.values()):
if hasattr(plugin, full_name): if hasattr(plugin, full_name):
try: try:
await getattr(plugin, full_name)(*params) await getattr(plugin, full_name)(*params)
except Exception as e: except Exception as e: # pylint: disable=W0718
self.log.warn(f"{plugin.name}::{full_name}({params}) failed:") self.log.warning(
"%s::%s(%s) failed:", plugin.name, full_name, params
)
self.log.exception(e) self.log.exception(e)
async def read_events_loop(self): async def read_events_loop(self):
"Consumes the event loop and calls corresponding handlers"
while not self.stopped: while not self.stopped:
data = (await self.event_reader.readline()).decode() data = (await self.event_reader.readline()).decode()
if not data: if not data:
@ -75,10 +84,11 @@ class Pyprland:
cmd, params = data.split(">>") cmd, params = data.split(">>")
full_name = f"event_{cmd}" full_name = f"event_{cmd}"
self.log.debug(f"EVT {full_name}({params.strip()})") self.log.debug("EVT %s(%s)", full_name, params.strip())
await self._callHandler(full_name, params) await self._callHandler(full_name, params)
async def read_command(self, reader, writer) -> None: async def read_command(self, reader, writer) -> None:
"Receives a socket command"
data = (await reader.readline()).decode() data = (await reader.readline()).decode()
if not data: if not data:
self.log.critical("Server starved") self.log.critical("Server starved")
@ -102,11 +112,12 @@ class Pyprland:
# run mako for notifications & uncomment this # run mako for notifications & uncomment this
# os.system(f"notify-send '{data}'") # os.system(f"notify-send '{data}'")
self.log.debug(f"CMD: {full_name}({args})") self.log.debug("CMD: %s(%s)", full_name, args)
await self._callHandler(full_name, *args) await self._callHandler(full_name, *args)
async def serve(self): async def serve(self):
"Runs the server"
try: try:
async with self.server: async with self.server:
await self.server.serve_forever() await self.server.serve_forever()
@ -114,6 +125,7 @@ class Pyprland:
await asyncio.gather(*(plugin.exit() for plugin in self.plugins.values())) await asyncio.gather(*(plugin.exit() for plugin in self.plugins.values()))
async def run(self): async def run(self):
"Runs the server and the event listener"
await asyncio.gather( await asyncio.gather(
asyncio.create_task(self.serve()), asyncio.create_task(self.serve()),
asyncio.create_task(self.read_events_loop()), asyncio.create_task(self.read_events_loop()),
@ -123,6 +135,7 @@ class Pyprland:
async def run_daemon(): async def run_daemon():
"Runs the server / daemon"
manager = Pyprland() manager = Pyprland()
err_count = itertools.count() err_count = itertools.count()
manager.server = await asyncio.start_unix_server(manager.read_command, CONTROL) manager.server = await asyncio.start_unix_server(manager.read_command, CONTROL)
@ -131,14 +144,13 @@ async def run_daemon():
attempt = next(err_count) attempt = next(err_count)
try: try:
events_reader, events_writer = await get_event_stream() events_reader, events_writer = await get_event_stream()
except Exception as e: except Exception as e: # pylint: disable=W0718
if attempt > max_retry: if attempt > max_retry:
manager.log.critical(f"Failed to open hyprland event stream: {e}.") manager.log.critical("Failed to open hyprland event stream: %s.", e)
raise PyprError() raise PyprError() from e
else: manager.log.warning(
manager.log.warn( "Failed to get event stream: %s}, retry %s/%s...", e, attempt, max_retry
f"Failed to get event stream: {e}, retry {attempt}/{max_retry}..." )
)
await asyncio.sleep(1) await asyncio.sleep(1)
else: else:
break break
@ -147,11 +159,11 @@ async def run_daemon():
try: try:
await manager.load_config() # ensure sockets are connected first await manager.load_config() # ensure sockets are connected first
except PyprError: except PyprError as e:
raise SystemExit(1) raise SystemExit(1) from e
except Exception as e: except Exception as e:
manager.log.critical(f"Failed to load config.") manager.log.critical("Failed to load config.")
raise SystemExit(1) raise SystemExit(1) from e
try: try:
await manager.run() await manager.run()
@ -167,6 +179,7 @@ async def run_daemon():
async def run_client(): async def run_client():
"Runs the client (CLI)"
manager = Pyprland() manager = Pyprland()
if sys.argv[1] in ("--help", "-h", "help"): if sys.argv[1] in ("--help", "-h", "help"):
await manager.load_config(init=False) await manager.load_config(init=False)
@ -192,17 +205,18 @@ Commands:
try: try:
_, writer = await asyncio.open_unix_connection(CONTROL) _, writer = await asyncio.open_unix_connection(CONTROL)
except FileNotFoundError: except FileNotFoundError as e:
manager.log.critical("Failed to open control socket, is pypr daemon running ?") manager.log.critical("Failed to open control socket, is pypr daemon running ?")
raise PyprError() raise PyprError() from e
else:
writer.write((" ".join(sys.argv[1:])).encode()) writer.write((" ".join(sys.argv[1:])).encode())
await writer.drain() await writer.drain()
writer.close() writer.close()
await writer.wait_closed() await writer.wait_closed()
def main(): def main():
"runs the command"
if "--debug" in sys.argv: if "--debug" in sys.argv:
i = sys.argv.index("--debug") i = sys.argv.index("--debug")
init_logger(filename=sys.argv[i + 1], force_debug=True) init_logger(filename=sys.argv[i + 1], force_debug=True)
@ -215,11 +229,11 @@ def main():
asyncio.run(run_daemon() if len(sys.argv) <= 1 else run_client()) asyncio.run(run_daemon() if len(sys.argv) <= 1 else run_client())
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
except PyprError as e: except PyprError:
log.critical(f"Command failed.") log.critical("Command failed.")
except json.decoder.JSONDecodeError as e: except json.decoder.JSONDecodeError as e:
log.critical(f"Invalid JSON syntax in the config file: {e.args[0]}") log.critical("Invalid JSON syntax in the config file: %s", e.args[0])
except Exception as e: except Exception: # pylint: disable=W0718
log.critical("Unhandled exception:", exc_info=True) log.critical("Unhandled exception:", exc_info=True)

View file

@ -1,3 +1,4 @@
""" Shared utilities: logging """
import os import os
import logging import logging
@ -7,19 +8,23 @@ DEBUG = os.environ.get("DEBUG", False)
class PyprError(Exception): class PyprError(Exception):
pass """Used for errors which already triggered logging"""
class LogObjects: class LogObjects:
"""Reusable objects for loggers"""
handlers: list[logging.Handler] = [] handlers: list[logging.Handler] = []
def init_logger(filename=None, force_debug=False): def init_logger(filename=None, force_debug=False):
"""initializes the logging system"""
global DEBUG global DEBUG
if force_debug: if force_debug:
DEBUG = True DEBUG = True
class ScreenLogFormatter(logging.Formatter): class ScreenLogFormatter(logging.Formatter):
"A custom formatter, adding colors"
LOG_FORMAT = ( LOG_FORMAT = (
r"%(levelname)s:%(name)s - %(message)s // %(filename)s:%(lineno)d" r"%(levelname)s:%(name)s - %(message)s // %(filename)s:%(lineno)d"
if DEBUG if DEBUG
@ -53,6 +58,7 @@ def init_logger(filename=None, force_debug=False):
def get_logger(name="pypr", level=None): def get_logger(name="pypr", level=None):
"Returns a logger for `name`"
logger = logging.getLogger(name) logger = logging.getLogger(name)
if level is None: if level is None:
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO) logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
@ -61,5 +67,5 @@ def get_logger(name="pypr", level=None):
logger.propagate = False logger.propagate = False
for handler in LogObjects.handlers: for handler in LogObjects.handlers:
logger.addHandler(handler) logger.addHandler(handler)
logger.debug(f"Logger initialized for {name}") logger.debug("Logger initialized for %s", name)
return logger return logger

View file

@ -1,4 +1,5 @@
#!/bin/env python #!/bin/env python
""" Interact with hyprland using sockets """
import asyncio import asyncio
from logging import Logger from logging import Logger
from typing import Any from typing import Any
@ -14,17 +15,18 @@ EVENTS = f'/tmp/hypr/{ os.environ["HYPRLAND_INSTANCE_SIGNATURE"] }/.socket2.sock
async def get_event_stream(): async def get_event_stream():
"Returns a new event socket connection"
return await asyncio.open_unix_connection(EVENTS) return await asyncio.open_unix_connection(EVENTS)
async def hyprctlJSON(command) -> list[dict[str, Any]] | dict[str, Any]: async def hyprctlJSON(command) -> list[dict[str, Any]] | dict[str, Any]:
"""Run an IPC command and return the JSON output.""" """Run an IPC command and return the JSON output."""
log.debug(f"JS>> {command}") log.debug("JS>> %s", command)
try: try:
ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL) ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL)
except FileNotFoundError: except FileNotFoundError as e:
log.critical("hyprctl socket not found! is it running ?") log.critical("hyprctl socket not found! is it running ?")
raise PyprError() raise PyprError() from e
ctl_writer.write(f"-j/{command}".encode()) ctl_writer.write(f"-j/{command}".encode())
await ctl_writer.drain() await ctl_writer.drain()
resp = await ctl_reader.read() resp = await ctl_reader.read()
@ -36,6 +38,7 @@ async def hyprctlJSON(command) -> list[dict[str, Any]] | dict[str, Any]:
def _format_command(command_list, default_base_command): def _format_command(command_list, default_base_command):
"helper function to format BATCH commands"
for command in command_list: for command in command_list:
if isinstance(command, str): if isinstance(command, str):
yield f"{default_base_command} {command}" yield f"{default_base_command} {command}"
@ -45,12 +48,12 @@ def _format_command(command_list, default_base_command):
async def hyprctl(command, base_command="dispatch") -> bool: async def hyprctl(command, base_command="dispatch") -> bool:
"""Run an IPC command. Returns success value.""" """Run an IPC command. Returns success value."""
log.debug(f"JS>> {command}") log.debug("JS>> %s", command)
try: try:
ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL) ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL)
except FileNotFoundError: except FileNotFoundError as e:
log.critical("hyprctl socket not found! is it running ?") log.critical("hyprctl socket not found! is it running ?")
raise PyprError() raise PyprError() from e
if isinstance(command, list): if isinstance(command, list):
ctl_writer.write( ctl_writer.write(
@ -62,21 +65,23 @@ async def hyprctl(command, base_command="dispatch") -> bool:
resp = await ctl_reader.read(100) resp = await ctl_reader.read(100)
ctl_writer.close() ctl_writer.close()
await ctl_writer.wait_closed() await ctl_writer.wait_closed()
log.debug(f"<<JS {resp}") log.debug("<<JS %s", resp)
r: bool = resp == b"ok" * (len(resp) // 2) r: bool = resp == b"ok" * (len(resp) // 2)
if not r: if not r:
log.error(f"FAILED {resp}") log.error("FAILED %s", resp)
return r return r
async def get_focused_monitor_props() -> dict[str, Any]: async def get_focused_monitor_props() -> dict[str, Any]:
"Returns focused monitor data"
for monitor in await hyprctlJSON("monitors"): for monitor in await hyprctlJSON("monitors"):
assert isinstance(monitor, dict) assert isinstance(monitor, dict)
if monitor.get("focused") == True: if monitor.get("focused"):
return monitor return monitor
raise RuntimeError("no focused monitor") raise RuntimeError("no focused monitor")
def init(): def init():
"initialize logging"
global log global log
log = get_logger("ipc") log = get_logger("ipc")

View file

@ -1,7 +1,8 @@
" Plugin template "
from .interface import Plugin from .interface import Plugin
from ..ipc import hyprctlJSON, hyprctl # from ..ipc import hyprctlJSON, hyprctl
class Extension(Plugin): class Extension(Plugin):
pass "Sample plugin template"

View file

@ -5,8 +5,7 @@ from ..ipc import hyprctlJSON, hyprctl
class Extension(Plugin): class Extension(Plugin):
async def init(self) -> None: exposed = False
self.exposed = False
async def run_toggle_minimized(self, special_workspace="minimized"): async def run_toggle_minimized(self, special_workspace="minimized"):
"""[name] Toggles switching the focused window to the special workspace "name" (default: minimized)""" """[name] Toggles switching the focused window to the special workspace "name" (default: minimized)"""
@ -30,7 +29,7 @@ class Extension(Plugin):
else: else:
return [c for c in self.exposed if c["workspace"]["id"] > 0] return [c for c in self.exposed if c["workspace"]["id"] > 0]
async def run_expose(self, arg=""): async def run_expose(self):
"""Expose every client on the active workspace. If expose is active restores everything and move to the focused window""" """Expose every client on the active workspace. If expose is active restores everything and move to the focused window"""
if self.exposed: if self.exposed:
aw: dict[str, Any] = await hyprctlJSON("activewindow") aw: dict[str, Any] = await hyprctlJSON("activewindow")

View file

@ -1,19 +1,25 @@
" Common plugin interface "
from typing import Any from typing import Any
from ..common import get_logger from ..common import get_logger
class Plugin: class Plugin:
"Base plugin class, handles logger and config"
def __init__(self, name: str): def __init__(self, name: str):
"create a new plugin `name` and the matching logger"
self.name = name self.name = name
self.log = get_logger(name) self.log = get_logger(name)
self.config: dict[str, Any] = {}
async def init(self): async def init(self):
pass "empty init function"
async def exit(self): async def exit(self):
return "empty exit function"
async def load_config(self, config: dict[str, Any]): async def load_config(self, config: dict[str, Any]):
"Loads the configuration section from the passed `config`"
try: try:
self.config = config[self.name] self.config = config[self.name]
except KeyError: except KeyError:

View file

@ -18,7 +18,7 @@ def contains(monitor, window):
class Extension(Plugin): class Extension(Plugin):
async def run_attract_lost(self, *args): async def run_attract_lost(self):
"""Brings lost floating windows to the current workspace""" """Brings lost floating windows to the current workspace"""
monitors = await hyprctlJSON("monitors") monitors = await hyprctlJSON("monitors")
windows = await hyprctlJSON("clients") windows = await hyprctlJSON("clients")

View file

@ -1,11 +1,10 @@
from .interface import Plugin from .interface import Plugin
from ..ipc import hyprctlJSON, hyprctl from ..ipc import hyprctl
class Extension(Plugin): class Extension(Plugin):
async def init(self): zoomed = False
self.zoomed = False
async def run_zoom(self, *args): async def run_zoom(self, *args):
"""[factor] zooms to "factor" or toggles zoom level ommited""" """[factor] zooms to "factor" or toggles zoom level ommited"""

View file

@ -1,11 +1,13 @@
" The monitors plugin "
import subprocess
from typing import Any from typing import Any
from .interface import Plugin from .interface import Plugin
import subprocess
from ..ipc import hyprctlJSON from ..ipc import hyprctlJSON
def configure_monitors(monitors, screenid: str, x: int, y: int) -> None: def configure_monitors(monitors, screenid: str, x: int, y: int) -> None:
"Apply the configuration change"
x_offset = -x if x < 0 else 0 x_offset = -x if x < 0 else 0
y_offset = -y if y < 0 else 0 y_offset = -y if y < 0 else 0
@ -33,18 +35,19 @@ def configure_monitors(monitors, screenid: str, x: int, y: int) -> None:
subprocess.call(command) subprocess.call(command)
class Extension(Plugin): class Extension(Plugin): # pylint: disable=missing-class-docstring
async def load_config(self, config) -> None: async def load_config(self, config) -> None:
await super().load_config(config) await super().load_config(config)
monitors = await hyprctlJSON("monitors") monitors = await hyprctlJSON("monitors")
for monitor in monitors: for monitor in monitors:
await self.event_monitoradded( await self.event_monitoradded(
monitor["name"], noDefault=True, monitors=monitors monitor["name"], no_default=True, monitors=monitors
) )
async def event_monitoradded( async def event_monitoradded(
self, screenid, noDefault=False, monitors: list | None = None self, screenid, no_default=False, monitors: list | None = None
) -> None: ) -> None:
"Triggers when a monitor is plugged"
screenid = screenid.strip() screenid = screenid.strip()
if not monitors: if not monitors:
@ -55,7 +58,7 @@ class Extension(Plugin):
mon_name = mon["description"] mon_name = mon["description"]
break break
else: else:
self.log.info(f"Monitor {screenid} not found") self.log.info("Monitor %s not found", screenid)
return return
mon_by_name = {m["name"]: m for m in monitors} mon_by_name = {m["name"]: m for m in monitors}
@ -83,7 +86,7 @@ class Extension(Plugin):
configure_monitors(monitors, screenid, x, y) configure_monitors(monitors, screenid, x, y)
return return
if not noDefault: if not no_default:
default_command = self.config.get("unknown") default_command = self.config.get("unknown")
if default_command: if default_command:
subprocess.call(default_command, shell=True) subprocess.call(default_command, shell=True)

View file

@ -1,12 +1,14 @@
" Scratchpads addon "
import os
import asyncio
import subprocess import subprocess
from typing import Any from typing import Any
import asyncio
from ..ipc import ( from ..ipc import (
hyprctl, hyprctl,
hyprctlJSON, hyprctlJSON,
get_focused_monitor_props, get_focused_monitor_props,
) )
import os
from .interface import Plugin from .interface import Plugin
@ -14,6 +16,7 @@ DEFAULT_MARGIN = 60
async def get_client_props_by_address(addr: str): async def get_client_props_by_address(addr: str):
"Returns client properties given its address"
for client in await hyprctlJSON("clients"): for client in await hyprctlJSON("clients"):
assert isinstance(client, dict) assert isinstance(client, dict)
if client.get("address") == addr: if client.get("address") == addr:
@ -21,8 +24,11 @@ async def get_client_props_by_address(addr: str):
class Animations: class Animations:
"Animation store"
@classmethod @classmethod
async def fromtop(cls, monitor, client, client_uid, margin): async def fromtop(cls, monitor, client, client_uid, margin):
"Slide from/to top"
scale = float(monitor["scale"]) scale = float(monitor["scale"])
mon_x = monitor["x"] mon_x = monitor["x"]
mon_y = monitor["y"] mon_y = monitor["y"]
@ -35,6 +41,7 @@ class Animations:
@classmethod @classmethod
async def frombottom(cls, monitor, client, client_uid, margin): async def frombottom(cls, monitor, client, client_uid, margin):
"Slide from/to bottom"
scale = float(monitor["scale"]) scale = float(monitor["scale"])
mon_x = monitor["x"] mon_x = monitor["x"]
mon_y = monitor["y"] mon_y = monitor["y"]
@ -50,6 +57,7 @@ class Animations:
@classmethod @classmethod
async def fromleft(cls, monitor, client, client_uid, margin): async def fromleft(cls, monitor, client, client_uid, margin):
"Slide from/to left"
scale = float(monitor["scale"]) scale = float(monitor["scale"])
mon_x = monitor["x"] mon_x = monitor["x"]
mon_y = monitor["y"] mon_y = monitor["y"]
@ -62,6 +70,7 @@ class Animations:
@classmethod @classmethod
async def fromright(cls, monitor, client, client_uid, margin): async def fromright(cls, monitor, client, client_uid, margin):
"Slide from/to right"
scale = float(monitor["scale"]) scale = float(monitor["scale"])
mon_x = monitor["x"] mon_x = monitor["x"]
mon_y = monitor["y"] mon_y = monitor["y"]
@ -77,54 +86,62 @@ class Animations:
class Scratch: class Scratch:
"A scratchpad state including configuration & client state"
def __init__(self, uid, opts): def __init__(self, uid, opts):
self.uid = uid self.uid = uid
self.pid = 0 self.pid = 0
self.conf = opts self.conf = opts
self.visible = False self.visible = False
self.just_created = True self.just_created = True
self.clientInfo = {} self.client_info = {}
def isAlive(self) -> bool: def isAlive(self) -> bool:
"is the process running ?"
path = f"/proc/{self.pid}" path = f"/proc/{self.pid}"
if os.path.exists(path): if os.path.exists(path):
for line in open(os.path.join(path, "status"), "r").readlines(): with open(os.path.join(path, "status"), "r", encoding="utf-8") as f:
if line.startswith("State"): for line in f.readlines():
state = line.split()[1] if line.startswith("State"):
return state in "RSDTt" # not "Z (zombie)"or "X (dead)" state = line.split()[1]
return state in "RSDTt" # not "Z (zombie)"or "X (dead)"
return False return False
def reset(self, pid: int) -> None: def reset(self, pid: int) -> None:
"clear the object"
self.pid = pid self.pid = pid
self.visible = False self.visible = False
self.just_created = True self.just_created = True
self.clientInfo = {} self.client_info = {}
@property @property
def address(self) -> str: def address(self) -> str:
return str(self.clientInfo.get("address", ""))[2:] "Returns the client address"
return str(self.client_info.get("address", ""))[2:]
async def updateClientInfo(self, clientInfo=None) -> None: async def updateClientInfo(self, clientInfo=None) -> None:
"update the internal client info property, if not provided, refresh based on the current address"
if clientInfo is None: if clientInfo is None:
clientInfo = await get_client_props_by_address("0x" + self.address) clientInfo = await get_client_props_by_address("0x" + self.address)
assert isinstance(clientInfo, dict) assert isinstance(clientInfo, dict)
self.clientInfo.update(clientInfo) self.client_info.update(clientInfo)
def __str__(self): def __str__(self):
return f"{self.uid} {self.address} : {self.clientInfo} / {self.conf}" return f"{self.uid} {self.address} : {self.client_info} / {self.conf}"
class Extension(Plugin): class Extension(Plugin):
async def init(self) -> None: procs: dict[str, subprocess.Popen] = {}
self.procs: dict[str, subprocess.Popen] = {} scratches: dict[str, Scratch] = {}
self.scratches: dict[str, Scratch] = {} transitioning_scratches: set[str] = set()
self.transitioning_scratches: set[str] = set() _respawned_scratches: set[str] = set()
self._respawned_scratches: set[str] = set() scratches_by_address: dict[str, Scratch] = {}
self.scratches_by_address: dict[str, Scratch] = {} scratches_by_pid: dict[int, Scratch] = {}
self.scratches_by_pid: dict[int, Scratch] = {} focused_window_tracking: dict[str, dict] = {}
self.focused_window_tracking = dict()
async def exit(self) -> None: async def exit(self) -> None:
"exit hook"
async def die_in_piece(scratch: Scratch): async def die_in_piece(scratch: Scratch):
proc = self.procs[scratch.uid] proc = self.procs[scratch.uid]
proc.terminate() proc.terminate()
@ -141,6 +158,7 @@ class Extension(Plugin):
) )
async def load_config(self, config) -> None: async def load_config(self, config) -> None:
"config loader"
config: dict[str, dict[str, Any]] = config["scratchpads"] config: dict[str, dict[str, Any]] = config["scratchpads"]
scratches = {k: Scratch(k, v) for k, v in config.items()} scratches = {k: Scratch(k, v) for k, v in config.items()}
@ -159,6 +177,7 @@ class Extension(Plugin):
await self.start_scratch_command(name) await self.start_scratch_command(name)
async def start_scratch_command(self, name: str) -> None: async def start_scratch_command(self, name: str) -> None:
"spawns a given scratchpad's process"
self._respawned_scratches.add(name) self._respawned_scratches.add(name)
scratch = self.scratches[name] scratch = self.scratches[name]
old_pid = self.procs[name].pid if name in self.procs else 0 old_pid = self.procs[name].pid if name in self.procs else 0
@ -177,6 +196,7 @@ class Extension(Plugin):
# Events # Events
async def event_activewindowv2(self, addr) -> None: async def event_activewindowv2(self, addr) -> None:
"active windows hook"
addr = addr.strip() addr = addr.strip()
scratch = self.scratches_by_address.get(addr) scratch = self.scratches_by_address.get(addr)
if scratch: if scratch:
@ -185,7 +205,7 @@ class Extension(Plugin):
scratch.just_created = False scratch.just_created = False
else: else:
for uid, scratch in self.scratches.items(): for uid, scratch in self.scratches.items():
if scratch.clientInfo and scratch.address != addr: if scratch.client_info and scratch.address != addr:
if ( if (
scratch.visible scratch.visible
and scratch.conf.get("unfocus") == "hide" and scratch.conf.get("unfocus") == "hide"
@ -194,7 +214,8 @@ class Extension(Plugin):
await self.run_hide(uid, autohide=True) await self.run_hide(uid, autohide=True)
async def event_openwindow(self, params) -> None: async def event_openwindow(self, params) -> None:
addr, wrkspc, kls, title = params.split(",", 3) "open windows hook"
addr, wrkspc, _kls, _title = params.split(",", 3)
if wrkspc.startswith("special"): if wrkspc.startswith("special"):
item = self.scratches_by_address.get(addr) item = self.scratches_by_address.get(addr)
if not item and self._respawned_scratches: if not item and self._respawned_scratches:
@ -213,7 +234,7 @@ class Extension(Plugin):
self.scratches_by_address[ self.scratches_by_address[
client["address"][2:] client["address"][2:]
] = pending_scratch ] = pending_scratch
self.log.debug(f"client class found: {client}") self.log.debug("client class found: %s", client)
await pending_scratch.updateClientInfo(client) await pending_scratch.updateClientInfo(client)
else: else:
await self.updateScratchInfo() await self.updateScratchInfo()
@ -228,7 +249,7 @@ class Extension(Plugin):
uid = uid.strip() uid = uid.strip()
item = self.scratches.get(uid) item = self.scratches.get(uid)
if not item: if not item:
self.log.warn(f"{uid} is not configured") self.log.warning("%s is not configured", uid)
return return
if item.visible: if item.visible:
await self.run_hide(uid) await self.run_hide(uid)
@ -236,6 +257,8 @@ class Extension(Plugin):
await self.run_show(uid) await self.run_show(uid)
async def updateScratchInfo(self, scratch: Scratch | None = None) -> None: async def updateScratchInfo(self, scratch: Scratch | None = None) -> None:
"""Update every scratchpads information if no `scratch` given,
else update a specific scratchpad info"""
if scratch is None: if scratch is None:
for client in await hyprctlJSON("clients"): for client in await hyprctlJSON("clients"):
assert isinstance(client, dict) assert isinstance(client, dict)
@ -247,34 +270,34 @@ class Extension(Plugin):
if scratch: if scratch:
await scratch.updateClientInfo(client) await scratch.updateClientInfo(client)
else: else:
add_to_address_book = ("address" not in scratch.clientInfo) or ( add_to_address_book = ("address" not in scratch.client_info) or (
scratch.address not in self.scratches_by_address scratch.address not in self.scratches_by_address
) )
await scratch.updateClientInfo() await scratch.updateClientInfo()
if add_to_address_book: if add_to_address_book:
self.scratches_by_address[scratch.clientInfo["address"][2:]] = scratch self.scratches_by_address[scratch.client_info["address"][2:]] = scratch
async def run_hide(self, uid: str, force=False, autohide=False) -> None: async def run_hide(self, uid: str, force=False, autohide=False) -> None:
"""<name> hides scratchpad "name" """ """<name> hides scratchpad "name" """
uid = uid.strip() uid = uid.strip()
item = self.scratches.get(uid) item = self.scratches.get(uid)
if not item: if not item:
self.log.warn(f"{uid} is not configured") self.log.warning("%s is not configured", uid)
return return
if not item.visible and not force: if not item.visible and not force:
self.log.warn(f"{uid} is already hidden") self.log.warning("%s is already hidden", uid)
return return
self.log.info(f"Hiding {uid}") self.log.info("Hiding %s", uid)
item.visible = False item.visible = False
addr = "address:0x" + item.address addr = "address:0x" + item.address
animation_type: str = item.conf.get("animation", "").lower() animation_type: str = item.conf.get("animation", "").lower()
if animation_type: if animation_type:
offset = item.conf.get("offset") offset = item.conf.get("offset")
if offset is None: if offset is None:
if "size" not in item.clientInfo: if "size" not in item.client_info:
await self.updateScratchInfo(item) await self.updateScratchInfo(item)
offset = int(1.3 * item.clientInfo["size"][1]) offset = int(1.3 * item.client_info["size"][1])
if animation_type == "fromtop": if animation_type == "fromtop":
await hyprctl(f"movewindowpixel 0 -{offset},{addr}") await hyprctl(f"movewindowpixel 0 -{offset},{addr}")
@ -309,17 +332,17 @@ class Extension(Plugin):
self.focused_window_tracking[uid] = await hyprctlJSON("activewindow") self.focused_window_tracking[uid] = await hyprctlJSON("activewindow")
if not item: if not item:
self.log.warn(f"{uid} is not configured") self.log.warning("%s is not configured", uid)
return return
if item.visible and not force: if item.visible and not force:
self.log.warn(f"{uid} is already visible") self.log.warning("%s is already visible", uid)
return return
self.log.info(f"Showing {uid}") self.log.info("Showing %s", uid)
if not item.isAlive(): if not item.isAlive():
self.log.info(f"{uid} is not running, restarting...") self.log.info("%s is not running, restarting...", uid)
if uid in self.procs: if uid in self.procs:
self.procs[uid].kill() self.procs[uid].kill()
if item.pid in self.scratches_by_pid: if item.pid in self.scratches_by_pid:
@ -348,7 +371,7 @@ class Extension(Plugin):
if animation_type: if animation_type:
margin = item.conf.get("margin", DEFAULT_MARGIN) margin = item.conf.get("margin", DEFAULT_MARGIN)
fn = getattr(Animations, animation_type) fn = getattr(Animations, animation_type)
await fn(monitor, item.clientInfo, addr, margin) await fn(monitor, item.client_info, addr, margin)
await hyprctl(f"focuswindow {addr}") await hyprctl(f"focuswindow {addr}")
await asyncio.sleep(0.2) # ensure some time for events to propagate await asyncio.sleep(0.2) # ensure some time for events to propagate

View file

@ -4,6 +4,8 @@ from ..ipc import hyprctlJSON, hyprctl
class Extension(Plugin): class Extension(Plugin):
monitors: list[str] = []
async def init(self): async def init(self):
self.monitors = [mon["name"] for mon in await hyprctlJSON("monitors")] self.monitors = [mon["name"] for mon in await hyprctlJSON("monitors")]

View file

@ -1,15 +1,19 @@
import asyncio """ Force workspaces to follow the focus / mouse """
from .interface import Plugin from .interface import Plugin
from ..ipc import hyprctlJSON, hyprctl from ..ipc import hyprctlJSON, hyprctl
class Extension(Plugin): class Extension(Plugin): # pylint: disable=missing-class-docstring
workspace_list: list[int] = []
async def load_config(self, config): async def load_config(self, config):
"loads the config"
await super().load_config(config) await super().load_config(config)
self.workspace_list = list(range(1, self.config.get("max_workspaces", 10) + 1)) self.workspace_list = list(range(1, self.config.get("max_workspaces", 10) + 1))
async def event_focusedmon(self, screenid_index): async def event_focusedmon(self, screenid_index):
"reacts to monitor changes"
monitor_id, workspace_id = screenid_index.split(",") monitor_id, workspace_id = screenid_index.split(",")
workspace_id = int(workspace_id) workspace_id = int(workspace_id)
# move every free workspace to the currently focused desktop # move every free workspace to the currently focused desktop