Skip to content

Commit

Permalink
add FuncSchema
Browse files Browse the repository at this point in the history
  • Loading branch information
lidong committed Apr 26, 2024
1 parent 350d9a4 commit 11d102f
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 110 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ Module Docs - https://github.com/ClericPy/morebuiltins/blob/master/doc.md

2.4 `NamedLock` - Reusable named locks, support for timeouts, support for multiple concurrent locks.

2.5 `FuncSchema` - Parse the parameters and types required by a function into a dictionary, and convert an incoming parameter into the appropriate type.


## 3. morebuiltins.ipc

Expand Down Expand Up @@ -115,10 +117,10 @@ Module Docs - https://github.com/ClericPy/morebuiltins/blob/master/doc.md

- [x] add zipapps as a submodule(https://github.com/ClericPy/zipapps) - v0.0.3
- [x] asyncio free-flying tasks(bg_task) - v0.0.3
- [x] named lock with timeout
- [x] named lock with timeout - v0.0.4
- [ ] FuncSchema (parse function to get the query-dict) - v0.0.4
- [ ] progress_bar
- [ ] http.server (upload)
- [ ] function parser (signature.parameters)
- [ ] time reach syntax
- [ ] quick tkinter
- [ ] http request/response parser
Expand Down
137 changes: 82 additions & 55 deletions doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -504,70 +504,97 @@

2.4 `NamedLock` - Reusable named locks, support for timeouts, support for multiple concurrent locks.

```python

def test_named_lock():
def test_sync():
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Semaphore

def _test1():
with NamedLock("_test1", Lock, timeout=0.05) as lock:
time.sleep(0.1)
return bool(lock)

with ThreadPoolExecutor(10) as pool:
tasks = [pool.submit(_test1) for _ in range(3)]
result = [i.result() for i in tasks]
assert result == [True, False, False], result
assert len(NamedLock._SYNC_CACHE) == 1
NamedLock.clear_unlocked()
assert len(NamedLock._SYNC_CACHE) == 0

def _test2():
with NamedLock("_test2", lambda: Semaphore(2), timeout=0.05) as lock:
time.sleep(0.1)
return bool(lock)

with ThreadPoolExecutor(10) as pool:
tasks = [pool.submit(_test2) for _ in range(3)]
result = [i.result() for i in tasks]
assert result == [True, True, False], result

def test_async():
import asyncio

async def main():
async def _test1():
async with NamedLock("_test1", asyncio.Lock, timeout=0.05) as lock:
await asyncio.sleep(0.1)
```python

def test_named_lock():
def test_sync():
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Semaphore

def _test1():
with NamedLock("_test1", Lock, timeout=0.05) as lock:
time.sleep(0.1)
return bool(lock)

tasks = [asyncio.create_task(_test1()) for _ in range(3)]
result = [await i for i in tasks]
assert result == [True, False, False], result
assert len(NamedLock._ASYNC_CACHE) == 1
with ThreadPoolExecutor(10) as pool:
tasks = [pool.submit(_test1) for _ in range(3)]
result = [i.result() for i in tasks]
assert result == [True, False, False], result
assert len(NamedLock._SYNC_CACHE) == 1
NamedLock.clear_unlocked()
assert len(NamedLock._ASYNC_CACHE) == 0
assert len(NamedLock._SYNC_CACHE) == 0

async def _test2():
async with NamedLock(
"_test2", lambda: asyncio.Semaphore(2), timeout=0.05
) as lock:
await asyncio.sleep(0.1)
def _test2():
with NamedLock("_test2", lambda: Semaphore(2), timeout=0.05) as lock:
time.sleep(0.1)
return bool(lock)

tasks = [asyncio.create_task(_test2()) for _ in range(3)]
result = [await i for i in tasks]
assert result == [True, True, False], result
with ThreadPoolExecutor(10) as pool:
tasks = [pool.submit(_test2) for _ in range(3)]
result = [i.result() for i in tasks]
assert result == [True, True, False], result

asyncio.get_event_loop().run_until_complete(main())
def test_async():
import asyncio

test_sync()
test_async()
async def main():
async def _test1():
async with NamedLock("_test1", asyncio.Lock, timeout=0.05) as lock:
await asyncio.sleep(0.1)
return bool(lock)

```
tasks = [asyncio.create_task(_test1()) for _ in range(3)]
result = [await i for i in tasks]
assert result == [True, False, False], result
assert len(NamedLock._ASYNC_CACHE) == 1
NamedLock.clear_unlocked()
assert len(NamedLock._ASYNC_CACHE) == 0

async def _test2():
async with NamedLock(
"_test2", lambda: asyncio.Semaphore(2), timeout=0.05
) as lock:
await asyncio.sleep(0.1)
return bool(lock)

tasks = [asyncio.create_task(_test2()) for _ in range(3)]
result = [await i for i in tasks]
assert result == [True, True, False], result

asyncio.get_event_loop().run_until_complete(main())

test_sync()
test_async()

```


---


2.5 `FuncSchema` - Parse the parameters and types required by a function into a dictionary, and convert an incoming parameter into the appropriate type.

>>> def test(a, b: str, /, c=1, *, d=["d"], e=0.1, f={"f"}, g=(1, 2), h=True, i={1}, **kws):
... return
>>> FuncSchema.parse(test)
{'b': {'type': <class 'str'>, 'default': <class 'inspect._empty'>}, 'c': {'type': <class 'int'>, 'default': 1}, 'd': {'type': <class 'list'>, 'default': ['d']}, 'e': {'type': <class 'float'>, 'default': 0.1}, 'f': {'type': <class 'set'>, 'default': {'f'}}, 'g': {'type': <class 'tuple'>, 'default': (1, 2)}, 'h': {'type': <class 'bool'>, 'default': True}, 'i': {'type': <class 'set'>, 'default': {1}}}
>>> FuncSchema.convert("1", int)
1
>>> FuncSchema.convert("1", str)
'1'
>>> FuncSchema.convert("1", float)
1.0
>>> FuncSchema.convert(0, bool)
False
>>> FuncSchema.convert('1', bool)
True
>>> FuncSchema.convert('[[1, 1]]', dict)
{1: 1}
>>> FuncSchema.convert('[1, 1]', set)
{1}
>>> FuncSchema.convert('[1, 1]', tuple)
(1, 1)


---
Expand Down
2 changes: 1 addition & 1 deletion morebuiltins/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.0.3"
__version__ = "0.0.4"
__all__ = [
"morebuiltins.utils",
"morebuiltins.functools",
Expand Down
157 changes: 105 additions & 52 deletions morebuiltins/functools.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import inspect
import json
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
Expand All @@ -7,7 +9,7 @@
from typing import Callable, Coroutine, Dict, Optional, OrderedDict, Set, Union
from weakref import WeakSet

__all__ = ["lru_cache_ttl", "threads", "bg_task", "NamedLock"]
__all__ = ["lru_cache_ttl", "threads", "bg_task", "NamedLock", "FuncSchema"]


def lru_cache_ttl(
Expand Down Expand Up @@ -189,70 +191,70 @@ def bg_task(coro: Coroutine) -> asyncio.Task:
class NamedLock:
"""Reusable named locks, support for timeouts, support for multiple concurrent locks.
```python
```python
def test_named_lock():
def test_sync():
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Semaphore
def test_named_lock():
def test_sync():
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Semaphore
def _test1():
with NamedLock("_test1", Lock, timeout=0.05) as lock:
time.sleep(0.1)
return bool(lock)
def _test1():
with NamedLock("_test1", Lock, timeout=0.05) as lock:
time.sleep(0.1)
return bool(lock)
with ThreadPoolExecutor(10) as pool:
tasks = [pool.submit(_test1) for _ in range(3)]
result = [i.result() for i in tasks]
assert result == [True, False, False], result
assert len(NamedLock._SYNC_CACHE) == 1
NamedLock.clear_unlocked()
assert len(NamedLock._SYNC_CACHE) == 0
with ThreadPoolExecutor(10) as pool:
tasks = [pool.submit(_test1) for _ in range(3)]
result = [i.result() for i in tasks]
assert result == [True, False, False], result
assert len(NamedLock._SYNC_CACHE) == 1
NamedLock.clear_unlocked()
assert len(NamedLock._SYNC_CACHE) == 0
def _test2():
with NamedLock("_test2", lambda: Semaphore(2), timeout=0.05) as lock:
time.sleep(0.1)
return bool(lock)
def _test2():
with NamedLock("_test2", lambda: Semaphore(2), timeout=0.05) as lock:
time.sleep(0.1)
return bool(lock)
with ThreadPoolExecutor(10) as pool:
tasks = [pool.submit(_test2) for _ in range(3)]
result = [i.result() for i in tasks]
assert result == [True, True, False], result
with ThreadPoolExecutor(10) as pool:
tasks = [pool.submit(_test2) for _ in range(3)]
result = [i.result() for i in tasks]
assert result == [True, True, False], result
def test_async():
import asyncio
def test_async():
import asyncio
async def main():
async def _test1():
async with NamedLock("_test1", asyncio.Lock, timeout=0.05) as lock:
await asyncio.sleep(0.1)
return bool(lock)
async def main():
async def _test1():
async with NamedLock("_test1", asyncio.Lock, timeout=0.05) as lock:
await asyncio.sleep(0.1)
return bool(lock)
tasks = [asyncio.create_task(_test1()) for _ in range(3)]
result = [await i for i in tasks]
assert result == [True, False, False], result
assert len(NamedLock._ASYNC_CACHE) == 1
NamedLock.clear_unlocked()
assert len(NamedLock._ASYNC_CACHE) == 0
tasks = [asyncio.create_task(_test1()) for _ in range(3)]
result = [await i for i in tasks]
assert result == [True, False, False], result
assert len(NamedLock._ASYNC_CACHE) == 1
NamedLock.clear_unlocked()
assert len(NamedLock._ASYNC_CACHE) == 0
async def _test2():
async with NamedLock(
"_test2", lambda: asyncio.Semaphore(2), timeout=0.05
) as lock:
await asyncio.sleep(0.1)
return bool(lock)
async def _test2():
async with NamedLock(
"_test2", lambda: asyncio.Semaphore(2), timeout=0.05
) as lock:
await asyncio.sleep(0.1)
return bool(lock)
tasks = [asyncio.create_task(_test2()) for _ in range(3)]
result = [await i for i in tasks]
assert result == [True, True, False], result
tasks = [asyncio.create_task(_test2()) for _ in range(3)]
result = [await i for i in tasks]
assert result == [True, True, False], result
asyncio.get_event_loop().run_until_complete(main())
asyncio.get_event_loop().run_until_complete(main())
test_sync()
test_async()
test_sync()
test_async()
```
```
"""

_SYNC_CACHE: Dict[str, LockType] = {}
Expand Down Expand Up @@ -309,6 +311,57 @@ async def __aexit__(self, *_):
self.lock.release()


class FuncSchema:
"""Parse the parameters and types required by a function into a dictionary, and convert an incoming parameter into the appropriate type.
>>> def test(a, b: str, /, c=1, *, d=["d"], e=0.1, f={"f"}, g=(1, 2), h=True, i={1}, **kws):
... return
>>> FuncSchema.parse(test)
{'b': {'type': <class 'str'>, 'default': <class 'inspect._empty'>}, 'c': {'type': <class 'int'>, 'default': 1}, 'd': {'type': <class 'list'>, 'default': ['d']}, 'e': {'type': <class 'float'>, 'default': 0.1}, 'f': {'type': <class 'set'>, 'default': {'f'}}, 'g': {'type': <class 'tuple'>, 'default': (1, 2)}, 'h': {'type': <class 'bool'>, 'default': True}, 'i': {'type': <class 'set'>, 'default': {1}}}
>>> FuncSchema.convert("1", int)
1
>>> FuncSchema.convert("1", str)
'1'
>>> FuncSchema.convert("1", float)
1.0
>>> FuncSchema.convert(0, bool)
False
>>> FuncSchema.convert('1', bool)
True
>>> FuncSchema.convert('[[1, 1]]', dict)
{1: 1}
>>> FuncSchema.convert('[1, 1]', set)
{1}
>>> FuncSchema.convert('[1, 1]', tuple)
(1, 1)
"""

ALLOW_TYPES = {int, float, str, tuple, list, set, dict, bool}
JSON_TYPES = {tuple, set, dict, bool}

@classmethod
def parse(cls, function: Callable):
sig = inspect.signature(function)
result = {}
for param in sig.parameters.values():
if param.annotation is param.empty:
if param.default is param.empty:
continue
tp = type(param.default)
else:
tp = param.annotation
if tp in cls.ALLOW_TYPES:
result[param.name] = {"type": tp, "default": param.default}
return result

@classmethod
def convert(cls, obj, target_type):
if isinstance(obj, str) and target_type in cls.JSON_TYPES:
return target_type(json.loads(obj))
else:
return target_type(obj)


def test_bg_task():
async def _test_bg_task():
async def coro():
Expand Down

0 comments on commit 11d102f

Please sign in to comment.