Compare commits

..

No commits in common. "main" and "1.0.2" have entirely different histories.
main ... 1.0.2

23 changed files with 393 additions and 1206 deletions

View file

@ -1,23 +0,0 @@
name: Pylint
on: [push]
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pylint
- name: Analysing the code with pylint
run: |
pylint $(git ls-files '*.py')

2
.gitignore vendored
View file

@ -1,2 +0,0 @@
# Nix
result

View file

@ -11,7 +11,7 @@ repos:
# hooks: # hooks:
# - id: prettier # - id: prettier
- repo: https://github.com/ambv/black - repo: https://github.com/ambv/black
rev: "23.7.0" rev: "23.3.0"
hooks: hooks:
- id: black - id: black
- repo: https://github.com/lovesegfault/beautysh - repo: https://github.com/lovesegfault/beautysh
@ -19,7 +19,7 @@ repos:
hooks: hooks:
- id: beautysh - id: beautysh
- repo: https://github.com/adrienverge/yamllint - repo: https://github.com/adrienverge/yamllint
rev: "v1.32.0" rev: "v1.31.0"
hooks: hooks:
- id: yamllint - id: yamllint

209
.pylintrc
View file

@ -1,209 +0,0 @@
[MASTER]
# Specify a configuration file.
#rcfile=
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
#init-hook=
# Add <file or directory> to the black list. It should be a base name, not a
# path. You may set this option multiple times.
ignore=.hg
# Pickle collected data for later comparisons.
persistent=yes
# List of plugins (as comma separated values of python modules names) to load,
# usually to register additional checkers.
load-plugins=
[MESSAGES CONTROL]
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time.
#enable=
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once).
disable=R0903,W0603
[REPORTS]
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html
output-format=text
# Tells whether to display a full report or only the messages
reports=yes
# Python expression which should return a note less than 10 (10 is the highest
# note). You have access to the variables errors warning, statement which
# respectively contain the number of errors / warnings messages and the total
# number of statements analyzed. This is used by the global evaluation report
# (R0004).
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
[BASIC]
# Regular expression which should only match correct module names
module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
# Regular expression which should only match correct module level names
const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__)|log(_.*)?)$
# Regular expression which should only match correct class names
class-rgx=[A-Z_][a-zA-Z0-9]+$
# Regular expression which should only match correct function names
function-rgx=[a-z_][a-zA-Z0-9_]{2,30}$
# Regular expression which should only match correct method names
method-rgx=[a-z_][a-zA-Z0-9_]{2,30}$
# Regular expression which should only match correct instance attribute names
attr-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct argument names
argument-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct variable names
variable-rgx=[a-z_][a-z0-9_]{,30}$
# Regular expression which should only match correct list comprehension /
# generator expression variable names
inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$
# Good variable names which should always be accepted, separated by a comma
good-names=i,j,k,ex,Run,_
# Bad variable names which should always be refused, separated by a comma
bad-names=foo,bar,baz,toto,tutu,tata,pdb
# Regular expression which should only match functions or classes name which do
# not require a docstring
no-docstring-rgx=__.*__
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=120
# Maximum number of lines in a module
max-module-lines=1000
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string=' '
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,XXX,TODO
[SIMILARITIES]
# Minimum lines number of a similarity.
min-similarity-lines=4
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
[TYPECHECK]
# Tells whether missing members accessed in mixin class should be ignored. A
# mixin class is detected if its name ends with "mixin" (case insensitive).
ignore-mixin-members=yes
# List of classes names for which member attributes should not be checked
# (useful for classes with attributes dynamically set).
ignored-classes=SQLObject
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E0201 when accessed.
generated-members=REQUEST,acl_users,aq_parent
[VARIABLES]
# Tells whether we should check for unused import in __init__ files.
init-import=no
# A regular expression matching names used for dummy variables (i.e. not used).
dummy-variables-rgx=_|dummy
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
#additional-builtins=
additional-builtins = _,DBG
[CLASSES]
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,__new__,setUp
[DESIGN]
# Maximum number of arguments for function / method
max-args=7
# Argument names that match this expression will be ignored. Default to name
# with leading underscore
ignored-argument-names=_.*
# Maximum number of locals for function / method body
max-locals=15
# Maximum number of return / yield for function / method body
max-returns=6
# Maximum number of statements in function / method body
max-statements=50
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of attributes for a class (see R0902).
max-attributes=10
# Minimum number of public methods for a class (see R0903).
min-public-methods=2
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
[IMPORTS]
# Deprecated modules which should not be used, separated by a comma
deprecated-modules=regsub,string,TERMIOS,Bastion,rexec
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled)
import-graph=
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled)
ext-import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled)
int-import-graph=

202
README.md
View file

@ -1,62 +1,188 @@
# Pyprland # Extensions & tweaks for hyprland
## Scratchpads, smart monitor placement and other tweaks for hyprland
Host process for multiple Hyprland plugins. Host process for multiple Hyprland plugins.
A single config file `~/.config/hypr/pyprland.json` is used, using the following syntax:
Check the [wiki](https://github.com/hyprland-community/pyprland/wiki) for more information. ```json
{
"pyprland": {
"plugins": ["plugin_name"]
},
"plugin_name": {
"plugin_option": 42
}
}
```
# 1.4.2 (WIP) Built-in plugins are:
- [two new options](https://github.com/hyprland-community/pyprland/wiki/Plugins#size-optional) for scratchpads: `position` and `size` - from @iliayar - `scratchpad` implements dropdowns & togglable poppups
- bugfixes - `monitors` allows relative placement of monitors depending on the model
- `workspaces_follow_focus` provides commands and handlers allowing a more flexible workspaces usage on multi-monitor setups
# 1.4.1 ## Installation
- minor bugfixes ```
pip install pyprland
```
# 1.4.0 Don't forget to start the process, for instance:
- Add [expose](https://github.com/hyprland-community/pyprland/wiki/Plugins#expose) addon ```
- scratchpad: add [lazy](https://github.com/hyprland-community/pyprland/wiki/Plugins#lazy-optional) option exec-once = pypr
- fix `scratchpads`'s position on monitors using scaling ```
- improve error handling & logging, enable debug logs with `--debug <filename>`
## 1.3.1 ## Getting started
- `monitors` triggers rules on startup (not only when a monitor is plugged) Create a configuration file in `~/.config/hypr/pyprland.json` enabling a list of plugins, each plugin may have its own configuration needs, eg:
## 1.3.0 ```json
{
"pyprland": {
"plugins": [
"scratchpads",
"monitors",
"workspaces_follow_focus"
]
},
"scratchpads": {
"term": {
"command": "kitty --class kitty-dropterm",
"animation": "fromTop",
"unfocus": "hide"
},
"volume": {
"command": "pavucontrol",
"unfocus": "hide",
"animation": "fromRight"
}
},
"monitors": {
"placement": {
"BenQ PJ": {
"topOf": "eDP-1"
}
}
}
}
```
- Add `shift_monitors` addon # Configuring plugins
- Add `monitors` addon
- scratchpads: more reliable client tracking
- bugfixes
## 1.2.1 ## `monitors`
- scratchpads have their own special workspaces now Requires `wlr-randr`.
- misc improvements
## 1.2.0 Allows relative placement of monitors depending on the model ("description" returned by `hyprctl monitors`).
- Add `magnify` addon ### Configuration
- focus fix when closing a scratchpad
- misc improvements
## 1.1.0 Supported placements are:
- Add `lost_windows` addon - leftOf
- Add `toggle_dpms` addon - topOf
- `workspaces_follow_focus` now requires hyprland 0.25.0 - rightOf
- misc improvements - bottomOf
## 1.0.1, 1.0.2 ## `workspaces_follow_focus`
- bugfixes & improvements Make non-visible workspaces follow the focused monitor.
Also provides commands to switch between workspaces wile preserving the current monitor assignments:
## 1.0 ### Commands
- First release, a modular hpr-scratcher (`scratchpads` plugin) - `change_workspace` `<direction>`: changes the workspace of the focused monitor
- Add `workspaces_follow_focus` addon
Example usage in `hyprland.conf`:
```
bind = $mainMod, K, exec, pypr change_workspace +1
bind = $mainMod, J, exec, pypr change_workspace -1
```
### Configuration
You can set the `max_workspaces` property, defaults to `10`.
## `scratchpads`
Check [hpr-scratcher](https://github.com/hyprland-community/hpr-scratcher), it's fully compatible, just put the configuration under "scratchpads".
As an example, defining two scratchpads:
- _term_ which would be a kitty terminal on upper part of the screen
- _volume_ which would be a pavucontrol window on the right part of the screen
In your `hyprland.conf` add something like this:
```ini
exec-once = hpr-scratcher
# Repeat this for each scratchpad you need
bind = $mainMod,V,exec,hpr-scratcher toggle volume
windowrule = float,^(pavucontrol)$
windowrule = workspace special silent,^(pavucontrol)$
bind = $mainMod,A,exec,hpr-scratcher toggle term
$dropterm = ^(kitty-dropterm)$
windowrule = float,$dropterm
windowrule = workspace special silent,$dropterm
windowrule = size 75% 60%,$dropterm
```
Then in the configuration file, add something like this:
```json
"scratchpads": {
"term": {
"command": "kitty --class kitty-dropterm",
"animation": "fromTop",
"margin": 50,
"unfocus": "hide"
},
"volume": {
"command": "pavucontrol",
"animation": "fromRight"
}
}
```
And you'll be able to toggle pavucontrol with MOD + V.
### Command-line options
- `reload` : reloads the configuration file
- `toggle <scratchpad name>` : toggle the given scratchpad
- `show <scratchpad name>` : show the given scratchpad
- `hide <scratchpad name>` : hide the given scratchpad
Note: with no argument it runs the daemon (doesn't fork in the background)
### Scratchpad Options
#### command
This is the command you wish to run in the scratchpad.
For a nice startup you need to be able to identify this window in `hyprland.conf`, using `--class` is often a good idea.
#### animation
Type of animation to use
- `null` / `""` / not defined
- "fromTop"
- "fromBottom"
- "fromLeft"
- "fromRight"
#### offset (optional)
number of pixels for the animation.
#### unfocus (optional)
allow to hide the window when the focus is lost when set to "hide"
#### margin (optional)
number of pixels separating the scratchpad from the screen border

61
flake.lock generated
View file

@ -1,61 +0,0 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1689068808,
"narHash": "sha256-6ixXo3wt24N/melDWjq70UuHQLxGV8jZvooRanIHXw0=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "919d646de7be200f3bf08cb76ae1f09402b6f9b4",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1695416179,
"narHash": "sha256-610o1+pwbSu+QuF3GE0NU5xQdTHM3t9wyYhB9l94Cd8=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "715d72e967ec1dd5ecc71290ee072bcaf5181ed6",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-23.05",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

View file

@ -1,27 +0,0 @@
{
description = "pyprland";
inputs = {
flake-utils.url = "github:numtide/flake-utils";
nixpkgs.url = "github:nixos/nixpkgs/nixos-23.05";
};
outputs = { self, nixpkgs, flake-utils, ... }:
flake-utils.lib.eachDefaultSystem
(system:
let
pkgs = import nixpkgs {
inherit system;
};
in
{
packages = rec {
pyprland = pkgs.poetry2nix.mkPoetryApplication {
projectDir = ./.;
python = pkgs.python310;
};
default = pyprland;
};
}
);
}

7
poetry.lock generated
View file

@ -1,7 +0,0 @@
# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand.
package = []
[metadata]
lock-version = "2.0"
python-versions = "^3.10"
content-hash = "53f2eabc9c26446fbcc00d348c47878e118afc2054778c3c803a0a8028af27d9"

View file

@ -1,16 +1,14 @@
#!/bin/env python #!/bin/env python
""" Pyprland - an Hyprland companion app (cli client & daemon) """
import asyncio import asyncio
import importlib
import itertools
import json import json
import os
import sys import sys
from typing import cast import os
import importlib
import traceback
from .common import PyprError, get_logger, init_logger
from .ipc import get_event_stream from .ipc import get_event_stream
from .ipc import init as ipc_init from .common import DEBUG
from .plugins.interface import Plugin from .plugins.interface import Plugin
CONTROL = f'/tmp/hypr/{ os.environ["HYPRLAND_INSTANCE_SIGNATURE"] }/.pyprland.sock' CONTROL = f'/tmp/hypr/{ os.environ["HYPRLAND_INSTANCE_SIGNATURE"] }/.pyprland.sock'
@ -19,92 +17,58 @@ CONFIG_FILE = "~/.config/hypr/pyprland.json"
class Pyprland: class Pyprland:
"Main app object"
server: asyncio.Server server: asyncio.Server
event_reader: asyncio.StreamReader event_reader: asyncio.StreamReader
stopped = False stopped = False
name = "builtin" name = "builtin"
config: None | dict[str, dict] = None
def __init__(self): def __init__(self):
self.plugins: dict[str, Plugin] = {} self.plugins: dict[str, Plugin] = {}
self.log = get_logger()
async def load_config(self, init=True): async def load_config(self):
"""Loads the configuration self.config = json.loads(
open(os.path.expanduser(CONFIG_FILE), encoding="utf-8").read()
if `init` is true, also initializes the plugins"""
try:
with open(os.path.expanduser(CONFIG_FILE), encoding="utf-8") as f:
self.config = json.loads(f.read())
except FileNotFoundError as e:
self.log.critical(
"No config file found, create one at ~/.config/hypr/pyprland.json with a valid pyprland.plugins list"
) )
raise PyprError() from e for name in self.config["pyprland"]["plugins"]:
assert self.config
for name in cast(dict, self.config["pyprland"]["plugins"]):
if name not in self.plugins: if name not in self.plugins:
modname = name if "." in name else f"pyprland.plugins.{name}" modname = name if "." in name else f"pyprland.plugins.{name}"
try: try:
plug = importlib.import_module(modname).Extension(name) plug = importlib.import_module(modname).Extension(name)
if init:
await plug.init() await plug.init()
self.plugins[name] = plug self.plugins[name] = plug
except Exception as e: except Exception as e:
self.log.error("Error loading plugin %s:", name, exc_info=True) print(f"Error loading plugin {name}: {e}")
raise PyprError() from e if DEBUG:
if init: traceback.print_exc()
try:
await self.plugins[name].load_config(self.config) await self.plugins[name].load_config(self.config)
except PyprError:
raise
except Exception as e:
self.log.error("Error initializing plugin %s:", name, exc_info=True)
raise PyprError() from e
async def _callHandler(self, full_name, *params): async def _callHandler(self, full_name, *params):
"Call an event handler with params"
for plugin in [self] + list(self.plugins.values()): for plugin in [self] + list(self.plugins.values()):
if hasattr(plugin, full_name): if hasattr(plugin, full_name):
self.log.debug("%s.%s%s", plugin.name, full_name, params)
try: try:
await getattr(plugin, full_name)(*params) await getattr(plugin, full_name)(*params)
except AssertionError as e: except Exception as e:
self.log.error( print(f"{plugin.name}::{full_name}({params}) failed:")
"Bug detected, please report on https://github.com/fdev31/pyprland/issues" if DEBUG:
) traceback.print_exc()
self.log.exception(e)
except Exception as e: # pylint: disable=W0718
self.log.warning(
"%s::%s(%s) failed:", plugin.name, full_name, params
)
self.log.exception(e)
async def read_events_loop(self): async def read_events_loop(self):
"Consumes the event loop and calls corresponding handlers"
while not self.stopped: while not self.stopped:
try:
data = (await self.event_reader.readline()).decode() data = (await self.event_reader.readline()).decode()
except UnicodeDecodeError:
self.log.error("Invalid unicode while reading events")
continue
if not data: if not data:
self.log.critical("Reader starved") print("Reader starved")
return return
cmd, params = data.split(">>", 1) cmd, params = data.split(">>")
full_name = f"event_{cmd}" full_name = f"event_{cmd}"
if DEBUG:
print(f"EVT {full_name}({params.strip()})")
await self._callHandler(full_name, params) await self._callHandler(full_name, params)
async def read_command(self, reader, writer) -> None: async def read_command(self, reader, writer) -> None:
"Receives a socket command"
data = (await reader.readline()).decode() data = (await reader.readline()).decode()
if not data: if not data:
self.log.critical("Server starved") print("Server starved")
return return
if data == "exit\n": if data == "exit\n":
self.stopped = True self.stopped = True
@ -121,16 +85,13 @@ class Pyprland:
args = args[1:] args = args[1:]
full_name = f"run_{cmd}" full_name = f"run_{cmd}"
# Demos:
# run mako for notifications & uncomment this
# os.system(f"notify-send '{data}'")
self.log.debug("CMD: %s(%s)", full_name, args) if DEBUG:
print(f"CMD: {full_name}({args})")
await self._callHandler(full_name, *args) await self._callHandler(full_name, *args)
async def serve(self): async def serve(self):
"Runs the server"
try: try:
async with self.server: async with self.server:
await self.server.serve_forever() await self.server.serve_forever()
@ -138,7 +99,6 @@ class Pyprland:
await asyncio.gather(*(plugin.exit() for plugin in self.plugins.values())) await asyncio.gather(*(plugin.exit() for plugin in self.plugins.values()))
async def run(self): async def run(self):
"Runs the server and the event listener"
await asyncio.gather( await asyncio.gather(
asyncio.create_task(self.serve()), asyncio.create_task(self.serve()),
asyncio.create_task(self.read_events_loop()), asyncio.create_task(self.read_events_loop()),
@ -148,42 +108,25 @@ class Pyprland:
async def run_daemon(): async def run_daemon():
"Runs the server / daemon"
manager = Pyprland() manager = Pyprland()
err_count = itertools.count()
manager.server = await asyncio.start_unix_server(manager.read_command, CONTROL) manager.server = await asyncio.start_unix_server(manager.read_command, CONTROL)
max_retry = 10
while True:
attempt = next(err_count)
try:
events_reader, events_writer = await get_event_stream() events_reader, events_writer = await get_event_stream()
except Exception as e: # pylint: disable=W0718
if attempt > max_retry:
manager.log.critical("Failed to open hyprland event stream: %s.", e)
raise PyprError() from e
manager.log.warning(
"Failed to get event stream: %s}, retry %s/%s...", e, attempt, max_retry
)
await asyncio.sleep(1)
else:
break
manager.event_reader = events_reader manager.event_reader = events_reader
try: try:
await manager.load_config() # ensure sockets are connected first await manager.load_config() # ensure sockets are connected first
except PyprError as e: except FileNotFoundError:
raise SystemExit(1) from e print(
except Exception as e: f"No config file found, create one at {CONFIG_FILE} with a valid pyprland.plugins list"
manager.log.critical("Failed to load config.", exc_info=True) )
raise SystemExit(1) from e raise SystemExit(1)
try: try:
await manager.run() await manager.run()
except KeyboardInterrupt: except KeyboardInterrupt:
print("Interrupted") print("Interrupted")
except asyncio.CancelledError: except asyncio.CancelledError:
manager.log.critical("cancelled") print("Bye!")
finally: finally:
events_writer.close() events_writer.close()
await events_writer.wait_closed() await events_writer.wait_closed()
@ -192,36 +135,20 @@ async def run_daemon():
async def run_client(): async def run_client():
"Runs the client (CLI)" if sys.argv[1] == "--help":
manager = Pyprland()
if sys.argv[1] in ("--help", "-h", "help"):
await manager.load_config(init=False)
print( print(
"""Syntax: pypr [command] """Commands:
reload
show <scratchpad name>
hide <scratchpad name>
toggle <scratchpad name>
If command is ommited, runs the daemon which will start every configured command. If arguments are ommited, runs the daemon which will start every configured command.
"""
Commands:
reload Reloads the config file (only supports adding or updating plugins)"""
) )
for plug in manager.plugins.values():
for name in dir(plug):
if name.startswith("run_"):
fn = getattr(plug, name)
if callable(fn):
print(
f" {name[4:]:20} {fn.__doc__.strip() if fn.__doc__ else 'N/A'} (from {plug.name})"
)
return return
try:
_, writer = await asyncio.open_unix_connection(CONTROL) _, writer = await asyncio.open_unix_connection(CONTROL)
except FileNotFoundError as e:
manager.log.critical("Failed to open control socket, is pypr daemon running ?")
raise PyprError() from e
writer.write((" ".join(sys.argv[1:])).encode()) writer.write((" ".join(sys.argv[1:])).encode())
await writer.drain() await writer.drain()
writer.close() writer.close()
@ -229,25 +156,10 @@ Commands:
def main(): def main():
"runs the command"
if "--debug" in sys.argv:
i = sys.argv.index("--debug")
init_logger(filename=sys.argv[i + 1], force_debug=True)
del sys.argv[i : i + 2]
else:
init_logger()
ipc_init()
log = get_logger("startup")
try: try:
asyncio.run(run_daemon() if len(sys.argv) <= 1 else run_client()) asyncio.run(run_daemon() if len(sys.argv) <= 1 else run_client())
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
except PyprError:
log.critical("Command failed.")
except json.decoder.JSONDecodeError as e:
log.critical("Invalid JSON syntax in the config file: %s", e.args[0])
except Exception: # pylint: disable=W0718
log.critical("Unhandled exception:", exc_info=True)
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -1,71 +1,4 @@
""" Shared utilities: logging """
import logging
import os import os
__all__ = ["DEBUG", "get_logger", "init_logger"]
DEBUG = os.environ.get("DEBUG", False) DEBUG = os.environ.get("DEBUG", False)
CONFIG_FILE = os.path.expanduser("~/.config/hypr/scratchpads.json")
class PyprError(Exception):
"""Used for errors which already triggered logging"""
class LogObjects:
"""Reusable objects for loggers"""
handlers: list[logging.Handler] = []
def init_logger(filename=None, force_debug=False):
"""initializes the logging system"""
global DEBUG
if force_debug:
DEBUG = True
class ScreenLogFormatter(logging.Formatter):
"A custom formatter, adding colors"
LOG_FORMAT = (
r"%(name)25s - %(message)s // %(filename)s:%(lineno)d"
if DEBUG
else r"%(message)s"
)
RESET_ANSI = "\x1b[0m"
FORMATTERS = {
logging.DEBUG: logging.Formatter(LOG_FORMAT + RESET_ANSI),
logging.INFO: logging.Formatter(LOG_FORMAT + RESET_ANSI),
logging.WARNING: logging.Formatter("\x1b[33;20m" + LOG_FORMAT + RESET_ANSI),
logging.ERROR: logging.Formatter("\x1b[31;20m" + LOG_FORMAT + RESET_ANSI),
logging.CRITICAL: logging.Formatter("\x1b[31;1m" + LOG_FORMAT + RESET_ANSI),
}
def format(self, record):
return self.FORMATTERS[record.levelno].format(record)
logging.basicConfig()
if filename:
file_handler = logging.FileHandler(filename)
file_handler.setFormatter(
logging.Formatter(
fmt=r"%(asctime)s [%(levelname)s] %(name)s :: %(message)s :: %(filename)s:%(lineno)d"
)
)
LogObjects.handlers.append(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(ScreenLogFormatter())
LogObjects.handlers.append(stream_handler)
def get_logger(name="pypr", level=None):
"Returns a logger for `name`"
logger = logging.getLogger(name)
if level is None:
logger.setLevel(logging.DEBUG if DEBUG else logging.WARNING)
else:
logger.setLevel(level)
logger.propagate = False
for handler in LogObjects.handlers:
logger.addHandler(handler)
logger.info("Logger initialized for %s", name)
return logger

View file

@ -1,33 +1,25 @@
#!/bin/env python #!/bin/env python
""" Interact with hyprland using sockets """
import asyncio import asyncio
from typing import Any
import json import json
import os import os
from logging import Logger
from typing import Any
from .common import PyprError, get_logger from .common import DEBUG
log: Logger | None = None
HYPRCTL = f'/tmp/hypr/{ os.environ["HYPRLAND_INSTANCE_SIGNATURE"] }/.socket.sock' HYPRCTL = f'/tmp/hypr/{ os.environ["HYPRLAND_INSTANCE_SIGNATURE"] }/.socket.sock'
EVENTS = f'/tmp/hypr/{ os.environ["HYPRLAND_INSTANCE_SIGNATURE"] }/.socket2.sock' EVENTS = f'/tmp/hypr/{ os.environ["HYPRLAND_INSTANCE_SIGNATURE"] }/.socket2.sock'
async def get_event_stream(): async def get_event_stream():
"Returns a new event socket connection"
return await asyncio.open_unix_connection(EVENTS) return await asyncio.open_unix_connection(EVENTS)
async def hyprctlJSON(command) -> list[dict[str, Any]] | dict[str, Any]: async def hyprctlJSON(command) -> list[dict[str, Any]] | dict[str, Any]:
"""Run an IPC command and return the JSON output.""" """Run an IPC command and return the JSON output."""
assert log if DEBUG:
log.debug(command) print("(JS)>>>", command)
try:
ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL) ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL)
except FileNotFoundError as e:
log.critical("hyprctl socket not found! is it running ?")
raise PyprError() from e
ctl_writer.write(f"-j/{command}".encode()) ctl_writer.write(f"-j/{command}".encode())
await ctl_writer.drain() await ctl_writer.drain()
resp = await ctl_reader.read() resp = await ctl_reader.read()
@ -39,7 +31,6 @@ async def hyprctlJSON(command) -> list[dict[str, Any]] | dict[str, Any]:
def _format_command(command_list, default_base_command): def _format_command(command_list, default_base_command):
"helper function to format BATCH commands"
for command in command_list: for command in command_list:
if isinstance(command, str): if isinstance(command, str):
yield f"{default_base_command} {command}" yield f"{default_base_command} {command}"
@ -49,14 +40,9 @@ def _format_command(command_list, default_base_command):
async def hyprctl(command, base_command="dispatch") -> bool: async def hyprctl(command, base_command="dispatch") -> bool:
"""Run an IPC command. Returns success value.""" """Run an IPC command. Returns success value."""
assert log if DEBUG:
log.debug(command) print(">>>", command)
try:
ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL) ctl_reader, ctl_writer = await asyncio.open_unix_connection(HYPRCTL)
except FileNotFoundError as e:
log.critical("hyprctl socket not found! is it running ?")
raise PyprError() from e
if isinstance(command, list): if isinstance(command, list):
ctl_writer.write( ctl_writer.write(
f"[[BATCH]] {' ; '.join(_format_command(command, base_command))}".encode() f"[[BATCH]] {' ; '.join(_format_command(command, base_command))}".encode()
@ -67,22 +53,17 @@ async def hyprctl(command, base_command="dispatch") -> bool:
resp = await ctl_reader.read(100) resp = await ctl_reader.read(100)
ctl_writer.close() ctl_writer.close()
await ctl_writer.wait_closed() await ctl_writer.wait_closed()
if DEBUG:
print("<<<", resp)
r: bool = resp == b"ok" * (len(resp) // 2) r: bool = resp == b"ok" * (len(resp) // 2)
if not r: if DEBUG and not r:
log.error("FAILED %s", resp) print(f"FAILED {resp}")
return r return r
async def get_focused_monitor_props() -> dict[str, Any]: async def get_focused_monitor_props() -> dict[str, Any]:
"Returns focused monitor data"
for monitor in await hyprctlJSON("monitors"): for monitor in await hyprctlJSON("monitors"):
assert isinstance(monitor, dict) assert isinstance(monitor, dict)
if monitor.get("focused"): if monitor.get("focused") == True:
return monitor return monitor
raise RuntimeError("no focused monitor") raise RuntimeError("no focused monitor")
def init():
"initialize logging"
global log
log = get_logger("ipc")

View file

@ -1,8 +1,7 @@
" Plugin template "
from .interface import Plugin from .interface import Plugin
# from ..ipc import hyprctlJSON, hyprctl from ..ipc import hyprctlJSON, hyprctl
class Extension(Plugin): class Extension(Plugin):
"Sample plugin template" pass

View file

@ -1,54 +0,0 @@
""" expose Brings every client window to screen for selection
toggle_minimized allows having an "expose" like selection of minimized windows
"""
from typing import Any, cast
from ..ipc import hyprctl, hyprctlJSON
from .interface import Plugin
class Extension(Plugin): # pylint: disable=missing-class-docstring
exposed: list[dict] = []
async def run_toggle_minimized(self, special_workspace="minimized"):
"""[name] Toggles switching the focused window to the special workspace "name" (default: minimized)"""
aw = cast(dict, await hyprctlJSON("activewindow"))
wid = aw["workspace"]["id"]
assert isinstance(wid, int)
if wid < 1: # special workspace: unminimize
wrk = cast(dict, await hyprctlJSON("activeworkspace"))
await hyprctl(f"togglespecialworkspace {special_workspace}")
await hyprctl(f"movetoworkspacesilent {wrk['id']},address:{aw['address']}")
await hyprctl(f"focuswindow address:{aw['address']}")
else:
await hyprctl(
f"movetoworkspacesilent special:{special_workspace},address:{aw['address']}"
)
@property
def exposed_clients(self):
"Returns the list of clients currently using exposed mode"
if self.config.get("include_special", False):
return self.exposed
return [c for c in self.exposed if c["workspace"]["id"] > 0]
async def run_expose(self):
"""Expose every client on the active workspace.
If expose is active restores everything and move to the focused window"""
if self.exposed:
aw: dict[str, Any] = cast(dict, await hyprctlJSON("activewindow"))
focused_addr = aw["address"]
for client in self.exposed_clients:
await hyprctl(
f"movetoworkspacesilent {client['workspace']['id']},address:{client['address']}"
)
# await hyprctl("togglespecialworkspace exposed")
await hyprctl(f"focuswindow address:{focused_addr}")
self.exposed = []
else:
self.exposed = cast(list, await hyprctlJSON("clients"))
for client in self.exposed_clients:
await hyprctl(
f"movetoworkspacesilent special:exposed,address:{client['address']}"
)
await hyprctl("togglespecialworkspace exposed")

View file

@ -1,26 +1,17 @@
" Common plugin interface "
from typing import Any from typing import Any
from ..common import get_logger
class Plugin: class Plugin:
"Base plugin class, handles logger and config"
def __init__(self, name: str): def __init__(self, name: str):
"create a new plugin `name` and the matching logger"
self.name = name self.name = name
self.log = get_logger(name)
self.config: dict[str, Any] = {}
async def init(self): async def init(self):
"empty init function" pass
async def exit(self): async def exit(self):
"empty exit function" return
async def load_config(self, config: dict[str, Any]): async def load_config(self, config: dict[str, Any]):
"Loads the configuration section from the passed `config`"
try: try:
self.config = config[self.name] self.config = config[self.name]
except KeyError: except KeyError:

View file

@ -1,27 +0,0 @@
" Ironbar Plugin "
import os
import json
import asyncio
from .interface import Plugin
SOCKET = f"/run/user/{os.getuid()}/ironbar-ipc.sock"
async def ipcCall(**params):
ctl_reader, ctl_writer = await asyncio.open_unix_connection(SOCKET)
ctl_writer.write(json.dumps(params).encode("utf-8"))
await ctl_writer.drain()
ret = await ctl_reader.read()
ctl_writer.close()
await ctl_writer.wait_closed()
return json.loads(ret)
class Extension(Plugin):
"Toggles ironbar on/off"
is_visible = True
async def run_toggle_ironbar(self, bar_name: str):
self.is_visible = not self.is_visible
await ipcCall(type="set_visible", visible=self.is_visible, bar_name=bar_name)

View file

@ -1,45 +0,0 @@
" Moves unreachable client windows to the currently focused workspace"
from typing import Any, cast
from ..ipc import hyprctl, hyprctlJSON
from .interface import Plugin
def contains(monitor, window):
"Tell if a window is visible in a monitor"
if not (
window["at"][0] > monitor["x"]
and window["at"][0] < monitor["x"] + monitor["width"]
):
return False
if not (
window["at"][1] > monitor["y"]
and window["at"][1] < monitor["y"] + monitor["height"]
):
return False
return True
class Extension(Plugin): # pylint: disable=missing-class-docstring
async def run_attract_lost(self):
"""Brings lost floating windows to the current workspace"""
monitors = cast(list, await hyprctlJSON("monitors"))
windows = cast(list, await hyprctlJSON("clients"))
lost = [
win
for win in windows
if win["floating"] and not any(contains(mon, win) for mon in monitors)
]
focused: dict[str, Any] = [mon for mon in monitors if mon["focused"]][0]
interval = focused["width"] / (1 + len(lost))
interval_y = focused["height"] / (1 + len(lost))
batch = []
workspace: int = focused["activeWorkspace"]["id"]
margin = interval // 2
margin_y = interval_y // 2
for i, window in enumerate(lost):
pos_x = int(margin + focused["x"] + i * interval)
pos_y = {int(margin_y + focused["y"] + i * interval_y)}
batch.append(f'movetoworkspacesilent {workspace},pid:{window["pid"]}')
batch.append(f'movewindowpixel exact {pos_x} {pos_y},pid:{window["pid"]}')
await hyprctl(batch)

View file

@ -1,21 +0,0 @@
" Toggles workspace zooming "
from ..ipc import hyprctl
from .interface import Plugin
class Extension(Plugin): # pylint: disable=missing-class-docstring
zoomed = False
async def run_zoom(self, *args):
"""[factor] zooms to "factor" or toggles zoom level ommited"""
if args:
value = int(args[0])
await hyprctl(f"misc:cursor_zoom_factor {value}", "keyword")
self.zoomed = value != 1
else: # toggle
if self.zoomed:
await hyprctl("misc:cursor_zoom_factor 1", "keyword")
else:
fact = int(self.config.get("factor", 2))
await hyprctl(f"misc:cursor_zoom_factor {fact}", "keyword")
self.zoomed = not self.zoomed

View file

@ -1,111 +1,45 @@
" The monitors plugin " from typing import Any
from .interface import Plugin
import subprocess import subprocess
from typing import Any, cast
from ..ipc import hyprctlJSON from ..ipc import hyprctlJSON
from .interface import Plugin
def configure_monitors(monitors, screenid: str, pos_x: int, pos_y: int) -> None: class Extension(Plugin):
"Apply the configuration change" async def event_monitoradded(self, screenid):
x_offset = -pos_x if pos_x < 0 else 0 screenid = screenid.strip()
y_offset = -pos_y if pos_y < 0 else 0
min_x = pos_x
min_y = pos_y
command = ["wlr-randr"]
other_monitors = [mon for mon in monitors if mon["name"] != screenid]
for mon in other_monitors:
min_x = min(min_x, mon["x"])
min_y = min(min_y, mon["y"])
x_offset = -min_x
y_offset = -min_y
for mon in other_monitors:
command.extend(
[
"--output",
mon["name"],
"--pos",
f"{mon['x']+x_offset},{mon['y']+y_offset}",
]
)
command.extend(
["--output", screenid, "--pos", f"{pos_x+x_offset},{pos_y+y_offset}"]
)
subprocess.call(command)
class Extension(Plugin): # pylint: disable=missing-class-docstring
async def load_config(self, config) -> None:
await super().load_config(config)
await self.run_relayout()
async def run_relayout(self):
monitors = cast(list[dict], await hyprctlJSON("monitors"))
for monitor in monitors:
await self.event_monitoradded(
monitor["name"], no_default=True, monitors=monitors
)
async def event_monitoradded(
self, monitor_name, no_default=False, monitors: list | None = None
) -> None:
"Triggers when a monitor is plugged"
monitor_name = monitor_name.strip()
if not monitors:
monitors = cast(list, await hyprctlJSON("monitors"))
assert monitors
monitors: list[dict[str, Any]] = await hyprctlJSON("monitors")
for mon in monitors: for mon in monitors:
if mon["name"].startswith(monitor_name): if mon["name"].startswith(screenid):
mon_description = mon["description"] mon_name = mon["description"]
break break
else: else:
self.log.info("Monitor %s not found", monitor_name) print(f"Monitor {screenid} not found")
return return
if self._place_monitors(monitor_name, mon_description, monitors):
return
if not no_default:
default_command = self.config.get("unknown")
if default_command:
subprocess.call(default_command, shell=True)
def _place_monitors(
self, monitor_name: str, mon_description: str, monitors: list[dict[str, Any]]
):
"place a given monitor according to config"
mon_by_name = {m["name"]: m for m in monitors} mon_by_name = {m["name"]: m for m in monitors}
newmon = mon_by_name[monitor_name]
newmon = mon_by_name[screenid]
for mon_pattern, conf in self.config["placement"].items(): for mon_pattern, conf in self.config["placement"].items():
if mon_pattern in mon_description: if mon_pattern in mon_name:
for placement, other_mon_description in conf.items(): for placement, mon_name in conf.items():
try: ref = mon_by_name[mon_name]
ref = mon_by_name[other_mon_description]
except KeyError:
continue
if ref: if ref:
place = placement.lower() place = placement.lower()
x: int = 0
y: int = 0
if place == "topof": if place == "topof":
x = ref["x"] x: int = ref["x"]
y = ref["y"] - newmon["height"] y: int = ref["y"] - newmon["height"]
elif place == "bottomof": elif place == "bottomof":
x = ref["x"] x: int = ref["x"]
y = ref["y"] + ref["height"] y: int = ref["y"] + ref["height"]
elif place == "leftof": elif place == "leftof":
x = ref["x"] - newmon["width"] x: int = ref["x"] - newmon["width"]
y = ref["y"] y: int = ref["y"]
else: # rightof else: # rightof
x = ref["x"] + ref["width"] x: int = ref["x"] + ref["width"]
y = ref["y"] y: int = ref["y"]
subprocess.call(
configure_monitors(monitors, monitor_name, x, y) ["wlr-randr", "--output", screenid, "--pos", f"{x},{y}"]
return True )
return False

View file

@ -1,50 +1,42 @@
" Scratchpads addon "
import asyncio
import os
from itertools import count
import subprocess import subprocess
from typing import Any, cast from typing import Any
import logging import asyncio
from ..ipc import (
hyprctl,
hyprctlJSON,
get_focused_monitor_props,
)
import os
from ..ipc import get_focused_monitor_props, hyprctl, hyprctlJSON
from .interface import Plugin from .interface import Plugin
DEFAULT_MARGIN = 60 DEFAULT_MARGIN = 60
async def get_client_props_by_address(addr: str): async def get_client_props_by_pid(pid: int):
"Returns client properties given its address"
assert len(addr) > 2, "Client address is invalid"
for client in await hyprctlJSON("clients"): for client in await hyprctlJSON("clients"):
assert isinstance(client, dict) assert isinstance(client, dict)
if client.get("address") == addr: if client.get("pid") == pid:
return client return client
class Animations: class Animations:
"Animation store" @classmethod
async def fromtop(cls, monitor, client, client_uid, margin):
@staticmethod
async def fromtop(monitor, client, client_uid, margin):
"Slide from/to top"
scale = float(monitor["scale"])
mon_x = monitor["x"] mon_x = monitor["x"]
mon_y = monitor["y"] mon_y = monitor["y"]
mon_width = int(monitor["width"] / scale) mon_width = monitor["width"]
client_width = client["size"][0] client_width = client["size"][0]
margin_x = int((mon_width - client_width) / 2) + mon_x margin_x = int((mon_width - client_width) / 2) + mon_x
await hyprctl(f"movewindowpixel exact {margin_x} {mon_y + margin},{client_uid}") await hyprctl(f"movewindowpixel exact {margin_x} {mon_y + margin},{client_uid}")
@staticmethod @classmethod
async def frombottom(monitor, client, client_uid, margin): async def frombottom(cls, monitor, client, client_uid, margin):
"Slide from/to bottom"
scale = float(monitor["scale"])
mon_x = monitor["x"] mon_x = monitor["x"]
mon_y = monitor["y"] mon_y = monitor["y"]
mon_width = int(monitor["width"] / scale) mon_width = monitor["width"]
mon_height = int(monitor["height"] / scale) mon_height = monitor["height"]
client_width = client["size"][0] client_width = client["size"][0]
client_height = client["size"][1] client_height = client["size"][1]
@ -53,27 +45,23 @@ class Animations:
f"movewindowpixel exact {margin_x} {mon_y + mon_height - client_height - margin},{client_uid}" f"movewindowpixel exact {margin_x} {mon_y + mon_height - client_height - margin},{client_uid}"
) )
@staticmethod @classmethod
async def fromleft(monitor, client, client_uid, margin): async def fromleft(cls, monitor, client, client_uid, margin):
"Slide from/to left"
scale = float(monitor["scale"])
mon_x = monitor["x"] mon_x = monitor["x"]
mon_y = monitor["y"] mon_y = monitor["y"]
mon_height = int(monitor["height"] / scale) mon_height = monitor["height"]
client_height = client["size"][1] client_height = client["size"][1]
margin_y = int((mon_height - client_height) / 2) + mon_y margin_y = int((mon_height - client_height) / 2) + mon_y
await hyprctl(f"movewindowpixel exact {margin + mon_x} {margin_y},{client_uid}") await hyprctl(f"movewindowpixel exact {margin + mon_x} {margin_y},{client_uid}")
@staticmethod @classmethod
async def fromright(monitor, client, client_uid, margin): async def fromright(cls, monitor, client, client_uid, margin):
"Slide from/to right"
scale = float(monitor["scale"])
mon_x = monitor["x"] mon_x = monitor["x"]
mon_y = monitor["y"] mon_y = monitor["y"]
mon_width = int(monitor["width"] / scale) mon_width = monitor["width"]
mon_height = int(monitor["height"] / scale) mon_height = monitor["height"]
client_width = client["size"][0] client_width = client["size"][0]
client_height = client["size"][1] client_height = client["size"][1]
@ -84,75 +72,54 @@ class Animations:
class Scratch: class Scratch:
"A scratchpad state including configuration & client state"
log = logging.getLogger("scratch")
def __init__(self, uid, opts): def __init__(self, uid, opts):
self.uid = uid self.uid = uid
self.pid = 0 self.pid = 0
self.conf = opts self.conf = opts
self.visible = False self.visible = False
self.just_created = True self.just_created = True
self.client_info = {} self.clientInfo = {}
def isAlive(self) -> bool: def isAlive(self) -> bool:
"is the process running ?"
path = f"/proc/{self.pid}" path = f"/proc/{self.pid}"
if os.path.exists(path): if os.path.exists(path):
with open(os.path.join(path, "status"), "r", encoding="utf-8") as f: for line in open(os.path.join(path, "status"), "r").readlines():
for line in f.readlines():
if line.startswith("State"): if line.startswith("State"):
state = line.split()[1] state = line.split()[1]
return state not in "ZX" # not "Z (zombie)"or "X (dead)" return state in "RSDTt" # not "Z (zombie)"or "X (dead)"
return False return False
def reset(self, pid: int) -> None: def reset(self, pid: int) -> None:
"clear the object"
self.pid = pid self.pid = pid
self.visible = False self.visible = False
self.just_created = True self.just_created = True
self.client_info = {} self.clientInfo = {}
@property @property
def address(self) -> str: def address(self) -> str:
"Returns the client address" return str(self.clientInfo.get("address", ""))[2:]
return str(self.client_info.get("address", ""))[2:]
async def updateClientInfo(self, client_info=None) -> None: async def updateClientInfo(self, clientInfo=None) -> None:
"update the internal client info property, if not provided, refresh based on the current address" if clientInfo is None:
if client_info is None: clientInfo = await get_client_props_by_pid(self.pid)
client_info = await get_client_props_by_address("0x" + self.address) assert isinstance(clientInfo, dict)
try: self.clientInfo.update(clientInfo)
assert isinstance(client_info, dict)
except AssertionError as e:
self.log.error(
f"client_info of {self.address} must be a dict: {client_info}"
)
raise AssertionError(e) from e
self.client_info.update(client_info)
def __str__(self):
return f"{self.uid} {self.address} : {self.client_info} / {self.conf}"
class Extension(Plugin): # pylint: disable=missing-class-docstring class Extension(Plugin):
procs: dict[str, subprocess.Popen] = {} async def init(self) -> None:
scratches: dict[str, Scratch] = {} self.procs: dict[str, subprocess.Popen] = {}
transitioning_scratches: set[str] = set() self.scratches: dict[str, Scratch] = {}
_new_scratches: set[str] = set() self.transitioning_scratches: set[str] = set()
_respawned_scratches: set[str] = set() self._respawned_scratches: set[str] = set()
scratches_by_address: dict[str, Scratch] = {} self.scratches_by_address: dict[str, Scratch] = {}
scratches_by_pid: dict[int, Scratch] = {} self.scratches_by_pid: dict[int, Scratch] = {}
focused_window_tracking: dict[str, dict] = {}
async def exit(self) -> None: async def exit(self) -> None:
"exit hook"
async def die_in_piece(scratch: Scratch): async def die_in_piece(scratch: Scratch):
proc = self.procs[scratch.uid] proc = self.procs[scratch.uid]
proc.terminate() proc.terminate()
for _ in range(10): for n in range(10):
if not scratch.isAlive(): if not scratch.isAlive():
break break
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
@ -164,10 +131,9 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
*(die_in_piece(scratch) for scratch in self.scratches.values()) *(die_in_piece(scratch) for scratch in self.scratches.values())
) )
async def load_config(self, config: dict[str, Any]) -> None: async def load_config(self, config) -> None:
"config loader" config: dict[str, dict[str, Any]] = config["scratchpads"]
my_config: dict[str, dict[str, Any]] = config["scratchpads"] scratches = {k: Scratch(k, v) for k, v in config.items()}
scratches = {k: Scratch(k, v) for k, v in my_config.items()}
new_scratches = set() new_scratches = set()
@ -180,215 +146,139 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
# not known yet # not known yet
for name in new_scratches: for name in new_scratches:
if not self.scratches[name].conf.get("lazy", False): self.start_scratch_command(name)
await self.start_scratch_command(name, is_new=True)
async def start_scratch_command(self, name: str, is_new=False) -> None: def start_scratch_command(self, name: str) -> None:
"spawns a given scratchpad's process"
if is_new:
self._new_scratches.add(name)
self._respawned_scratches.add(name) self._respawned_scratches.add(name)
scratch = self.scratches[name] scratch = self.scratches[name]
old_pid = self.procs[name].pid if name in self.procs else 0 old_pid = self.procs[name].pid if name in self.procs else 0
proc = subprocess.Popen( self.procs[name] = subprocess.Popen(
scratch.conf["command"], scratch.conf["command"],
stdin=subprocess.DEVNULL, stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
shell=True, shell=True,
) )
self.procs[name] = proc pid = self.procs[name].pid
pid = proc.pid
self.scratches[name].reset(pid) self.scratches[name].reset(pid)
self.scratches_by_pid[pid] = scratch self.scratches_by_pid[self.procs[name].pid] = scratch
self.log.info(f"scratch {scratch.uid} has pid {pid}") if old_pid:
if old_pid and old_pid in self.scratches_by_pid:
del self.scratches_by_pid[old_pid] del self.scratches_by_pid[old_pid]
# Events # Events
async def event_activewindowv2(self, addr) -> None: async def event_activewindowv2(self, addr) -> None:
"active windows hook"
addr = addr.strip() addr = addr.strip()
scratch = self.scratches_by_address.get(addr) scratch = self.scratches_by_address.get(addr)
if scratch: if scratch:
if scratch.just_created: if scratch.just_created:
self.log.debug("Hiding just created scratch %s", scratch.uid)
await self.run_hide(scratch.uid, force=True) await self.run_hide(scratch.uid, force=True)
scratch.just_created = False scratch.just_created = False
else: else:
for uid, scratch in self.scratches.items(): for uid, scratch in self.scratches.items():
self.log.info((scratch.address, addr)) if scratch.clientInfo and scratch.address != addr:
if scratch.client_info and scratch.address != addr:
if ( if (
scratch.visible scratch.visible
and scratch.conf.get("unfocus") == "hide" and scratch.conf.get("unfocus") == "hide"
and scratch.uid not in self.transitioning_scratches and scratch.uid not in self.transitioning_scratches
): ):
self.log.debug("hide %s because another client is active", uid) await self.run_hide(uid)
await self.run_hide(uid, autohide=True)
async def _alternative_lookup(self):
"if class attribute is defined, use class matching and return True"
class_lookup_hack = [
self.scratches[name]
for name in self._respawned_scratches
if self.scratches[name].conf.get("class")
]
if not class_lookup_hack:
return False
self.log.debug("Lookup hack triggered")
# hack to update the client info from the provided class
for client in await hyprctlJSON("clients"):
assert isinstance(client, dict)
for pending_scratch in class_lookup_hack:
if pending_scratch.conf["class"] == client["class"]:
self.scratches_by_address[client["address"][2:]] = pending_scratch
self.log.debug("client class found: %s", client)
await pending_scratch.updateClientInfo(client)
return True
async def event_openwindow(self, params) -> None: async def event_openwindow(self, params) -> None:
"open windows hook" addr, wrkspc, kls, title = params.split(",", 3)
addr, wrkspc, _kls, _title = params.split(",", 3) if wrkspc.startswith("special"):
if self._respawned_scratches:
item = self.scratches_by_address.get(addr) item = self.scratches_by_address.get(addr)
if not item and self._respawned_scratches: if not item and self._respawned_scratches:
# hack for windows which aren't related to the process (see #8)
if not await self._alternative_lookup():
await self.updateScratchInfo() await self.updateScratchInfo()
item = self.scratches_by_address.get(addr) item = self.scratches_by_address.get(addr)
if item and item.just_created: if item and item.just_created:
if item.uid in self._new_scratches:
await self.run_hide(item.uid, force=True)
self._new_scratches.discard(item.uid)
self._respawned_scratches.discard(item.uid) self._respawned_scratches.discard(item.uid)
await self.run_hide(item.uid, force=True)
item.just_created = False item.just_created = False
async def run_toggle(self, uid: str) -> None: async def run_toggle(self, uid: str) -> None:
"""<name> toggles visibility of scratchpad "name" """
uid = uid.strip() uid = uid.strip()
item = self.scratches.get(uid) item = self.scratches.get(uid)
if not item: if not item:
self.log.warning("%s is not configured", uid) print(f"{uid} is not configured")
return return
self.log.debug("%s is visible = %s", uid, item.visible)
if item.visible: if item.visible:
await self.run_hide(uid) await self.run_hide(uid)
else: else:
await self.run_show(uid) await self.run_show(uid)
async def _anim_hide(self, animation_type, scratch): async def updateScratchInfo(self, scratch: Scratch | None = None) -> None:
"animate hiding a scratchpad" if scratch is None:
addr = "address:0x" + scratch.address
offset = scratch.conf.get("offset")
if offset is None:
if "size" not in scratch.client_info:
await self.updateScratchInfo(scratch)
offset = int(1.3 * scratch.client_info["size"][1])
if animation_type == "fromtop":
await hyprctl(f"movewindowpixel 0 -{offset},{addr}")
elif animation_type == "frombottom":
await hyprctl(f"movewindowpixel 0 {offset},{addr}")
elif animation_type == "fromleft":
await hyprctl(f"movewindowpixel -{offset} 0,{addr}")
elif animation_type == "fromright":
await hyprctl(f"movewindowpixel {offset} 0,{addr}")
if scratch.uid in self.transitioning_scratches:
return # abort sequence
await asyncio.sleep(0.2) # await for animation to finish
async def updateScratchInfo(self, orig_scratch: Scratch | None = None) -> None:
"""Update every scratchpads information if no `scratch` given,
else update a specific scratchpad info"""
pid = orig_scratch.pid if orig_scratch else None
for client in await hyprctlJSON("clients"): for client in await hyprctlJSON("clients"):
assert isinstance(client, dict) assert isinstance(client, dict)
if pid and pid != client["pid"]: pid = client["pid"]
continue assert isinstance(pid, int)
scratch = self.scratches_by_address.get(client["address"][2:]) scratch = self.scratches_by_pid.get(pid)
if not scratch:
scratch = self.scratches_by_pid.get(client["pid"])
if scratch:
self.scratches_by_address[client["address"][2:]] = scratch
if scratch: if scratch:
await scratch.updateClientInfo(client) await scratch.updateClientInfo(client)
self.scratches_by_address[
async def run_hide(self, uid: str, force=False, autohide=False) -> None: scratch.clientInfo["address"][2:]
"""<name> hides scratchpad "name" """ ] = scratch
uid = uid.strip() else:
scratch = self.scratches.get(uid) add_to_address_book = ("address" not in scratch.clientInfo) or (
if not scratch: scratch.address not in self.scratches_by_address
self.log.warning("%s is not configured", uid)
return
if not scratch.visible and not force:
self.log.warning("%s is already hidden", uid)
return
scratch.visible = False
if not scratch.isAlive():
await self.run_show(uid, force=True)
return
self.log.info("Hiding %s", uid)
addr = "address:0x" + scratch.address
animation_type: str = scratch.conf.get("animation", "").lower()
if animation_type:
await self._anim_hide(animation_type, scratch)
if uid not in self.transitioning_scratches:
await hyprctl(f"movetoworkspacesilent special:scratch_{uid},{addr}")
if (
animation_type and uid in self.focused_window_tracking
): # focus got lost when animating
if not autohide and "address" in self.focused_window_tracking[uid]:
await hyprctl(
f"focuswindow address:{self.focused_window_tracking[uid]['address']}"
) )
del self.focused_window_tracking[uid] await scratch.updateClientInfo()
if add_to_address_book:
self.scratches_by_address[scratch.clientInfo["address"][2:]] = scratch
async def ensure_alive(self, uid, item=None): async def run_hide(self, uid: str, force=False) -> None:
if item is None: uid = uid.strip()
item = self.scratches.get(uid) item = self.scratches.get(uid)
if not item:
print(f"{uid} is not configured")
return
if not item.visible and not force:
print(f"{uid} is already hidden")
return
item.visible = False
pid = "pid:%d" % item.pid
animation_type: str = item.conf.get("animation", "").lower()
if animation_type:
offset = item.conf.get("offset")
if offset is None:
if "size" not in item.clientInfo:
await self.updateScratchInfo(item)
if not item.isAlive(): offset = int(1.3 * item.clientInfo["size"][1])
self.log.info("%s is not running, restarting...", uid)
if uid in self.procs: if animation_type == "fromtop":
self.procs[uid].kill() await hyprctl(f"movewindowpixel 0 -{offset},{pid}")
if item.pid in self.scratches_by_pid: elif animation_type == "frombottom":
del self.scratches_by_pid[item.pid] await hyprctl(f"movewindowpixel 0 {offset},{pid}")
if item.address in self.scratches_by_address: elif animation_type == "fromleft":
del self.scratches_by_address[item.address] await hyprctl(f"movewindowpixel -{offset} 0,{pid}")
self.log.info(f"starting {uid}") elif animation_type == "fromright":
await self.start_scratch_command(uid) await hyprctl(f"movewindowpixel {offset} 0,{pid}")
self.log.info(f"{uid} started")
self.log.info("==> Wait for spawning") if uid in self.transitioning_scratches:
loop_count = count() return # abort sequence
while uid in self._respawned_scratches and next(loop_count) < 10: await asyncio.sleep(0.2) # await for animation to finish
await asyncio.sleep(0.05) if uid not in self.transitioning_scratches:
self.log.info(f"=> spawned {uid} as proc {item.pid}") await hyprctl(f"movetoworkspacesilent special:scratch,{pid}")
async def run_show(self, uid, force=False) -> None: async def run_show(self, uid, force=False) -> None:
"""<name> shows scratchpad "name" """
uid = uid.strip() uid = uid.strip()
item = self.scratches.get(uid) item = self.scratches.get(uid)
self.focused_window_tracking[uid] = cast(
dict[str, Any], await hyprctlJSON("activewindow")
)
if not item: if not item:
self.log.warning("%s is not configured", uid) print(f"{uid} is not configured")
return return
if item.visible and not force: if item.visible and not force:
self.log.warning("%s is already visible", uid) print(f"{uid} is already visible")
return return
self.log.info("Showing %s", uid) if not item.isAlive():
await self.ensure_alive(uid, item) print(f"{uid} is not running, restarting...")
self.procs[uid].kill()
self.start_scratch_command(uid)
while uid in self._respawned_scratches:
await asyncio.sleep(0.05)
item.visible = True item.visible = True
monitor = await get_focused_monitor_props() monitor = await get_focused_monitor_props()
@ -396,64 +286,20 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
await self.updateScratchInfo(item) await self.updateScratchInfo(item)
assert item.address, "No address !" pid = "pid:%d" % item.pid
addr = "address:0x" + item.address
animation_type = item.conf.get("animation", "").lower() animation_type = item.conf.get("animation", "").lower()
wrkspc = monitor["activeWorkspace"]["id"] wrkspc = monitor["activeWorkspace"]["id"]
self.transitioning_scratches.add(uid) self.transitioning_scratches.add(uid)
await hyprctl(f"moveworkspacetomonitor special:scratch_{uid} {monitor['name']}") await hyprctl(f"moveworkspacetomonitor special:scratch {monitor['name']}")
await hyprctl(f"movetoworkspacesilent {wrkspc},{addr}") await hyprctl(f"movetoworkspacesilent {wrkspc},{pid}")
if animation_type: if animation_type:
margin = item.conf.get("margin", DEFAULT_MARGIN) margin = item.conf.get("margin", DEFAULT_MARGIN)
fn = getattr(Animations, animation_type) fn = getattr(Animations, animation_type)
await fn(monitor, item.client_info, addr, margin) await fn(monitor, item.clientInfo, pid, margin)
await hyprctl(f"focuswindow {addr}")
size = item.conf.get("size")
if size:
x_size, y_size = self._convert_coords(size, monitor)
await hyprctl(f"resizewindowpixel exact {x_size} {y_size},{addr}")
position = item.conf.get("position")
if position:
x_pos, y_pos = self._convert_coords(position, monitor)
x_pos_abs, y_pos_abs = x_pos + monitor["x"], y_pos + monitor["y"]
await hyprctl(f"movewindowpixel exact {x_pos_abs} {y_pos_abs},{addr}")
await hyprctl(f"focuswindow {pid}")
await asyncio.sleep(0.2) # ensure some time for events to propagate await asyncio.sleep(0.2) # ensure some time for events to propagate
self.transitioning_scratches.discard(uid) self.transitioning_scratches.discard(uid)
def _convert_coords(self, coords, monitor):
"""
Converts a string like "X Y" to coordinates relative to monitor
Supported formats for X, Y:
- Percentage: "V%". V in [0; 100]
Example:
"10% 20%", monitor 800x600 => 80, 120
"""
assert coords, "coords must be non null"
def convert(s, dim):
if s[-1] == "%":
p = int(s[:-1])
if p < 0 or p > 100:
raise Exception(f"Percentage must be in range [0; 100], got {p}")
scale = float(monitor["scale"])
return int(monitor[dim] / scale * p / 100)
else:
raise Exception(f"Unsupported format for dimension {dim} size, got {s}")
try:
x_str, y_str = coords.split()
return convert(x_str, "width"), convert(y_str, "height")
except Exception as e:
self.log.error(f"Failed to read coordinates: {e}")
raise e

View file

@ -1,33 +0,0 @@
" shift workspaces across monitors "
from typing import cast
from ..ipc import hyprctl, hyprctlJSON
from .interface import Plugin
class Extension(Plugin): # pylint: disable=missing-class-docstring
monitors: list[str] = []
async def init(self):
self.monitors: list[str] = [
mon["name"] for mon in cast(list[dict], await hyprctlJSON("monitors"))
]
async def run_shift_monitors(self, arg: str):
"""Swaps monitors' workspaces in the given direction"""
direction: int = int(arg)
if direction > 0:
mon_list = self.monitors[:-1]
else:
mon_list = list(reversed(self.monitors[1:]))
for i, mon in enumerate(mon_list):
await hyprctl(f"swapactiveworkspaces {mon} {self.monitors[i+direction]}")
async def event_monitoradded(self, monitor):
"keep track of monitors"
self.monitors.append(monitor.strip())
async def event_monitorremoved(self, monitor):
"keep track of monitors"
self.monitors.remove(monitor.strip())

View file

@ -1,16 +0,0 @@
" Toggle monitors on or off "
from typing import Any, cast
from ..ipc import hyprctl, hyprctlJSON
from .interface import Plugin
class Extension(Plugin): # pylint: disable=missing-class-docstring
async def run_toggle_dpms(self):
"""toggles dpms on/off for every monitor"""
monitors = cast(list[dict[str, Any]], await hyprctlJSON("monitors"))
powered_off = any(m["dpmsStatus"] for m in monitors)
if not powered_off:
await hyprctl("dpms on")
else:
await hyprctl("dpms off")

View file

@ -1,43 +1,36 @@
""" Force workspaces to follow the focus / mouse """ import asyncio
from typing import cast
from ..ipc import hyprctl, hyprctlJSON
from .interface import Plugin from .interface import Plugin
from ..ipc import hyprctlJSON, hyprctl
class Extension(Plugin): # pylint: disable=missing-class-docstring
workspace_list: list[int] = []
class Extension(Plugin):
async def load_config(self, config): async def load_config(self, config):
"loads the config"
await super().load_config(config) await super().load_config(config)
self.workspace_list = list(range(1, self.config.get("max_workspaces", 10) + 1)) self.workspace_list = list(range(1, self.config.get("max_workspaces", 10)))
async def event_focusedmon(self, screenid_index): async def event_focusedmon(self, screenid_index):
"reacts to monitor changes"
monitor_id, workspace_id = screenid_index.split(",") monitor_id, workspace_id = screenid_index.split(",")
workspace_id = int(workspace_id) workspace_id = int(workspace_id)
# move every free workspace to the currently focused desktop # move every free workspace to the currently focused desktop
busy_workspaces = set( busy_workspaces = set(
mon["activeWorkspace"]["id"] mon["activeWorkspace"]["id"]
for mon in cast(list[dict], await hyprctlJSON("monitors")) for mon in await hyprctlJSON("monitors")
if mon["name"] != monitor_id if mon["name"] != monitor_id
) )
workspaces = [ workspaces = [w["id"] for w in await hyprctlJSON("workspaces") if w["id"] > 0]
w["id"]
for w in cast(list[dict], await hyprctlJSON("workspaces"))
if w["id"] > 0
]
batch: list[str | list[str]] = [] batch: list[str | list[str]] = [["animations:enabled false", "keyword"]]
for n in workspaces: for n in workspaces:
if n in busy_workspaces or n == workspace_id: if n in busy_workspaces or n == workspace_id:
continue continue
batch.append(f"moveworkspacetomonitor {n} {monitor_id}") batch.append(f"moveworkspacetomonitor {n} {monitor_id}")
batch.append(f"workspace {workspace_id}")
await hyprctl(batch) await hyprctl(batch)
await asyncio.sleep(0.05)
await hyprctl("animations:enabled true", base_command="keyword")
async def run_change_workspace(self, direction: str): async def run_change_workspace(self, direction: str):
"""<+1/-1> Switch workspaces of current monitor, avoiding displayed workspaces"""
increment = int(direction) increment = int(direction)
# get focused screen info # get focused screen info
monitors = await hyprctlJSON("monitors") monitors = await hyprctlJSON("monitors")
@ -45,22 +38,19 @@ class Extension(Plugin): # pylint: disable=missing-class-docstring
for monitor in monitors: for monitor in monitors:
if monitor["focused"]: if monitor["focused"]:
break break
else:
self.log.error("Can not find a focused monitor")
return
assert isinstance(monitor, dict) assert isinstance(monitor, dict)
busy_workspaces = set( busy_workspaces = set(
m["activeWorkspace"]["id"] for m in monitors if m["id"] != monitor["id"] m["activeWorkspace"]["id"] for m in monitors if m["id"] != monitor["id"]
) )
# get workspaces info
workspaces = await hyprctlJSON("workspaces")
assert isinstance(workspaces, list)
workspaces.sort(key=lambda x: x["id"])
cur_workspace = monitor["activeWorkspace"]["id"] cur_workspace = monitor["activeWorkspace"]["id"]
available_workspaces = [ available_workspaces = [
i for i in self.workspace_list if i not in busy_workspaces i for i in self.workspace_list if i not in busy_workspaces
] ]
try:
idx = available_workspaces.index(cur_workspace) idx = available_workspaces.index(cur_workspace)
except ValueError:
next_workspace = available_workspaces[0 if increment > 0 else -1]
else:
next_workspace = available_workspaces[ next_workspace = available_workspaces[
(idx + increment) % len(available_workspaces) (idx + increment) % len(available_workspaces)
] ]

View file

@ -1,12 +1,12 @@
[tool.poetry] [tool.poetry]
name = "pyprland" name = "pyprland"
version = "1.4.1" version = "1.0.2"
description = "An hyperland plugin system" description = "An hyperland plugin system"
authors = ["fdev31 <fdev31@gmail.com>"] authors = ["fdev31 <fdev31@gmail.com>"]
license = "MIT" license = "MIT"
readme = "README.md" readme = "README.md"
packages = [{include = "pyprland"}] packages = [{include = "pyprland"}]
homepage = "https://github.com/hyprland-community/pyprland/" homepage = "https://github.com/fdev31/pyprland/"
[tool.poetry.scripts] [tool.poetry.scripts]
pypr = "pyprland.command:main" pypr = "pyprland.command:main"