You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					186 lines
				
				6.1 KiB
			
		
		
			
		
	
	
					186 lines
				
				6.1 KiB
			| 
								 
											3 years ago
										 
									 | 
							
								# event/legacy.py
							 | 
						||
| 
								 | 
							
								# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
							 | 
						||
| 
								 | 
							
								# <see AUTHORS file>
							 | 
						||
| 
								 | 
							
								#
							 | 
						||
| 
								 | 
							
								# This module is part of SQLAlchemy and is released under
							 | 
						||
| 
								 | 
							
								# the MIT License: https://www.opensource.org/licenses/mit-license.php
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								"""Routines to handle adaption of legacy call signatures,
							 | 
						||
| 
								 | 
							
								generation of deprecation notes and docstrings.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								"""
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from .. import util
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _legacy_signature(since, argnames, converter=None):
							 | 
						||
| 
								 | 
							
								    def leg(fn):
							 | 
						||
| 
								 | 
							
								        if not hasattr(fn, "_legacy_signatures"):
							 | 
						||
| 
								 | 
							
								            fn._legacy_signatures = []
							 | 
						||
| 
								 | 
							
								        fn._legacy_signatures.append((since, argnames, converter))
							 | 
						||
| 
								 | 
							
								        return fn
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    return leg
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _wrap_fn_for_legacy(dispatch_collection, fn, argspec):
							 | 
						||
| 
								 | 
							
								    for since, argnames, conv in dispatch_collection.legacy_signatures:
							 | 
						||
| 
								 | 
							
								        if argnames[-1] == "**kw":
							 | 
						||
| 
								 | 
							
								            has_kw = True
							 | 
						||
| 
								 | 
							
								            argnames = argnames[0:-1]
							 | 
						||
| 
								 | 
							
								        else:
							 | 
						||
| 
								 | 
							
								            has_kw = False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if len(argnames) == len(argspec.args) and has_kw is bool(
							 | 
						||
| 
								 | 
							
								            argspec.varkw
							 | 
						||
| 
								 | 
							
								        ):
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            formatted_def = "def %s(%s%s)" % (
							 | 
						||
| 
								 | 
							
								                dispatch_collection.name,
							 | 
						||
| 
								 | 
							
								                ", ".join(dispatch_collection.arg_names),
							 | 
						||
| 
								 | 
							
								                ", **kw" if has_kw else "",
							 | 
						||
| 
								 | 
							
								            )
							 | 
						||
| 
								 | 
							
								            warning_txt = (
							 | 
						||
| 
								 | 
							
								                'The argument signature for the "%s.%s" event listener '
							 | 
						||
| 
								 | 
							
								                "has changed as of version %s, and conversion for "
							 | 
						||
| 
								 | 
							
								                "the old argument signature will be removed in a "
							 | 
						||
| 
								 | 
							
								                'future release.  The new signature is "%s"'
							 | 
						||
| 
								 | 
							
								                % (
							 | 
						||
| 
								 | 
							
								                    dispatch_collection.clsname,
							 | 
						||
| 
								 | 
							
								                    dispatch_collection.name,
							 | 
						||
| 
								 | 
							
								                    since,
							 | 
						||
| 
								 | 
							
								                    formatted_def,
							 | 
						||
| 
								 | 
							
								                )
							 | 
						||
| 
								 | 
							
								            )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            if conv:
							 | 
						||
| 
								 | 
							
								                assert not has_kw
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								                def wrap_leg(*args):
							 | 
						||
| 
								 | 
							
								                    util.warn_deprecated(warning_txt, version=since)
							 | 
						||
| 
								 | 
							
								                    return fn(*conv(*args))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            else:
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								                def wrap_leg(*args, **kw):
							 | 
						||
| 
								 | 
							
								                    util.warn_deprecated(warning_txt, version=since)
							 | 
						||
| 
								 | 
							
								                    argdict = dict(zip(dispatch_collection.arg_names, args))
							 | 
						||
| 
								 | 
							
								                    args = [argdict[name] for name in argnames]
							 | 
						||
| 
								 | 
							
								                    if has_kw:
							 | 
						||
| 
								 | 
							
								                        return fn(*args, **kw)
							 | 
						||
| 
								 | 
							
								                    else:
							 | 
						||
| 
								 | 
							
								                        return fn(*args)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            return wrap_leg
							 | 
						||
| 
								 | 
							
								    else:
							 | 
						||
| 
								 | 
							
								        return fn
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _indent(text, indent):
							 | 
						||
| 
								 | 
							
								    return "\n".join(indent + line for line in text.split("\n"))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _standard_listen_example(dispatch_collection, sample_target, fn):
							 | 
						||
| 
								 | 
							
								    example_kw_arg = _indent(
							 | 
						||
| 
								 | 
							
								        "\n".join(
							 | 
						||
| 
								 | 
							
								            "%(arg)s = kw['%(arg)s']" % {"arg": arg}
							 | 
						||
| 
								 | 
							
								            for arg in dispatch_collection.arg_names[0:2]
							 | 
						||
| 
								 | 
							
								        ),
							 | 
						||
| 
								 | 
							
								        "    ",
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								    if dispatch_collection.legacy_signatures:
							 | 
						||
| 
								 | 
							
								        current_since = max(
							 | 
						||
| 
								 | 
							
								            since
							 | 
						||
| 
								 | 
							
								            for since, args, conv in dispatch_collection.legacy_signatures
							 | 
						||
| 
								 | 
							
								        )
							 | 
						||
| 
								 | 
							
								    else:
							 | 
						||
| 
								 | 
							
								        current_since = None
							 | 
						||
| 
								 | 
							
								    text = (
							 | 
						||
| 
								 | 
							
								        "from sqlalchemy import event\n\n\n"
							 | 
						||
| 
								 | 
							
								        "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
							 | 
						||
| 
								 | 
							
								        "def receive_%(event_name)s("
							 | 
						||
| 
								 | 
							
								        "%(named_event_arguments)s%(has_kw_arguments)s):\n"
							 | 
						||
| 
								 | 
							
								        "    \"listen for the '%(event_name)s' event\"\n"
							 | 
						||
| 
								 | 
							
								        "\n    # ... (event handling logic) ...\n"
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    text %= {
							 | 
						||
| 
								 | 
							
								        "current_since": " (arguments as of %s)" % current_since
							 | 
						||
| 
								 | 
							
								        if current_since
							 | 
						||
| 
								 | 
							
								        else "",
							 | 
						||
| 
								 | 
							
								        "event_name": fn.__name__,
							 | 
						||
| 
								 | 
							
								        "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "",
							 | 
						||
| 
								 | 
							
								        "named_event_arguments": ", ".join(dispatch_collection.arg_names),
							 | 
						||
| 
								 | 
							
								        "example_kw_arg": example_kw_arg,
							 | 
						||
| 
								 | 
							
								        "sample_target": sample_target,
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    return text
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _legacy_listen_examples(dispatch_collection, sample_target, fn):
							 | 
						||
| 
								 | 
							
								    text = ""
							 | 
						||
| 
								 | 
							
								    for since, args, conv in dispatch_collection.legacy_signatures:
							 | 
						||
| 
								 | 
							
								        text += (
							 | 
						||
| 
								 | 
							
								            "\n# DEPRECATED calling style (pre-%(since)s, "
							 | 
						||
| 
								 | 
							
								            "will be removed in a future release)\n"
							 | 
						||
| 
								 | 
							
								            "@event.listens_for(%(sample_target)s, '%(event_name)s')\n"
							 | 
						||
| 
								 | 
							
								            "def receive_%(event_name)s("
							 | 
						||
| 
								 | 
							
								            "%(named_event_arguments)s%(has_kw_arguments)s):\n"
							 | 
						||
| 
								 | 
							
								            "    \"listen for the '%(event_name)s' event\"\n"
							 | 
						||
| 
								 | 
							
								            "\n    # ... (event handling logic) ...\n"
							 | 
						||
| 
								 | 
							
								            % {
							 | 
						||
| 
								 | 
							
								                "since": since,
							 | 
						||
| 
								 | 
							
								                "event_name": fn.__name__,
							 | 
						||
| 
								 | 
							
								                "has_kw_arguments": " **kw"
							 | 
						||
| 
								 | 
							
								                if dispatch_collection.has_kw
							 | 
						||
| 
								 | 
							
								                else "",
							 | 
						||
| 
								 | 
							
								                "named_event_arguments": ", ".join(args),
							 | 
						||
| 
								 | 
							
								                "sample_target": sample_target,
							 | 
						||
| 
								 | 
							
								            }
							 | 
						||
| 
								 | 
							
								        )
							 | 
						||
| 
								 | 
							
								    return text
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _version_signature_changes(parent_dispatch_cls, dispatch_collection):
							 | 
						||
| 
								 | 
							
								    since, args, conv = dispatch_collection.legacy_signatures[0]
							 | 
						||
| 
								 | 
							
								    return (
							 | 
						||
| 
								 | 
							
								        "\n.. deprecated:: %(since)s\n"
							 | 
						||
| 
								 | 
							
								        "    The :class:`.%(clsname)s.%(event_name)s` event now accepts the \n"
							 | 
						||
| 
								 | 
							
								        "    arguments ``%(named_event_arguments)s%(has_kw_arguments)s``.\n"
							 | 
						||
| 
								 | 
							
								        "    Support for listener functions which accept the previous \n"
							 | 
						||
| 
								 | 
							
								        '    argument signature(s) listed above as "deprecated" will be \n'
							 | 
						||
| 
								 | 
							
								        "    removed in a future release."
							 | 
						||
| 
								 | 
							
								        % {
							 | 
						||
| 
								 | 
							
								            "since": since,
							 | 
						||
| 
								 | 
							
								            "clsname": parent_dispatch_cls.__name__,
							 | 
						||
| 
								 | 
							
								            "event_name": dispatch_collection.name,
							 | 
						||
| 
								 | 
							
								            "named_event_arguments": ", ".join(dispatch_collection.arg_names),
							 | 
						||
| 
								 | 
							
								            "has_kw_arguments": ", **kw" if dispatch_collection.has_kw else "",
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _augment_fn_docs(dispatch_collection, parent_dispatch_cls, fn):
							 | 
						||
| 
								 | 
							
								    header = (
							 | 
						||
| 
								 | 
							
								        ".. container:: event_signatures\n\n"
							 | 
						||
| 
								 | 
							
								        "     Example argument forms::\n"
							 | 
						||
| 
								 | 
							
								        "\n"
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    sample_target = getattr(parent_dispatch_cls, "_target_class_doc", "obj")
							 | 
						||
| 
								 | 
							
								    text = header + _indent(
							 | 
						||
| 
								 | 
							
								        _standard_listen_example(dispatch_collection, sample_target, fn),
							 | 
						||
| 
								 | 
							
								        " " * 8,
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								    if dispatch_collection.legacy_signatures:
							 | 
						||
| 
								 | 
							
								        text += _indent(
							 | 
						||
| 
								 | 
							
								            _legacy_listen_examples(dispatch_collection, sample_target, fn),
							 | 
						||
| 
								 | 
							
								            " " * 8,
							 | 
						||
| 
								 | 
							
								        )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        text += _version_signature_changes(
							 | 
						||
| 
								 | 
							
								            parent_dispatch_cls, dispatch_collection
							 | 
						||
| 
								 | 
							
								        )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    return util.inject_docstring_text(fn.__doc__, text, 1)
							 |