|
39 | 39 | from typing import Sequence, List, Union, Tuple, Type, Dict, Any, Iterable, Optional, BinaryIO, Generator, Mapping |
40 | 40 | from privex.helpers import settings |
41 | 41 | from privex.helpers.collections import DictObject, OrderedDictObject |
42 | | -from privex.helpers.types import T, K, V, C, USE_ORIG_VAR, STRBYTES, Number, NumberStr |
| 42 | +from privex.helpers.types import T, K, V, C, USE_ORIG_VAR, STRBYTES, NumberStr |
43 | 43 | from privex.helpers.exceptions import NestedContextException |
44 | 44 |
|
45 | 45 |
|
@@ -262,14 +262,15 @@ def parse_csv(line: str, csvsplit: str = ',') -> List[str]: |
262 | 262 | return [x.strip() for x in line.strip().split(csvsplit)] |
263 | 263 |
|
264 | 264 |
|
265 | | -def env_csv(env_key: str, env_default = None, csvsplit=',') -> List[str]: |
| 265 | +def env_csv(env_key: str, env_default=None, csvsplit=',') -> List[str]: |
266 | 266 | """ |
267 | 267 | Quick n' dirty parsing of simple CSV formatted environment variables, with fallback |
268 | 268 | to user specified ``env_default`` (defaults to None) |
269 | 269 |
|
270 | 270 | Example: |
271 | 271 |
|
272 | | - >>> os.setenv('EXAMPLE', ' hello , world, test') |
| 272 | + >>> import os |
| 273 | + >>> os.environ['EXAMPLE'] = ' hello , world, test') |
273 | 274 | >>> env_csv('EXAMPLE', []) |
274 | 275 | ['hello', 'world', 'test'] |
275 | 276 | >>> env_csv('NONEXISTANT', []) |
@@ -502,6 +503,226 @@ class - ``class MyClass:``, with settings defined as static class attributes) |
502 | 503 | return {**defaults, **set_conf, **kwargs, **merge_conf} |
503 | 504 |
|
504 | 505 |
|
| 506 | +def get_return_type(f: callable) -> Optional[Union[type, object, callable]]: |
| 507 | + """ |
| 508 | + Extract the return type for a function/method. Note that this only works with functions/methods which have their |
| 509 | + return type annotated, e.g. ``def somefunc(x: int) -> float: return x * 2.1`` |
| 510 | + |
| 511 | + .. Attention:: If you want to extract a function/method return type and have any Generic :mod:`typing` types simplified |
| 512 | + down to their native Python base types (important to be able to compare with :func:`.isinstance` etc.), |
| 513 | + then you should use :func:`.extract_type` instead (handles raw types, objects, and function pointers) |
| 514 | + |
| 515 | + |
| 516 | + **Example 1** - Extracting a generic return type from a function:: |
| 517 | + |
| 518 | + >>> def list_wrap(v: T) -> List[T]: |
| 519 | + ... return [v] |
| 520 | + ... |
| 521 | + >>> rt = get_return_type(list_wrap) |
| 522 | + typing.List[~T] |
| 523 | + >>> rt._name # We can get the string type name via _name |
| 524 | + 'List' |
| 525 | + >>> l = rt.__args__[0] # We can access the types inside of the [] via .__args__ |
| 526 | + ~T |
| 527 | + >>> l.__name__ # Get the name of 'l' - the type inside of the [] |
| 528 | + 'T' |
| 529 | + |
| 530 | + **Example 2** - What happens if you use this on a function/method with no return type annotation? |
| 531 | + |
| 532 | + The answer is: **nothing** - it will simply return ``None`` if the function/method has no return type annotation:: |
| 533 | + |
| 534 | + >>> def hello(x): |
| 535 | + ... return x * 5 |
| 536 | + >>> repr(get_return_type(hello)) |
| 537 | + 'None' |
| 538 | + |
| 539 | + |
| 540 | + :param callable f: A function/method to extract the return type from |
| 541 | + :return return_type: The return type, usually either a :class:`.type` or a :class:`.object` |
| 542 | + """ |
| 543 | + if f is None: return None |
| 544 | + if not inspect.isclass(f) and any([inspect.isfunction(f), inspect.ismethod(f), inspect.iscoroutinefunction(f)]): |
| 545 | + sig = inspect.signature(f) |
| 546 | + ret = sig.return_annotation |
| 547 | + # noinspection PyUnresolvedReferences,PyProtectedMember |
| 548 | + if ret is inspect._empty or empty(ret, True): |
| 549 | + return None |
| 550 | + return ret |
| 551 | + return f |
| 552 | + |
| 553 | + |
| 554 | +def typing_to_base(tp, fail=False, return_orig=True, clean_union=True) -> Optional[Union[type, object, callable, tuple, Tuple[type]]]: |
| 555 | + """ |
| 556 | + Attempt to extract one or more native Python base types from a :mod:`typing` type, including generics such as ``List[str]``, |
| 557 | + and combined types such as ``Union[list, str]`` |
| 558 | + |
| 559 | + >>> typing_to_base(List[str]) |
| 560 | + list |
| 561 | + >>> typing_to_base(Union[str, Dict[str, list], int]) |
| 562 | + (str, dict, int) |
| 563 | + >>> typing_to_base(Union[str, Dict[str, list], int], clean_union=False) |
| 564 | + (str, typing.Dict[str, list], int) |
| 565 | + >>> typing_to_base(str) |
| 566 | + str |
| 567 | + >>> typing_to_base(str, fail=True) |
| 568 | + TypeError: Failed to extract base type for type object: <class 'str'> |
| 569 | + >>> repr(typing_to_base(str, return_orig=False)) |
| 570 | + 'None' |
| 571 | + |
| 572 | + :param tp: The :mod:`typing` type object to extract base/native type(s) from. |
| 573 | + :param bool fail: (Default: ``False``) If True, then raises :class:`.TypeError` if ``tp`` doesn't appear to be a :mod:`typing` type. |
| 574 | + :param bool return_orig: (Default: ``True``) If True, returns ``tp`` as-is if it's not a typing type. When ``False``, |
| 575 | + non- :mod:`typing` types will cause ``None`` to be returned. |
| 576 | + :param bool clean_union: (Default: ``True``) If True, :class:`typing.Union`'s will have each type |
| 577 | + converted/validated into a normal type using :func:`.extract_type` |
| 578 | + :return type_res: Either a :class:`.type` base type, a :class:`.tuple` of types, a :mod:`typing` type object, or something else |
| 579 | + depending on what type ``tp`` was. |
| 580 | + """ |
| 581 | + # We can't use isinstance() with Union generic objects, so we have to identify them by checking their repr string. |
| 582 | + if repr(tp).startswith('typing.Union['): |
| 583 | + # For Union's (including Optional[]), we iterate over the object's ``__args__`` which contains the Union's types, |
| 584 | + # and pass each type through extract_type to cleanup any ``typing`` generics such as ``List[str]`` back into |
| 585 | + # their native type (e.g. ``str`` for ``List[str]``) |
| 586 | + ntypes = [] |
| 587 | + # noinspection PyUnresolvedReferences |
| 588 | + targs = tp.__args__ |
| 589 | + for t in targs: |
| 590 | + try: |
| 591 | + ntypes.append(extract_type(t) if clean_union else t) |
| 592 | + except Exception as e: |
| 593 | + log.warning("Error while extracting type for %s (part of %s). Reason: %s - %s", t, repr(tp), type(e), str(e)) |
| 594 | + ntypes.append(t) |
| 595 | + return tuple(ntypes) |
| 596 | + # For Python 3.6, __origin__ contains the typing type without the generic part, while __orig_bases__ is a tuple containing the |
| 597 | + # native/base type, and some typing type. |
| 598 | + # On 3.7+, __origin__ contains the native/base type, while __orig_bases__ doesn't exist |
| 599 | + if hasattr(tp, '__orig_bases__'): return tp.__orig_bases__[0] |
| 600 | + |
| 601 | + # __origin__ / __extra__ are exposed by :mod:`typing` types, including generics such as Dict[str,str] |
| 602 | + # original SO answer: https://stackoverflow.com/a/54241536/2648583 |
| 603 | + if hasattr(tp, '__origin__'): return tp.__origin__ |
| 604 | + if hasattr(tp, '__extra__'): return tp.__extra__ |
| 605 | + if fail: |
| 606 | + raise TypeError(f"Failed to extract base type for type object: {repr(tp)}") |
| 607 | + if return_orig: |
| 608 | + return tp |
| 609 | + return None |
| 610 | + |
| 611 | + |
| 612 | +def extract_type(tp: Union[type, callable, object], **kwargs) -> Optional[Union[type, object, callable, tuple, Tuple[type]]]: |
| 613 | + """ |
| 614 | + Attempt to identify the :class:`.type` of a given value, or for functions/methods - identify their RETURN value type. |
| 615 | + |
| 616 | + This function can usually detect :mod:`typing` types, including generics such as ``List[str]``, and will attempt to extract |
| 617 | + their native Python base type, e.g. :class:`.list`. |
| 618 | + |
| 619 | + For :class:`typing.Union` based types (including :class:`typing.Optional`), it can extract a tuple of base types, including |
| 620 | + from nested :class:`typing.Union`'s - e.g. ``Union[str, list, Union[dict, set], int`` would be simplified down |
| 621 | + to ``(str, list, dict, set, int)`` |
| 622 | +
|
| 623 | + .. Attention:: If you want to extract the original return type from a function/method, including generic types such as ``List[str]``, |
| 624 | + then you should use :func:`.get_return_type` instead. |
| 625 | +
|
| 626 | + **Example 1** - convert a generic type e.g. ``Dict[str, str]`` into it's native type (e.g. ``dict``):: |
| 627 | +
|
| 628 | + >>> dtype = Dict[str, str] |
| 629 | + >>> # noinspection PyTypeHints,PyTypeChecker |
| 630 | + >>> isinstance({}, dtype) |
| 631 | + TypeError: Subscripted generics cannot be used with class and instance checks |
| 632 | + >>> extract_type(dtype) |
| 633 | + dict |
| 634 | + >>> isinstance({}, extract_type(dtype)) |
| 635 | + True |
| 636 | +
|
| 637 | + **Example 2** - extract the return type of a function/method, and if the return type is a generic (e.g. ``List[str]``), automatically |
| 638 | + convert it into the native type (e.g. ``list``) for use in comparisons such as :func:`.isinstance`:: |
| 639 | +
|
| 640 | + >>> def list_wrap(v: T) -> List[T]: |
| 641 | + ... return [v] |
| 642 | + >>> |
| 643 | + >>> extract_type(list_wrap) |
| 644 | + list |
| 645 | + >>> isinstance([1, 2, 3], extract_type(list_wrap)) |
| 646 | + True |
| 647 | +
|
| 648 | + **Example 3** - extract the type from an instantiated object, allowing for :func:`.isinstance` comparisons:: |
| 649 | +
|
| 650 | + >>> from privex.helpers import DictObject |
| 651 | + >>> db = DictObject(hello='world', lorem='ipsum') |
| 652 | + {'hello': 'world', 'lorem': 'ipsum'} |
| 653 | + >>> type_db = extract_type(db) |
| 654 | + privex.helpers.collections.DictObject |
| 655 | + >>> isinstance(db, type_db) |
| 656 | + True |
| 657 | + >>> isinstance(DictObject(test=123), type_db) |
| 658 | + True |
| 659 | + |
| 660 | + **Example 4** - extract a tuple of types from a :class:`typing.Union` or :class:`typing.Optional` (inc. return types) :: |
| 661 | + |
| 662 | + >>> def hello(x) -> Optional[str]: |
| 663 | + ... return x * 5 |
| 664 | + ... |
| 665 | + >>> extract_type(hello) |
| 666 | + (str, NoneType) |
| 667 | + >>> # Even with a Union[] containing a List[], another Union[] (containing a Tuple and set), and a Dict[], |
| 668 | + >>> # extract_type is still able to recursively flatten and simplify it down to a tuple of base Python types |
| 669 | + >>> extract_type(Union[ |
| 670 | + ... List[str], |
| 671 | + ... Union[Tuple[str, int, str], set], |
| 672 | + ... Dict[int, str] |
| 673 | + ... ]) |
| 674 | + (list, tuple, set, dict) |
| 675 | + |
| 676 | + |
| 677 | + **Return Types** |
| 678 | + |
| 679 | + A :class:`.type` will be returned for most calls where ``tp`` is either: |
| 680 | + |
| 681 | + * Already a native :class:`.type` e.g. :class:`.list` |
| 682 | + * A generic type such as ``List[str]`` (which are technically instances of :class:`.object`) |
| 683 | + * A function/method with a valid return type annotation, including generic return types |
| 684 | + * An instance of a class (an object), where the original type can be easily extracted via ``tp.__class__`` |
| 685 | + |
| 686 | + If ``tp`` was an :class:`.object` and the type/class couldn't be extracted, then it would be returned in it's original object form. |
| 687 | + |
| 688 | + If ``tp`` was an unusual function/method which couldn't be detected as one, or issues occurred while extracting the return type, |
| 689 | + then ``tp`` may be returned in it's original :class:`.callable` form. |
| 690 | + |
| 691 | + |
| 692 | + :param tp: The type/object/function etc. to extract the most accurate type from |
| 693 | + :return type|object|callable ret: A :class:`.type` will be returned for most calls, but may be an :class:`.object` |
| 694 | + or :class:`.callable` if there were issues detecting the type. |
| 695 | + """ |
| 696 | + # If tp is None, there's nothing we can do with it, so return None. |
| 697 | + if tp is None: return None |
| 698 | + # If 'tp' is a known native type, we don't need to extract anything, just return tp. |
| 699 | + if tp in [list, set, tuple, dict, str, bytes, int, float, Decimal]: return tp |
| 700 | + is_func = any([inspect.isfunction(tp), inspect.ismethod(tp), inspect.iscoroutinefunction(tp)]) |
| 701 | + # Functions count as class instances (instances of object), therefore to narrow down a real class/type instance, |
| 702 | + # we have to confirm it's NOT a function/method/coro, NOT a raw class/type, but IS an instance of object. |
| 703 | + # if not is_func and not inspect.isclass(tp) and isinstance(tp, object): |
| 704 | + if not is_func and isinstance(tp, object): |
| 705 | + # Handle extracting base types from generic :mod:`typing` objects, including tuples of types from Union's |
| 706 | + tbase = typing_to_base(tp, return_orig=False) |
| 707 | + if tbase is not None: # If the result wasn't None, then we know it was a typing type and base type(s) were extracted properly |
| 708 | + return tbase |
| 709 | + # Before checking __class__, we make sure that tp is an instance by checking isclass(tp) is False |
| 710 | + if not inspect.isclass(tp) and hasattr(tp, '__class__'): |
| 711 | + return tp.__class__ # If tp isn't a typing type, __class__ (if it exists) should be the "type" of tp |
| 712 | + return tp # If all else fails, return tp as-is |
| 713 | + |
| 714 | + # If is_func matches at this point, we're dealing with a function/method/coroutine and need to extract the return type. |
| 715 | + # To prevent an infinite loop, we set _sec_layer when passing the return type to extract_type(), ensuring that we don't |
| 716 | + # call extract_type(rt) AGAIN if the return type just so happened to be a function |
| 717 | + if is_func and not kwargs.get('_sec_layer'): |
| 718 | + # Extract the original return type, then pass it through extract_type again, since if it's a generic type, |
| 719 | + # we'll want to extract the native type from it, since generics like ``List[str]`` can't be used with ``isinstance()`` |
| 720 | + rt = get_return_type(tp) |
| 721 | + return extract_type(rt, _sec_layer=True) |
| 722 | + # If all else fails, return tp as-is |
| 723 | + return tp |
| 724 | + |
| 725 | + |
505 | 726 | def dec_round(amount: Decimal, dp: int = 2, rounding=None) -> Decimal: |
506 | 727 | """ |
507 | 728 | Round a Decimal to x decimal places using ``quantize`` (``dp`` must be >= 1 and the default dp is 2) |
@@ -768,7 +989,7 @@ def shell_quote(*args: str) -> str: |
768 | 989 | return shlex.join(args) if hasattr(shlex, 'join') else " ".join([shlex.quote(a) for a in args]).strip() |
769 | 990 |
|
770 | 991 |
|
771 | | -def call_sys(proc, *args, write: STRBYTES = None, **kwargs) -> Tuple[bytes, bytes]: |
| 992 | +def call_sys(proc, *args, write: STRBYTES = None, **kwargs) -> Union[Tuple[bytes, bytes], Tuple[str, str]]: |
772 | 993 | """ |
773 | 994 | A small wrapper around :class:`subprocess.Popen` which allows executing processes, while optionally piping |
774 | 995 | data (``write``) into the process's stdin, then finally returning the process's output and error results. |
@@ -1033,9 +1254,9 @@ def almost(compare: NumberStr, *numbers: NumberStr, tolerance: NumberStr = Decim |
1033 | 1254 | AssertionError |
1034 | 1255 |
|
1035 | 1256 | |
1036 | | - :param Decimal|int|float compare: The base number which all ``numbers`` will be compared against. |
1037 | | - :param Decimal|int|float numbers: One or more numbers to compare against ``compare`` |
1038 | | - :param Decimal|int|float tolerance: (kwarg only) Amount that each ``numbers`` can be greater/smaller than ``compare`` before |
| 1257 | + :param Decimal|int|float|str compare: The base number which all ``numbers`` will be compared against. |
| 1258 | + :param Decimal|int|float|str numbers: One or more numbers to compare against ``compare`` |
| 1259 | + :param Decimal|int|float|str tolerance: (kwarg only) Amount that each ``numbers`` can be greater/smaller than ``compare`` before |
1039 | 1260 | returning ``False``. |
1040 | 1261 | :keyword bool fail: (default: ``False``) If true, will raise :class:`.AssertionError` on failed tolerance check, instead of |
1041 | 1262 | returning ``False``. (mutually exclusive with ``assert``) |
@@ -1076,13 +1297,13 @@ def almost(compare: NumberStr, *numbers: NumberStr, tolerance: NumberStr = Decim |
1076 | 1297 | """Pre-compiled regex for matching catch-all keyword argument parameter names like ``**args``""" |
1077 | 1298 | T_PARAM = inspect.Parameter |
1078 | 1299 | """Type alias for :class:`inspect.Parameter`""" |
1079 | | -T_PARAM_LIST = Union[Dict[str, T_PARAM], List[T_PARAM], Iterable[T_PARAM]] |
| 1300 | +T_PARAM_LIST = Union[Dict[str, T_PARAM], Mapping[str, T_PARAM], List[T_PARAM], Iterable[T_PARAM]] |
1080 | 1301 | """ |
1081 | 1302 | Type alias for dict's containing strings mapped to :class:`inspect.Parameter`'s, lists of just |
1082 | 1303 | :class:`inspect.Parameter`'s, and any iterable of :class:`inspect.Parameter` |
1083 | 1304 | """ |
1084 | 1305 |
|
1085 | | -# noinspection PyProtectedMember |
| 1306 | +# noinspection PyProtectedMember,PyUnresolvedReferences |
1086 | 1307 | INS_EMPTY = inspect._empty |
1087 | 1308 | """ |
1088 | 1309 | Type alias for :class:`inspect.empty` |
@@ -1134,8 +1355,6 @@ def _filter_params(params: T_PARAM_LIST, ignore_xargs=False, ignore_xkwargs=Fals |
1134 | 1355 | ignore_defaults = kwargs.pop('ignore_defaults', False) |
1135 | 1356 | ignore_positional = kwargs.pop('ignore_positional', False) |
1136 | 1357 |
|
1137 | | - |
1138 | | - |
1139 | 1358 | _params = params |
1140 | 1359 | if isinstance(params, (dict, OrderedDict)) or hasattr(params, 'values'): |
1141 | 1360 | _params = params.values() |
|
0 commit comments