more linting

This commit is contained in:
fdev31 2023-07-30 13:14:51 +02:00
parent 9c15ce42e2
commit 357f25e123
10 changed files with 129 additions and 91 deletions

View file

@ -45,16 +45,16 @@ def init_logger(filename=None, force_debug=False):
logging.basicConfig()
if filename:
handler = logging.FileHandler(filename)
handler.setFormatter(
file_handler = logging.FileHandler(filename)
file_handler.setFormatter(
logging.Formatter(
fmt=r"%(asctime)s [%(levelname)s] %(name)s :: %(message)s :: %(filename)s:%(lineno)d"
)
)
LogObjects.handlers.append(handler)
handler = logging.StreamHandler()
handler.setFormatter(ScreenLogFormatter())
LogObjects.handlers.append(handler)
LogObjects.handlers.append(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(ScreenLogFormatter())
LogObjects.handlers.append(stream_handler)
def get_logger(name="pypr", level=None):

View file

@ -8,7 +8,7 @@ import os
from .common import get_logger, PyprError
log: Logger = None
log: Logger | None = None
HYPRCTL = f'/tmp/hypr/{ os.environ["HYPRLAND_INSTANCE_SIGNATURE"] }/.socket.sock'
EVENTS = f'/tmp/hypr/{ os.environ["HYPRLAND_INSTANCE_SIGNATURE"] }/.socket2.sock'
@ -21,6 +21,7 @@ async def get_event_stream():
async def hyprctlJSON(command) -> list[dict[str, Any]] | dict[str, Any]:
"""Run an IPC command and return the JSON output."""
assert log
log.debug(command)
try:
ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL)
@ -48,6 +49,7 @@ def _format_command(command_list, default_base_command):
async def hyprctl(command, base_command="dispatch") -> bool:
"""Run an IPC command. Returns success value."""
assert log
log.debug(command)
try:
ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL)

View file

@ -1,10 +1,13 @@
""" expose Brings every client window to screen for selection
toggle_minimized allows having an "expose" like selection of minimized windows
"""
from typing import Any
from .interface import Plugin
from ..ipc import hyprctlJSON, hyprctl
class Extension(Plugin):
class Extension(Plugin): # pylint: disable=missing-class-docstring
exposed = False
async def run_toggle_minimized(self, special_workspace="minimized"):
@ -24,12 +27,14 @@ class Extension(Plugin):
@property
def exposed_clients(self):
"Returns the list of clients currently using exposed mode"
if self.config.get("include_special", False):
return self.exposed
return [c for c in self.exposed if c["workspace"]["id"] > 0]
async def run_expose(self):
"""Expose every client on the active workspace. If expose is active restores everything and move to the focused window"""
"""Expose every client on the active workspace.
If expose is active restores everything and move to the focused window"""
if self.exposed:
aw: dict[str, Any] = await hyprctlJSON("activewindow")
focused_addr = aw["address"]

View file

@ -1,9 +1,12 @@
" Moves unreachable client windows to the currently focused workspace"
from typing import Any
from .interface import Plugin
from ..ipc import hyprctlJSON, hyprctl
def contains(monitor, window):
"Tell if a window is visible in a monitor"
if not (
window["at"][0] > monitor["x"]
and window["at"][0] < monitor["x"] + monitor["width"]
@ -17,17 +20,17 @@ def contains(monitor, window):
return True
class Extension(Plugin):
class Extension(Plugin): # pylint: disable=missing-class-docstring
async def run_attract_lost(self):
"""Brings lost floating windows to the current workspace"""
monitors = await hyprctlJSON("monitors")
monitors: list[dict[str, Any]] = await hyprctlJSON("monitors")
windows = await hyprctlJSON("clients")
lost = [
win
for win in windows
if win["floating"] and not any(contains(mon, win) for mon in monitors)
]
focused = [mon for mon in monitors if mon["focused"]][0]
focused: dict[str, Any] = [mon for mon in monitors if mon["focused"]][0]
interval = focused["width"] / (1 + len(lost))
interval_y = focused["height"] / (1 + len(lost))
batch = []
@ -35,8 +38,8 @@ class Extension(Plugin):
margin = interval // 2
margin_y = interval_y // 2
for i, window in enumerate(lost):
pos_x = int(margin + focused["x"] + i * interval)
pos_y = {int(margin_y + focused["y"] + i * interval_y)}
batch.append(f'movetoworkspacesilent {workspace},pid:{window["pid"]}')
batch.append(
f'movewindowpixel exact {int(margin + focused["x"] + i*interval)} {int(margin_y + focused["y"] + i*interval_y)},pid:{window["pid"]}'
)
batch.append(f'movewindowpixel exact {pos_x} {pos_y},pid:{window["pid"]}')
await hyprctl(batch)

View file

@ -1,9 +1,10 @@
" Toggles workspace zooming "
from .interface import Plugin
from ..ipc import hyprctl
class Extension(Plugin):
class Extension(Plugin): # pylint: disable=missing-class-docstring
zoomed = False
async def run_zoom(self, *args):

View file

@ -47,30 +47,42 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
)
async def event_monitoradded(
self, screenid, no_default=False, monitors: list | None = None
self, monitor_name, no_default=False, monitors: list | None = None
) -> None:
"Triggers when a monitor is plugged"
screenid = screenid.strip()
monitor_name = monitor_name.strip()
if not monitors:
monitors: list[dict[str, Any]] = await hyprctlJSON("monitors")
monitors = await hyprctlJSON("monitors")
assert monitors
for mon in monitors:
if mon["name"].startswith(screenid):
mon_name = mon["description"]
if mon["name"].startswith(monitor_name):
mon_description = mon["description"]
break
else:
self.log.info("Monitor %s not found", screenid)
self.log.info("Monitor %s not found", monitor_name)
return
if self._place_monitors(monitor_name, mon_description, monitors):
return
if not no_default:
default_command = self.config.get("unknown")
if default_command:
subprocess.call(default_command, shell=True)
def _place_monitors(
self, monitor_name: str, mon_description: str, monitors: list[dict[str, Any]]
):
"place a given monitor according to config"
mon_by_name = {m["name"]: m for m in monitors}
newmon = mon_by_name[screenid]
newmon = mon_by_name[monitor_name]
for mon_pattern, conf in self.config["placement"].items():
if mon_pattern in mon_name:
for placement, mon_name in conf.items():
ref = mon_by_name[mon_name]
if mon_pattern in mon_description:
for placement, other_mon_description in conf.items():
ref = mon_by_name[other_mon_description]
if ref:
place = placement.lower()
if place == "topof":
@ -86,9 +98,6 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
x: int = ref["x"] + ref["width"]
y: int = ref["y"]
configure_monitors(monitors, screenid, x, y)
return
if not no_default:
default_command = self.config.get("unknown")
if default_command:
subprocess.call(default_command, shell=True)
configure_monitors(monitors, monitor_name, x, y)
return True
return False

View file

@ -26,8 +26,8 @@ async def get_client_props_by_address(addr: str):
class Animations:
"Animation store"
@classmethod
async def fromtop(cls, monitor, client, client_uid, margin):
@staticmethod
async def fromtop(monitor, client, client_uid, margin):
"Slide from/to top"
scale = float(monitor["scale"])
mon_x = monitor["x"]
@ -39,8 +39,8 @@ class Animations:
await hyprctl(f"movewindowpixel exact {margin_x} {mon_y + margin},{client_uid}")
@classmethod
async def frombottom(cls, monitor, client, client_uid, margin):
@staticmethod
async def frombottom(monitor, client, client_uid, margin):
"Slide from/to bottom"
scale = float(monitor["scale"])
mon_x = monitor["x"]
@ -55,8 +55,8 @@ class Animations:
f"movewindowpixel exact {margin_x} {mon_y + mon_height - client_height - margin},{client_uid}"
)
@classmethod
async def fromleft(cls, monitor, client, client_uid, margin):
@staticmethod
async def fromleft(monitor, client, client_uid, margin):
"Slide from/to left"
scale = float(monitor["scale"])
mon_x = monitor["x"]
@ -68,8 +68,8 @@ class Animations:
await hyprctl(f"movewindowpixel exact {margin + mon_x} {margin_y},{client_uid}")
@classmethod
async def fromright(cls, monitor, client, client_uid, margin):
@staticmethod
async def fromright(monitor, client, client_uid, margin):
"Slide from/to right"
scale = float(monitor["scale"])
mon_x = monitor["x"]
@ -130,7 +130,7 @@ class Scratch:
return f"{self.uid} {self.address} : {self.client_info} / {self.conf}"
class Extension(Plugin):
class Extension(Plugin): # pylint: disable=missing-class-docstring
procs: dict[str, subprocess.Popen] = {}
scratches: dict[str, Scratch] = {}
transitioning_scratches: set[str] = set()
@ -184,16 +184,18 @@ class Extension(Plugin):
self._respawned_scratches.add(name)
scratch = self.scratches[name]
old_pid = self.procs[name].pid if name in self.procs else 0
self.procs[name] = subprocess.Popen(
proc = subprocess.Popen(
scratch.conf["command"],
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
shell=True,
)
pid = self.procs[name].pid
self.procs[name] = proc
pid = proc.pid
self.scratches[name].reset(pid)
self.scratches_by_pid[self.procs[name].pid] = scratch
self.scratches_by_pid[proc.pid] = scratch
if old_pid and old_pid in self.scratches_by_pid:
del self.scratches_by_pid[old_pid]
@ -218,6 +220,25 @@ class Extension(Plugin):
self.log.debug("hide %s because another client is active", uid)
await self.run_hide(uid, autohide=True)
async def _alternative_lookup(self):
"if class attribute is defined, use class matching and return True"
class_lookup_hack = [
self.scratches[name]
for name in self._respawned_scratches
if self.scratches[name].conf.get("class")
]
if not class_lookup_hack:
return False
self.log.debug("Lookup hack triggered")
for client in await hyprctlJSON("clients"):
assert isinstance(client, dict)
for pending_scratch in class_lookup_hack:
if pending_scratch.conf["class"] == client["class"]:
self.scratches_by_address[client["address"][2:]] = pending_scratch
self.log.debug("client class found: %s", client)
await pending_scratch.updateClientInfo(client)
return True
async def event_openwindow(self, params) -> None:
"open windows hook"
addr, wrkspc, _kls, _title = params.split(",", 3)
@ -225,23 +246,7 @@ class Extension(Plugin):
item = self.scratches_by_address.get(addr)
if not item and self._respawned_scratches:
# hack for windows which aren't related to the process (see #8)
class_lookup_hack = [
self.scratches[name]
for name in self._respawned_scratches
if self.scratches[name].conf.get("class")
]
if class_lookup_hack:
self.log.debug("Lookup hack triggered")
for client in await hyprctlJSON("clients"):
assert isinstance(client, dict)
for pending_scratch in class_lookup_hack:
if pending_scratch.conf["class"] == client["class"]:
self.scratches_by_address[
client["address"][2:]
] = pending_scratch
self.log.debug("client class found: %s", client)
await pending_scratch.updateClientInfo(client)
else:
if not await self._alternative_lookup():
await self.updateScratchInfo()
item = self.scratches_by_address.get(addr)
if item and item.just_created:
@ -264,6 +269,29 @@ class Extension(Plugin):
else:
await self.run_show(uid)
async def _anim_hide(self, animation_type, scratch):
"animate hiding a scratchpad"
addr = "address:0x" + scratch.address
offset = scratch.conf.get("offset")
if offset is None:
if "size" not in scratch.client_info:
await self.updateScratchInfo(scratch)
offset = int(1.3 * scratch.client_info["size"][1])
if animation_type == "fromtop":
await hyprctl(f"movewindowpixel 0 -{offset},{addr}")
elif animation_type == "frombottom":
await hyprctl(f"movewindowpixel 0 {offset},{addr}")
elif animation_type == "fromleft":
await hyprctl(f"movewindowpixel -{offset} 0,{addr}")
elif animation_type == "fromright":
await hyprctl(f"movewindowpixel {offset} 0,{addr}")
if scratch.uid in self.transitioning_scratches:
return # abort sequence
await asyncio.sleep(0.2) # await for animation to finish
async def updateScratchInfo(self, scratch: Scratch | None = None) -> None:
"""Update every scratchpads information if no `scratch` given,
else update a specific scratchpad info"""
@ -288,37 +316,19 @@ class Extension(Plugin):
async def run_hide(self, uid: str, force=False, autohide=False) -> None:
"""<name> hides scratchpad "name" """
uid = uid.strip()
item = self.scratches.get(uid)
if not item:
scratch = self.scratches.get(uid)
if not scratch:
self.log.warning("%s is not configured", uid)
return
if not item.visible and not force:
if not scratch.visible and not force:
self.log.warning("%s is already hidden", uid)
return
self.log.info("Hiding %s", uid)
item.visible = False
addr = "address:0x" + item.address
animation_type: str = item.conf.get("animation", "").lower()
scratch.visible = False
addr = "address:0x" + scratch.address
animation_type: str = scratch.conf.get("animation", "").lower()
if animation_type:
offset = item.conf.get("offset")
if offset is None:
if "size" not in item.client_info:
await self.updateScratchInfo(item)
offset = int(1.3 * item.client_info["size"][1])
if animation_type == "fromtop":
await hyprctl(f"movewindowpixel 0 -{offset},{addr}")
elif animation_type == "frombottom":
await hyprctl(f"movewindowpixel 0 {offset},{addr}")
elif animation_type == "fromleft":
await hyprctl(f"movewindowpixel -{offset} 0,{addr}")
elif animation_type == "fromright":
await hyprctl(f"movewindowpixel {offset} 0,{addr}")
if uid in self.transitioning_scratches:
return # abort sequence
await asyncio.sleep(0.2) # await for animation to finish
await self._anim_hide(animation_type, scratch)
if uid not in self.transitioning_scratches:
await hyprctl(f"movetoworkspacesilent special:scratch_{uid},{addr}")

View file

@ -1,9 +1,10 @@
" shift workspaces across monitors "
from .interface import Plugin
from ..ipc import hyprctlJSON, hyprctl
class Extension(Plugin):
class Extension(Plugin): # pylint: disable=missing-class-docstring
monitors: list[str] = []
async def init(self):
@ -21,7 +22,9 @@ class Extension(Plugin):
await hyprctl(f"swapactiveworkspaces {mon} {self.monitors[i+direction]}")
async def event_monitoradded(self, monitor):
"keep track of monitors"
self.monitors.append(monitor.strip())
async def event_monitorremoved(self, monitor):
"keep track of monitors"
self.monitors.remove(monitor.strip())

View file

@ -1,12 +1,14 @@
" Toggle monitors on or off "
from typing import Any
from .interface import Plugin
from ..ipc import hyprctlJSON, hyprctl
class Extension(Plugin):
class Extension(Plugin): # pylint: disable=missing-class-docstring
async def run_toggle_dpms(self):
"""toggles dpms on/off for every monitor"""
monitors = await hyprctlJSON("monitors")
monitors: list[dict[str, Any]] = await hyprctlJSON("monitors")
powered_off = any(m["dpmsStatus"] for m in monitors)
if not powered_off:
await hyprctl("dpms on")

View file

@ -40,6 +40,9 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
for monitor in monitors:
if monitor["focused"]:
break
else:
self.log.error("Can not find a focused monitor")
return
assert isinstance(monitor, dict)
busy_workspaces = set(
m["activeWorkspace"]["id"] for m in monitors if m["id"] != monitor["id"]