212 lines
6.6 KiB
Python
212 lines
6.6 KiB
Python
# Copyright 2015 Google Inc. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Helper functions for commonly used utilities."""
|
|
|
|
import functools
|
|
import inspect
|
|
import logging
|
|
import warnings
|
|
|
|
import six
|
|
from six.moves import urllib
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
POSITIONAL_WARNING = "WARNING"
|
|
POSITIONAL_EXCEPTION = "EXCEPTION"
|
|
POSITIONAL_IGNORE = "IGNORE"
|
|
POSITIONAL_SET = frozenset(
|
|
[POSITIONAL_WARNING, POSITIONAL_EXCEPTION, POSITIONAL_IGNORE]
|
|
)
|
|
|
|
positional_parameters_enforcement = POSITIONAL_WARNING
|
|
|
|
_SYM_LINK_MESSAGE = "File: {0}: Is a symbolic link."
|
|
_IS_DIR_MESSAGE = "{0}: Is a directory"
|
|
_MISSING_FILE_MESSAGE = "Cannot access {0}: No such file or directory"
|
|
|
|
|
|
def positional(max_positional_args):
|
|
"""A decorator to declare that only the first N arguments my be positional.
|
|
|
|
This decorator makes it easy to support Python 3 style keyword-only
|
|
parameters. For example, in Python 3 it is possible to write::
|
|
|
|
def fn(pos1, *, kwonly1=None, kwonly1=None):
|
|
...
|
|
|
|
All named parameters after ``*`` must be a keyword::
|
|
|
|
fn(10, 'kw1', 'kw2') # Raises exception.
|
|
fn(10, kwonly1='kw1') # Ok.
|
|
|
|
Example
|
|
^^^^^^^
|
|
|
|
To define a function like above, do::
|
|
|
|
@positional(1)
|
|
def fn(pos1, kwonly1=None, kwonly2=None):
|
|
...
|
|
|
|
If no default value is provided to a keyword argument, it becomes a
|
|
required keyword argument::
|
|
|
|
@positional(0)
|
|
def fn(required_kw):
|
|
...
|
|
|
|
This must be called with the keyword parameter::
|
|
|
|
fn() # Raises exception.
|
|
fn(10) # Raises exception.
|
|
fn(required_kw=10) # Ok.
|
|
|
|
When defining instance or class methods always remember to account for
|
|
``self`` and ``cls``::
|
|
|
|
class MyClass(object):
|
|
|
|
@positional(2)
|
|
def my_method(self, pos1, kwonly1=None):
|
|
...
|
|
|
|
@classmethod
|
|
@positional(2)
|
|
def my_method(cls, pos1, kwonly1=None):
|
|
...
|
|
|
|
The positional decorator behavior is controlled by
|
|
``_helpers.positional_parameters_enforcement``, which may be set to
|
|
``POSITIONAL_EXCEPTION``, ``POSITIONAL_WARNING`` or
|
|
``POSITIONAL_IGNORE`` to raise an exception, log a warning, or do
|
|
nothing, respectively, if a declaration is violated.
|
|
|
|
Args:
|
|
max_positional_arguments: Maximum number of positional arguments. All
|
|
parameters after the this index must be
|
|
keyword only.
|
|
|
|
Returns:
|
|
A decorator that prevents using arguments after max_positional_args
|
|
from being used as positional parameters.
|
|
|
|
Raises:
|
|
TypeError: if a key-word only argument is provided as a positional
|
|
parameter, but only if
|
|
_helpers.positional_parameters_enforcement is set to
|
|
POSITIONAL_EXCEPTION.
|
|
"""
|
|
|
|
def positional_decorator(wrapped):
|
|
@functools.wraps(wrapped)
|
|
def positional_wrapper(*args, **kwargs):
|
|
if len(args) > max_positional_args:
|
|
plural_s = ""
|
|
if max_positional_args != 1:
|
|
plural_s = "s"
|
|
message = (
|
|
"{function}() takes at most {args_max} positional "
|
|
"argument{plural} ({args_given} given)".format(
|
|
function=wrapped.__name__,
|
|
args_max=max_positional_args,
|
|
args_given=len(args),
|
|
plural=plural_s,
|
|
)
|
|
)
|
|
if positional_parameters_enforcement == POSITIONAL_EXCEPTION:
|
|
raise TypeError(message)
|
|
elif positional_parameters_enforcement == POSITIONAL_WARNING:
|
|
logger.warning(message)
|
|
return wrapped(*args, **kwargs)
|
|
|
|
return positional_wrapper
|
|
|
|
if isinstance(max_positional_args, six.integer_types):
|
|
return positional_decorator
|
|
else:
|
|
args, _, _, defaults = inspect.getargspec(max_positional_args)
|
|
return positional(len(args) - len(defaults))(max_positional_args)
|
|
|
|
|
|
def parse_unique_urlencoded(content):
|
|
"""Parses unique key-value parameters from urlencoded content.
|
|
|
|
Args:
|
|
content: string, URL-encoded key-value pairs.
|
|
|
|
Returns:
|
|
dict, The key-value pairs from ``content``.
|
|
|
|
Raises:
|
|
ValueError: if one of the keys is repeated.
|
|
"""
|
|
urlencoded_params = urllib.parse.parse_qs(content)
|
|
params = {}
|
|
for key, value in six.iteritems(urlencoded_params):
|
|
if len(value) != 1:
|
|
msg = "URL-encoded content contains a repeated value:" "%s -> %s" % (
|
|
key,
|
|
", ".join(value),
|
|
)
|
|
raise ValueError(msg)
|
|
params[key] = value[0]
|
|
return params
|
|
|
|
|
|
def update_query_params(uri, params):
|
|
"""Updates a URI with new query parameters.
|
|
|
|
If a given key from ``params`` is repeated in the ``uri``, then
|
|
the URI will be considered invalid and an error will occur.
|
|
|
|
If the URI is valid, then each value from ``params`` will
|
|
replace the corresponding value in the query parameters (if
|
|
it exists).
|
|
|
|
Args:
|
|
uri: string, A valid URI, with potential existing query parameters.
|
|
params: dict, A dictionary of query parameters.
|
|
|
|
Returns:
|
|
The same URI but with the new query parameters added.
|
|
"""
|
|
parts = urllib.parse.urlparse(uri)
|
|
query_params = parse_unique_urlencoded(parts.query)
|
|
query_params.update(params)
|
|
new_query = urllib.parse.urlencode(query_params)
|
|
new_parts = parts._replace(query=new_query)
|
|
return urllib.parse.urlunparse(new_parts)
|
|
|
|
|
|
def _add_query_parameter(url, name, value):
|
|
"""Adds a query parameter to a url.
|
|
|
|
Replaces the current value if it already exists in the URL.
|
|
|
|
Args:
|
|
url: string, url to add the query parameter to.
|
|
name: string, query parameter name.
|
|
value: string, query parameter value.
|
|
|
|
Returns:
|
|
Updated query parameter. Does not update the url if value is None.
|
|
"""
|
|
if value is None:
|
|
return url
|
|
else:
|
|
return update_query_params(url, {name: value})
|