Skip to content

Commit

Permalink
💥 simple Router & impl special option by router
Browse files Browse the repository at this point in the history
  • Loading branch information
RF-Tar-Railt committed Oct 17, 2024
1 parent 0279d03 commit dbeeb66
Show file tree
Hide file tree
Showing 19 changed files with 553 additions and 521 deletions.
14 changes: 7 additions & 7 deletions devtool.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def analyse_args(
command: list[str | Any],
raise_exception: bool = True,
context_style: Literal["bracket", "parentheses"] | None = None,
**kwargs
**kwargs,
):
meta = CommandMeta(keep_crlf=False, fuzzy_match=False, raise_exception=raise_exception, context_style=context_style)
argv: Argv[DataCollection] = Argv(meta, dev_space)
Expand All @@ -66,7 +66,7 @@ def analyse_header(
compact: bool = False,
raise_exception: bool = True,
context_style: Literal["bracket", "parentheses"] | None = None,
**kwargs
**kwargs,
):
meta = CommandMeta(keep_crlf=False, fuzzy_match=False, raise_exception=raise_exception, context_style=context_style)
argv: Argv[DataCollection] = Argv(meta, dev_space, separators=sep)
Expand All @@ -86,7 +86,7 @@ def analyse_option(
command: DataCollection[str | Any],
raise_exception: bool = True,
context_style: Literal["bracket", "parentheses"] | None = None,
**kwargs
**kwargs,
):
meta = CommandMeta(keep_crlf=False, fuzzy_match=False, raise_exception=raise_exception, context_style=context_style)
argv: Argv[DataCollection] = Argv(meta, dev_space)
Expand All @@ -95,12 +95,12 @@ def analyse_option(
_analyser.command.separators = " "
_analyser.need_main_args = False
_analyser.command.options.append(option)
default_compiler(_analyser, argv.param_ids)
default_compiler(_analyser)
_analyser.command.options.clear()
try:
argv.enter(kwargs)
argv.build(command)
alo(_analyser, argv, option)
alo(_analyser, argv, option, False)
return _analyser.options_result[option.dest]
except Exception as e:
if raise_exception:
Expand All @@ -113,7 +113,7 @@ def analyse_subcommand(
command: DataCollection[str | Any],
raise_exception: bool = True,
context_style: Literal["bracket", "parentheses"] | None = None,
**kwargs
**kwargs,
):
meta = CommandMeta(keep_crlf=False, fuzzy_match=False, raise_exception=raise_exception, context_style=context_style)
argv: Argv[DataCollection] = Argv(meta, dev_space)
Expand All @@ -122,7 +122,7 @@ def analyse_subcommand(
_analyser.command.separators = " "
_analyser.need_main_args = False
_analyser.command.options.append(subcommand)
default_compiler(_analyser, argv.param_ids)
default_compiler(_analyser)
_analyser.command.options.clear()
try:
argv.enter(kwargs)
Expand Down
109 changes: 53 additions & 56 deletions src/arclet/alconna/_internal/_analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from ..action import Action
from ..args import Args
from ..arparma import Arparma
from ..base import Completion, Help, Option, Shortcut, Subcommand
from ..completion import comp_ctx
from ..base import Option, Subcommand
from ..completion import comp_ctx, prompt
from ..exceptions import (
ArgumentMissing,
AnalyseException,
FuzzyMatchSuccess,
InvalidHeader,
InvalidParam,
Expand All @@ -27,7 +28,6 @@
analyse_args,
analyse_param,
handle_opt_default,
prompt,
)
from ._util import levenshtein

Expand All @@ -43,7 +43,7 @@ def default_compiler(analyser: SubAnalyser):
analyser (SubAnalyser): 任意子解析器
"""
for opts in analyser.command.options:
if isinstance(opts, Option) and not isinstance(opts, (Help, Shortcut, Completion)):
if isinstance(opts, Option):
if opts.compact or opts.action.type == 2 or not set(analyser.command.separators).issuperset(opts.separators): # noqa: E501
analyser.compact_params.append(opts)
for alias in opts.aliases:
Expand Down Expand Up @@ -153,48 +153,34 @@ def process(self, argv: Argv[TDC], name_validated: bool = True) -> Self:
ParamsUnmatched: 名称不匹配
FuzzyMatchSuccess: 模糊匹配成功
"""
sub = argv.current_node = self.command
sub = self.command
if not name_validated:
name, _ = argv.next(sub.separators)
if name not in sub.aliases:
argv.rollback(name)
if not argv.fuzzy_match:
raise InvalidParam(lang.require("subcommand", "name_error").format(source=sub.dest, target=name))
raise InvalidParam(lang.require("subcommand", "name_error").format(source=sub.dest, target=name), sub)
for al in sub.aliases:
if levenshtein(name, al) >= argv.fuzzy_threshold:
raise FuzzyMatchSuccess(lang.require("fuzzy", "matched").format(source=al, target=name))
raise InvalidParam(lang.require("subcommand", "name_error").format(source=sub.dest, target=name))
raise FuzzyMatchSuccess(lang.require("fuzzy", "matched").format(source=al, target=name), sub)
raise InvalidParam(lang.require("subcommand", "name_error").format(source=sub.dest, target=name), sub)

self.value_result = sub.action.value
argv.stack_params.enter(self.compile_params)
while analyse_param(self, argv, self.command.separators):
while analyse_param(self, argv, self.command.separators) and argv.current_index != argv.ndata:
pass
if self.default_main_only and not self.args_result:
self.args_result = analyse_args(argv, self.self_args)
if not self.args_result and self.need_main_args:
raise ArgumentMissing(
self.self_args.argument[0].field.get_missing_tips(
lang.require("subcommand", "args_missing").format(name=self.command.dest)
)
),
sub
)
argv.stack_params.leave()
return self

def get_sub_analyser(self, target: Subcommand) -> SubAnalyser | None:
"""获取子解析器
Args:
target (Subcommand): 目标子命令
Returns:
SubAnalyser[TDC] | None: 子解析器
"""
if target == self.command:
return self
for param in self.compile_params.values():
if isinstance(param, SubAnalyser):
return param.get_sub_analyser(target)


class Analyser(SubAnalyser):
"""命令解析器"""
Expand All @@ -215,7 +201,7 @@ def __init__(self, alconna: Alconna, compiler: TCompile | None = None):
def compile(self):
self.extra_allow = not self.command.meta.strict or not self.command.namespace_config.strict
self._compiler(self)
command_manager.resolve(self.command).stack_params.enter(self.compile_params)
command_manager.resolve(self.command).stack_params.base = self.compile_params
return self

def __repr__(self):
Expand All @@ -242,39 +228,43 @@ def process(self, argv: Argv[TDC], name_validated: bool = True) -> Exception | N
pass
except FuzzyMatchSuccess as e:
return e
# except SpecialOptionTriggered as sot:
# return _SPECIAL[sot.args[0]](self, argv)
except (InvalidParam, ArgumentMissing) as e1:
# if (rest := argv.release()) and isinstance(rest[-1], str):
# if rest[-1] in argv.completion_names and "completion" not in argv.namespace.disable_builtin_options:
# argv.bak_data[-1] = argv.bak_data[-1][: -len(rest[-1])].rstrip()
# return handle_completion(self, argv)
# if (handler := argv.special.get(rest[-1])) and handler not in argv.namespace.disable_builtin_options:
# return _SPECIAL[handler](self, argv)
if comp_ctx.get(None):
if isinstance(e1, InvalidParam):
argv.free(argv.current_node.separators if argv.current_node else None)
return PauseTriggered(prompt(self, argv), e1, argv)
argv.free(e1.context_node.separators if e1.context_node else None)
return PauseTriggered(
prompt(self.command, argv, [*self.args_result.keys()], [*self.options_result.keys(), *self.subcommands_result.keys()], e1.context_node),
e1,
argv
)
return e1

if self.default_main_only and not self.args_result:
self.args_result = analyse_args(argv, self.self_args)
try:
self.args_result = analyse_args(argv, self.self_args)
except FuzzyMatchSuccess as e1:
return e1
except AnalyseException as e2:
e2.context_node = None
if not argv.error:
argv.error = e2

if argv.current_index == argv.ndata and (not self.need_main_args or self.args_result):
return

rest = argv.release()
if len(rest) > 0:
# if isinstance(rest[-1], str) and rest[-1] in argv.completion_names:
# argv.bak_data[-1] = argv.bak_data[-1][: -len(rest[-1])].rstrip()
# return handle_completion(self, argv, rest[-2])
exc = ParamsUnmatched(lang.require("analyser", "param_unmatched").format(target=argv.next(move=False)[0]))
exc = ParamsUnmatched(lang.require("analyser", "param_unmatched").format(target=argv.next()[0]))
else:
exc = ArgumentMissing(
self.self_args.argument[0].field.get_missing_tips(lang.require("analyser", "param_missing"))
)
if comp_ctx.get(None) and isinstance(exc, ArgumentMissing):
return PauseTriggered(prompt(self, argv), exc, argv)
if comp_ctx.get(None):
return PauseTriggered(
prompt(self.command, argv, [*self.args_result.keys()], [*self.options_result.keys(), *self.subcommands_result.keys()]),
exc,
argv
)
return exc

def export(
Expand All @@ -290,23 +280,30 @@ def export(
fail (bool, optional): 是否解析失败. Defaults to False.
exception (Exception | None, optional): 解析失败时的异常. Defaults to None.
"""
if argv.error:
fail = True
exception = argv.error
result = Arparma(self.command._hash, argv.origin, not fail, self.header_result, ctx=argv.exit())
if fail:
if self.command.meta.raise_exception and not isinstance(exception, FuzzyMatchSuccess):
raise exception
result.error_info = exception
result.error_data = argv.release()
else:
if self.default_opt_result:
handle_opt_default(self.default_opt_result, self.options_result)
if self.default_sub_result:
for k, v in self.default_sub_result.items():
if k not in self.subcommands_result:
self.subcommands_result[k] = v
result.main_args = self.args_result
result.options = self.options_result
result.subcommands = self.subcommands_result
result.unpack()
if argv.message_cache:
command_manager.record(argv.token, result)
if isinstance(exception, FuzzyMatchSuccess):
result.output = str(exception)

if self.default_opt_result:
handle_opt_default(self.default_opt_result, self.options_result)
if self.default_sub_result:
for k, v in self.default_sub_result.items():
if k not in self.subcommands_result:
self.subcommands_result[k] = v
result.main_args = self.args_result
result.options = self.options_result
result.subcommands = self.subcommands_result
result.unpack()
if not fail and argv.message_cache:
command_manager.record(argv.token, result)
self.reset()
return result # type: ignore

Expand Down
39 changes: 12 additions & 27 deletions src/arclet/alconna/_internal/_argv.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from __future__ import annotations

from collections import deque
from dataclasses import InitVar, dataclass, field, fields
from typing import Any, Callable, ClassVar, Generic, Iterable, Literal, TYPE_CHECKING
from typing_extensions import Self

from tarina import lang, split, split_once

from ..args import Arg
from ..base import Option, Subcommand
from ..base import Option
from ..config import Namespace, config
from ..constraint import ARGV_OVERRIDES
from ..exceptions import NullMessage
Expand Down Expand Up @@ -51,11 +49,10 @@ class Argv(Generic[TDC]):
context_style: Literal["bracket", "parentheses"] | None = field(init=False)
"命令上下文插值的风格,None 为关闭,bracket 为 {...},parentheses 为 $(...)"

current_node: Arg | Subcommand | Option | None = field(init=False)
"""当前节点"""
current_index: int = field(init=False)
"""当前数据的索引"""
stack_params: ChainMap["SubAnalyser | Option"] = field(init=False, default_factory=ChainMap)
stack_params: ChainMap[SubAnalyser | Option] = field(init=False, default_factory=lambda: ChainMap())
error: Exception | None = field(init=False)
ndata: int = field(init=False)
"""原始数据的长度"""
bak_data: list[str | Any] = field(init=False)
Expand All @@ -67,8 +64,6 @@ class Argv(Generic[TDC]):
origin: TDC = field(init=False)
"""原始命令"""
context: dict[str, Any] = field(init=False, default_factory=dict)
# special: dict[str, str] = field(init=False, default_factory=dict)
# completion_names: set[str] = field(init=False, default_factory=set)
_sep: str | None = field(init=False)

_cache: ClassVar[dict[type, dict[str, Any]]] = {}
Expand All @@ -91,25 +86,18 @@ def compile(self, meta: CommandMeta):
self.message_cache = self.namespace.enable_message_cache
self.filter_crlf = not meta.keep_crlf
self.context_style = meta.context_style
# self.special = {}
# self.special.update(
# [(i, "help") for i in self.namespace.builtin_option_name["help"]]
# + [(i, "completion") for i in self.namespace.builtin_option_name["completion"]]
# + [(i, "shortcut") for i in self.namespace.builtin_option_name["shortcut"]]
# )
# self.completion_names = self.namespace.builtin_option_name["completion"]

def reset(self):
"""重置命令行参数"""
self.current_index = 0
self.ndata = 0
self.bak_data = []
self.raw_data = []
self.stack_params.maps = []
self.error = None
self.stack_params.stack = []
self.token = 0
self.origin = "None" # type: ignore
self._sep = None
self.current_node = None

@staticmethod
def generate_token(data: list) -> int:
Expand Down Expand Up @@ -190,12 +178,11 @@ def addon(self, data: Iterable[str | Any], merge_str: bool = True) -> Self:
self.token = self.generate_token(self.raw_data)
return self

def next(self, separate: str | None = None, move: bool = True) -> tuple[str | Any, bool]:
def next(self, separate: str | None = None) -> tuple[str | Any, bool]:
"""获取解析需要的下个数据
Args:
separate (str | None, optional): 分隔符.
move (bool, optional): 是否移动指针.
Returns:
tuple[str | Any, bool]: 下个数据, 是否是字符串.
Expand All @@ -208,15 +195,13 @@ def next(self, separate: str | None = None, move: bool = True) -> tuple[str | An
_current_data = self.raw_data[self.current_index]
if _current_data.__class__ is str:
_text, _rest_text = split_once(_current_data, separate, self.filter_crlf) # type: ignore
if move:
if _rest_text:
self._sep = separate
self.raw_data[self.current_index] = _rest_text
else:
self.current_index += 1
if _rest_text:
self._sep = separate
self.raw_data[self.current_index] = _rest_text
else:
self.current_index += 1
return _text, True
if move:
self.current_index += 1
self.current_index += 1
return _current_data, False

def rollback(self, data: str | Any, replace: bool = False):
Expand Down
Loading

0 comments on commit dbeeb66

Please sign in to comment.