blueprint-compiler/blueprintcompiler/lsp.py
James Westman 544d152fb6
Rename to blueprint-compiler
This isn't an official GTK project so better to avoid using "GTK" in the
name.
2021-12-01 15:35:58 -06:00

277 lines
9.4 KiB
Python

# lsp.py
#
# Copyright 2021 James Westman <james@jwestman.net>
#
# This file is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This file is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: LGPL-3.0-or-later
import typing as T
import json, sys, traceback
from .completions import complete
from .errors import PrintableError, CompileError, MultipleErrors
from .lsp_utils import *
from . import tokenizer, parser, utils, xml_reader
def command(json_method):
    """Build a decorator that tags a handler with a JSON-RPC method name.

    The returned decorator stores ``json_method`` on the decorated function
    as the ``_json_method`` attribute and hands the same function object
    back unchanged, so the tag can later be discovered by inspecting
    attributes.
    """

    def _tag(handler):
        # Tag in place; no wrapper function is created.
        setattr(handler, "_json_method", json_method)
        return handler

    return _tag
class OpenFile:
    """In-memory state of one document that the client has open.

    Keeps the current text together with the results of the most recent
    tokenize/parse pass: ``tokens``, ``ast`` and ``diagnostics``.
    """

    def __init__(self, uri, text, version):
        """Store the document fields and run an initial tokenize/parse pass."""
        self.uri = uri
        self.text = text
        self.version = version
        self.ast = None
        self.tokens = None

        self._update()

    def apply_changes(self, changes):
        """Apply a sequence of content-change events, then re-parse once.

        Each change is a dict with a ``"text"`` key and, for incremental
        edits, a ``"range"`` key holding ``start``/``end`` positions
        (``line``/``character``). A change without ``"range"`` replaces the
        whole document, as the LSP content-change event allows.
        """
        for change in changes:
            change_range = change.get("range")
            if change_range is None:
                # Whole-document replacement: nothing to splice.
                self.text = change["text"]
                continue

            start = utils.pos_to_idx(
                change_range["start"]["line"],
                change_range["start"]["character"],
                self.text,
            )
            end = utils.pos_to_idx(
                change_range["end"]["line"],
                change_range["end"]["character"],
                self.text,
            )
            self.text = self.text[:start] + change["text"] + self.text[end:]
        self._update()

    def _update(self):
        """Re-tokenize and re-parse ``self.text``, rebuilding ``diagnostics``.

        When tokenizing or parsing raises MultipleErrors/CompileError the
        errors become diagnostics; ``tokens``/``ast`` then keep whatever
        value they had before the failing step.
        """
        self.diagnostics = []
        try:
            self.tokens = tokenizer.tokenize(self.text)
            self.ast, errors = parser.parse(self.tokens)
            if errors is not None:
                self.diagnostics += errors.errors
            # NOTE(review): guard in case parse() can hand back no AST
            # alongside its errors - confirm against parser.parse.
            if self.ast is not None:
                self.diagnostics += self.ast.errors
        except MultipleErrors as e:
            self.diagnostics += e.errors
        except CompileError as e:
            self.diagnostics.append(e)

    def calc_semantic_tokens(self) -> T.List[int]:
        """Return the semantic tokens as a flat list of integers.

        Every token contributes five integers: line delta, column delta,
        length, token type and token modifiers (always 0). Returns an empty
        list when there is no AST to take tokens from.
        """
        if self.ast is None:
            return []

        token_lists = [
            [
                *utils.idx_to_pos(token.start, self.text),  # line and column
                token.end - token.start,  # length
                token.type,
                0,  # token modifiers
            ]
            for token in self.ast.get_semantic_tokens()
        ]
        return self._encode_relative(token_lists)

    @staticmethod
    def _encode_relative(token_lists):
        """Convert absolute line/column entries to deltas and flatten.

        ``token_lists`` holds ``[line, column, length, type, modifiers]``
        lists which are modified in place. The column becomes a delta only
        when the token is on the same line as its predecessor.
        """
        # Walk from the end towards the front so that each predecessor still
        # holds its absolute position when it is read; going front-to-back
        # would subtract values that were already turned into deltas.
        for current, previous in zip(
            reversed(token_lists[1:]), reversed(token_lists[:-1])
        ):
            current[0] -= previous[0]
            if current[0] == 0:
                current[1] -= previous[1]

        # flatten the list
        return [value for token_list in token_lists for value in token_list]
class LanguageServer:
    """Language server that exchanges JSON-RPC messages over stdin/stdout.

    Request and notification handlers are the methods tagged with
    ``@command``; ``commands`` maps each JSON-RPC method name to its
    handler. The table starts empty and is filled in by module-level code
    after the class definition.
    """

    # JSON-RPC method name -> handler function (called as f(self, id, params)).
    commands: T.Dict[str, T.Callable] = {}

    def __init__(self, logfile=None):
        """Create a server; ``logfile`` is an optional writable text stream."""
        self.client_capabilities = {}
        self._open_files: T.Dict[str, OpenFile] = {}
        self.logfile = logfile

    def run(self):
        """Read messages from stdin and dispatch them until end of input.

        Returns when stdin reaches end-of-file, or after logging the
        traceback of a failure while reading or JSON-decoding a message.
        Failures inside a handler do not end the loop.
        """
        # Read <doc> tags from gir files. During normal compilation these are
        # ignored.
        xml_reader.PARSE_GIR.add("doc")

        try:
            while True:
                message = self._read_message()
                if message is None:
                    return
                self._log("input: " + message)

                data = json.loads(message)
                self._dispatch(
                    data.get("method"), data.get("id"), data.get("params")
                )
        except Exception:
            self._log(traceback.format_exc())

    def _read_message(self):
        """Read one framed message from stdin; return its body or None at EOF.

        Header lines are consumed up to the blank line that follows a
        ``Content-Length`` header. The body is read from the binary layer
        because Content-Length counts bytes, not characters; reading through
        the text layer would over-read whenever the body is not pure ASCII.
        """
        stream = sys.stdin.buffer
        line = ""
        content_len = -1
        while content_len == -1 or (line != "\n" and line != "\r\n"):
            line = stream.readline().decode("utf-8")
            if line == "":
                return None
            if line.startswith("Content-Length:"):
                content_len = int(line.split("Content-Length:")[1].strip())
        return stream.read(content_len).decode("utf-8")

    def _dispatch(self, method, id, params):
        """Invoke the handler registered for ``method``, if there is one.

        Messages with no registered handler are ignored. If a handler
        raises, the traceback is logged and, when the message carried an
        ``id`` (i.e. the client expects a reply), a JSON-RPC internal-error
        response is sent so the client is not left waiting.
        """
        handler = self.commands.get(method)
        if handler is None:
            return
        try:
            handler(self, id, params)
        except Exception:
            self._log(traceback.format_exc())
            if id is not None:
                self._send({
                    "id": id,
                    # -32603 is the JSON-RPC 2.0 "Internal error" code.
                    "error": {"code": -32603, "message": "Internal error"},
                })

    def _send(self, data):
        """Serialize ``data`` (a dict, mutated to add "jsonrpc") to stdout."""
        data["jsonrpc"] = "2.0"
        # json.dumps escapes non-ASCII characters by default, so the
        # character count below is also the byte count.
        line = json.dumps(data, separators=(",", ":")) + "\r\n"
        self._log("output: " + line)
        sys.stdout.write(f"Content-Length: {len(line)}\r\nContent-Type: application/vscode-jsonrpc; charset=utf-8\r\n\r\n{line}")
        sys.stdout.flush()

    def _log(self, msg):
        """Write ``msg`` and a newline to the log file, if one was given."""
        if self.logfile is not None:
            self.logfile.write(str(msg))
            self.logfile.write("\n")
            self.logfile.flush()

    def _send_response(self, id, result):
        """Send the result of the request identified by ``id``."""
        self._send({
            "id": id,
            "result": result,
        })

    def _send_notification(self, method, params):
        """Send a notification (a message without an id) to the client."""
        self._send({
            "method": method,
            "params": params,
        })

    @command("initialize")
    def initialize(self, id, params):
        """Record the client's capabilities and reply with the server's."""
        self.client_capabilities = params.get("capabilities")
        self._send_response(id, {
            "capabilities": {
                "textDocumentSync": {
                    "openClose": True,
                    "change": TextDocumentSyncKind.Incremental,
                },
                "semanticTokensProvider": {
                    "legend": {
                        "tokenTypes": ["enumMember"],
                        # The legend requires both lists; no modifiers are
                        # ever emitted (the modifier field is always 0).
                        "tokenModifiers": [],
                    },
                    "full": True,
                },
                "completionProvider": {},
                "codeActionProvider": {},
                "hoverProvider": True,
            }
        })

    @command("textDocument/didOpen")
    def didOpen(self, id, params):
        """Start tracking a newly opened document and publish diagnostics."""
        doc = params.get("textDocument")
        uri = doc.get("uri")
        version = doc.get("version")
        text = doc.get("text")

        open_file = OpenFile(uri, text, version)
        self._open_files[uri] = open_file
        self._send_file_updates(open_file)

    @command("textDocument/didChange")
    def didChange(self, id, params):
        """Apply the client's edits to a tracked document and republish."""
        if params is not None:
            open_file = self._open_files[params["textDocument"]["uri"]]
            open_file.apply_changes(params["contentChanges"])
            self._send_file_updates(open_file)

    @command("textDocument/didClose")
    def didClose(self, id, params):
        """Stop tracking a document; closing an unknown URI is a no-op."""
        self._open_files.pop(params["textDocument"]["uri"], None)

    @command("textDocument/hover")
    def hover(self, id, params):
        """Reply with markdown docs for the given position, or null."""
        open_file = self._open_files[params["textDocument"]["uri"]]
        # Short-circuits to a falsy value when the document has no AST.
        docs = open_file.ast and open_file.ast.get_docs(
            utils.pos_to_idx(
                params["position"]["line"],
                params["position"]["character"],
                open_file.text,
            )
        )
        if docs:
            self._send_response(id, {
                "contents": {
                    "kind": "markdown",
                    "value": docs,
                }
            })
        else:
            self._send_response(id, None)

    @command("textDocument/completion")
    def completion(self, id, params):
        """Reply with completion items for the given position."""
        open_file = self._open_files[params["textDocument"]["uri"]]

        if open_file.ast is None:
            self._send_response(id, [])
            return

        idx = utils.pos_to_idx(
            params["position"]["line"],
            params["position"]["character"],
            open_file.text,
        )
        completions = complete(open_file.ast, open_file.tokens, idx)
        self._send_response(
            id, [completion.to_json(True) for completion in completions]
        )

    @command("textDocument/semanticTokens/full")
    def semantic_tokens(self, id, params):
        """Reply with the semantic tokens of the whole document."""
        open_file = self._open_files[params["textDocument"]["uri"]]

        # Same guard as the completion handler: without an AST there are no
        # tokens to report.
        if open_file.ast is None:
            data = []
        else:
            data = open_file.calc_semantic_tokens()

        self._send_response(id, {
            "data": data,
        })

    @command("textDocument/codeAction")
    def code_actions(self, id, params):
        """Reply with quick fixes for diagnostics touching the given range.

        One action is produced per entry in ``diagnostic.actions`` for every
        diagnostic whose span overlaps the requested range; each action
        replaces the diagnostic's span with ``action.replace_with``.
        """
        open_file = self._open_files[params["textDocument"]["uri"]]

        range_start = utils.pos_to_idx(
            params["range"]["start"]["line"],
            params["range"]["start"]["character"],
            open_file.text,
        )
        range_end = utils.pos_to_idx(
            params["range"]["end"]["line"],
            params["range"]["end"]["character"],
            open_file.text,
        )

        actions = [
            {
                "title": action.title,
                "kind": "quickfix",
                "diagnostics": [self._create_diagnostic(open_file.text, diagnostic)],
                "edit": {
                    "changes": {
                        open_file.uri: [{
                            "range": utils.idxs_to_range(diagnostic.start, diagnostic.end, open_file.text),
                            "newText": action.replace_with
                        }]
                    }
                }
            }
            for diagnostic in open_file.diagnostics
            # keep diagnostics that are not entirely before or after the range
            if not (diagnostic.end < range_start or diagnostic.start > range_end)
            for action in diagnostic.actions
        ]

        self._send_response(id, actions)

    def _send_file_updates(self, open_file: OpenFile):
        """Publish the document's current diagnostics to the client."""
        self._send_notification("textDocument/publishDiagnostics", {
            "uri": open_file.uri,
            "diagnostics": [
                self._create_diagnostic(open_file.text, err)
                for err in open_file.diagnostics
            ],
        })

    def _create_diagnostic(self, text, err):
        """Build the diagnostic dict for ``err``, located within ``text``."""
        return {
            "range": utils.idxs_to_range(err.start, err.end, text),
            "message": err.message,
            "severity": 1,
        }
# Fill LanguageServer.commands: every callable attribute of the class that
# carries a `_json_method` tag is registered under that method name.
for name in dir(LanguageServer):
    item = getattr(LanguageServer, name)
    if not callable(item):
        continue
    if not hasattr(item, "_json_method"):
        continue
    LanguageServer.commands[item._json_method] = item