import json import sys from functools import partial from io import BytesIO import pytest from werkzeug.datastructures import Authorization from werkzeug.datastructures import FileStorage from werkzeug.datastructures import Headers from werkzeug.datastructures import MultiDict from werkzeug.formparser import parse_form_data from werkzeug.http import parse_authorization_header from werkzeug.test import Client from werkzeug.test import ClientRedirectError from werkzeug.test import create_environ from werkzeug.test import EnvironBuilder from werkzeug.test import run_wsgi_app from werkzeug.test import stream_encode_multipart from werkzeug.utils import redirect from werkzeug.wrappers import Request from werkzeug.wrappers import Response def cookie_app(environ, start_response): """A WSGI application which sets a cookie, and returns as a response any cookie which exists. """ response = Response(environ.get("HTTP_COOKIE", "No Cookie"), mimetype="text/plain") response.set_cookie("test", "test") return response(environ, start_response) def redirect_loop_app(environ, start_response): response = redirect("http://localhost/some/redirect/") return response(environ, start_response) def redirect_with_get_app(environ, start_response): req = Request(environ) if req.url not in ( "http://localhost/", "http://localhost/first/request", "http://localhost/some/redirect/", ): raise AssertionError(f'redirect_demo_app() did not expect URL "{req.url}"') if "/some/redirect" not in req.url: response = redirect("http://localhost/some/redirect/") else: response = Response(f"current url: {req.url}") return response(environ, start_response) def external_redirect_demo_app(environ, start_response): response = redirect("http://example.com/") return response(environ, start_response) def external_subdomain_redirect_demo_app(environ, start_response): if "test.example.com" in environ["HTTP_HOST"]: response = Response("redirected successfully to subdomain") else: response = redirect("http://test.example.com/login") return response(environ, start_response) def multi_value_post_app(environ, start_response): req = Request(environ) assert req.form["field"] == "val1", req.form["field"] assert req.form.getlist("field") == ["val1", "val2"], req.form.getlist("field") response = Response("ok") return response(environ, start_response) def test_cookie_forging(): c = Client(cookie_app) c.set_cookie("localhost", "foo", "bar") response = c.open() assert response.text == "foo=bar" def test_set_cookie_app(): c = Client(cookie_app) response = c.open() assert "Set-Cookie" in response.headers def test_cookiejar_stores_cookie(): c = Client(cookie_app) c.open() assert "test" in c.cookie_jar._cookies["localhost.local"]["/"] def test_no_initial_cookie(): c = Client(cookie_app) response = c.open() assert response.text == "No Cookie" def test_resent_cookie(): c = Client(cookie_app) c.open() response = c.open() assert response.text == "test=test" def test_disable_cookies(): c = Client(cookie_app, use_cookies=False) c.open() response = c.open() assert response.text == "No Cookie" def test_cookie_for_different_path(): c = Client(cookie_app) c.open("/path1") response = c.open("/path2") assert response.text == "test=test" def test_environ_builder_basics(): b = EnvironBuilder() assert b.content_type is None b.method = "POST" assert b.content_type is None b.form["test"] = "normal value" assert b.content_type == "application/x-www-form-urlencoded" b.files.add_file("test", BytesIO(b"test contents"), "test.txt") assert b.files["test"].content_type == "text/plain" b.form["test_int"] = 1 assert b.content_type == "multipart/form-data" req = b.get_request() b.close() assert req.url == "http://localhost/" assert req.method == "POST" assert req.form["test"] == "normal value" assert req.files["test"].content_type == "text/plain" assert req.files["test"].filename == "test.txt" assert req.files["test"].read() == b"test contents" req.close() def test_environ_builder_data(): b = EnvironBuilder(data="foo") assert b.input_stream.getvalue() == b"foo" b = EnvironBuilder(data=b"foo") assert b.input_stream.getvalue() == b"foo" b = EnvironBuilder(data={"foo": "bar"}) assert b.form["foo"] == "bar" b = EnvironBuilder(data={"foo": ["bar1", "bar2"]}) assert b.form.getlist("foo") == ["bar1", "bar2"] def check_list_content(b, length): foo = b.files.getlist("foo") assert len(foo) == length for obj in foo: assert isinstance(obj, FileStorage) b = EnvironBuilder(data={"foo": BytesIO()}) check_list_content(b, 1) b = EnvironBuilder(data={"foo": [BytesIO(), BytesIO()]}) check_list_content(b, 2) b = EnvironBuilder(data={"foo": (BytesIO(),)}) check_list_content(b, 1) b = EnvironBuilder(data={"foo": [(BytesIO(),), (BytesIO(),)]}) check_list_content(b, 2) def test_environ_builder_json(): @Request.application def app(request): assert request.content_type == "application/json" return Response(json.loads(request.get_data(as_text=True))["foo"]) c = Client(app) response = c.post("/", json={"foo": "bar"}) assert response.text == "bar" with pytest.raises(TypeError): c.post("/", json={"foo": "bar"}, data={"baz": "qux"}) def test_environ_builder_headers(): b = EnvironBuilder( environ_base={"HTTP_USER_AGENT": "Foo/0.1"}, environ_overrides={"wsgi.version": (1, 1)}, ) b.headers["X-Beat-My-Horse"] = "very well sir" env = b.get_environ() assert env["HTTP_USER_AGENT"] == "Foo/0.1" assert env["HTTP_X_BEAT_MY_HORSE"] == "very well sir" assert env["wsgi.version"] == (1, 1) b.headers["User-Agent"] = "Bar/1.0" env = b.get_environ() assert env["HTTP_USER_AGENT"] == "Bar/1.0" def test_environ_builder_headers_content_type(): b = EnvironBuilder(headers={"Content-Type": "text/plain"}) env = b.get_environ() assert env["CONTENT_TYPE"] == "text/plain" assert "HTTP_CONTENT_TYPE" not in env b = EnvironBuilder(content_type="text/html", headers={"Content-Type": "text/plain"}) env = b.get_environ() assert env["CONTENT_TYPE"] == "text/html" assert "HTTP_CONTENT_TYPE" not in env b = EnvironBuilder() env = b.get_environ() assert "CONTENT_TYPE" not in env assert "HTTP_CONTENT_TYPE" not in env def test_envrion_builder_multiple_headers(): h = Headers() h.add("FOO", "bar") h.add("FOO", "baz") b = EnvironBuilder(headers=h) env = b.get_environ() assert env["HTTP_FOO"] == "bar, baz" def test_environ_builder_paths(): b = EnvironBuilder(path="/foo", base_url="http://example.com/") assert b.base_url == "http://example.com/" assert b.path == "/foo" assert b.script_root == "" assert b.host == "example.com" b = EnvironBuilder(path="/foo", base_url="http://example.com/bar") assert b.base_url == "http://example.com/bar/" assert b.path == "/foo" assert b.script_root == "/bar" assert b.host == "example.com" b.host = "localhost" assert b.base_url == "http://localhost/bar/" b.base_url = "http://localhost:8080/" assert b.host == "localhost:8080" assert b.server_name == "localhost" assert b.server_port == 8080 b.host = "foo.invalid" b.url_scheme = "https" b.script_root = "/test" env = b.get_environ() assert env["SERVER_NAME"] == "foo.invalid" assert env["SERVER_PORT"] == "443" assert env["SCRIPT_NAME"] == "/test" assert env["PATH_INFO"] == "/foo" assert env["HTTP_HOST"] == "foo.invalid" assert env["wsgi.url_scheme"] == "https" assert b.base_url == "https://foo.invalid/test/" def test_environ_builder_content_type(): builder = EnvironBuilder() assert builder.content_type is None builder.method = "POST" assert builder.content_type is None builder.method = "PUT" assert builder.content_type is None builder.method = "PATCH" assert builder.content_type is None builder.method = "DELETE" assert builder.content_type is None builder.method = "GET" assert builder.content_type is None builder.form["foo"] = "bar" assert builder.content_type == "application/x-www-form-urlencoded" builder.files.add_file("data", BytesIO(b"foo"), "test.txt") assert builder.content_type == "multipart/form-data" req = builder.get_request() builder.close() assert req.form["foo"] == "bar" assert req.files["data"].read() == b"foo" req.close() def test_basic_auth(): builder = EnvironBuilder(auth=("username", "password")) request = builder.get_request() auth = parse_authorization_header(request.headers["Authorization"]) assert auth.username == "username" assert auth.password == "password" def test_auth_object(): builder = EnvironBuilder( auth=Authorization("digest", {"username": "u", "password": "p"}) ) request = builder.get_request() assert request.headers["Authorization"].startswith("Digest ") def test_environ_builder_stream_switch(): d = MultiDict(dict(foo="bar", blub="blah", hu="hum")) for use_tempfile in False, True: stream, length, boundary = stream_encode_multipart( d, use_tempfile, threshold=150 ) assert isinstance(stream, BytesIO) != use_tempfile form = parse_form_data( { "wsgi.input": stream, "CONTENT_LENGTH": str(length), "CONTENT_TYPE": f'multipart/form-data; boundary="{boundary}"', } )[1] assert form == d stream.close() def test_environ_builder_unicode_file_mix(): for use_tempfile in False, True: f = FileStorage(BytesIO(rb"\N{SNOWMAN}"), "snowman.txt") d = MultiDict(dict(f=f, s="\N{SNOWMAN}")) stream, length, boundary = stream_encode_multipart( d, use_tempfile, threshold=150 ) assert isinstance(stream, BytesIO) != use_tempfile _, form, files = parse_form_data( { "wsgi.input": stream, "CONTENT_LENGTH": str(length), "CONTENT_TYPE": f'multipart/form-data; boundary="{boundary}"', } ) assert form["s"] == "\N{SNOWMAN}" assert files["f"].name == "f" assert files["f"].filename == "snowman.txt" assert files["f"].read() == rb"\N{SNOWMAN}" stream.close() files["f"].close() def test_create_environ(): env = create_environ("/foo?bar=baz", "http://example.org/") expected = { "wsgi.multiprocess": False, "wsgi.version": (1, 0), "wsgi.run_once": False, "wsgi.errors": sys.stderr, "wsgi.multithread": False, "wsgi.url_scheme": "http", "SCRIPT_NAME": "", "SERVER_NAME": "example.org", "REQUEST_METHOD": "GET", "HTTP_HOST": "example.org", "PATH_INFO": "/foo", "SERVER_PORT": "80", "SERVER_PROTOCOL": "HTTP/1.1", "QUERY_STRING": "bar=baz", } for key, value in iter(expected.items()): assert env[key] == value assert env["wsgi.input"].read(0) == b"" assert create_environ("/foo", "http://example.com/")["SCRIPT_NAME"] == "" def test_create_environ_query_string_error(): with pytest.raises(ValueError): create_environ("/foo?bar=baz", query_string={"a": "b"}) def test_builder_from_environ(): environ = create_environ( "/ㄱ", base_url="https://example.com/base", query_string={"name": "Werkzeug"}, data={"foo": "ㄴ"}, headers={"X-Foo": "ㄷ"}, ) builder = EnvironBuilder.from_environ(environ) try: new_environ = builder.get_environ() finally: builder.close() assert new_environ == environ def test_file_closing(): closed = [] class SpecialInput: def read(self, size): return "" def close(self): closed.append(self) create_environ(data={"foo": SpecialInput()}) assert len(closed) == 1 builder = EnvironBuilder() builder.files.add_file("blah", SpecialInput()) builder.close() assert len(closed) == 2 def test_follow_redirect(): env = create_environ("/", base_url="http://localhost") c = Client(redirect_with_get_app) response = c.open(environ_overrides=env, follow_redirects=True) assert response.status == "200 OK" assert response.text == "current url: http://localhost/some/redirect/" # Test that the :cls:`Client` is aware of user defined response wrappers c = Client(redirect_with_get_app) resp = c.get("/", follow_redirects=True) assert resp.status_code == 200 assert resp.text == "current url: http://localhost/some/redirect/" # test with URL other than '/' to make sure redirected URL's are correct c = Client(redirect_with_get_app) resp = c.get("/first/request", follow_redirects=True) assert resp.status_code == 200 assert resp.text == "current url: http://localhost/some/redirect/" def test_follow_local_redirect(): class LocalResponse(Response): autocorrect_location_header = False def local_redirect_app(environ, start_response): req = Request(environ) if "/from/location" in req.url: response = redirect("/to/location", Response=LocalResponse) else: response = Response(f"current path: {req.path}") return response(environ, start_response) c = Client(local_redirect_app) resp = c.get("/from/location", follow_redirects=True) assert resp.status_code == 200 assert resp.text == "current path: /to/location" @pytest.mark.parametrize( ("code", "keep"), ((302, False), (301, False), (307, True), (308, True)) ) def test_follow_redirect_body(code, keep): @Request.application def app(request): if request.url == "http://localhost/some/redirect/": assert request.method == "POST" if keep else "GET" assert request.headers["X-Foo"] == "bar" if keep: assert request.form["foo"] == "bar" else: assert not request.form return Response(f"current url: {request.url}") return redirect("http://localhost/some/redirect/", code=code) c = Client(app) response = c.post( "/", follow_redirects=True, data={"foo": "bar"}, headers={"X-Foo": "bar"} ) assert response.status_code == 200 assert response.text == "current url: http://localhost/some/redirect/" def test_follow_external_redirect(): env = create_environ("/", base_url="http://localhost") c = Client(external_redirect_demo_app) pytest.raises( RuntimeError, lambda: c.get(environ_overrides=env, follow_redirects=True) ) def test_follow_external_redirect_on_same_subdomain(): env = create_environ("/", base_url="http://example.com") c = Client(external_subdomain_redirect_demo_app, allow_subdomain_redirects=True) c.get(environ_overrides=env, follow_redirects=True) # check that this does not work for real external domains env = create_environ("/", base_url="http://localhost") pytest.raises( RuntimeError, lambda: c.get(environ_overrides=env, follow_redirects=True) ) # check that subdomain redirects fail if no `allow_subdomain_redirects` is applied c = Client(external_subdomain_redirect_demo_app) pytest.raises( RuntimeError, lambda: c.get(environ_overrides=env, follow_redirects=True) ) def test_follow_redirect_loop(): c = Client(redirect_loop_app) with pytest.raises(ClientRedirectError): c.get("/", follow_redirects=True) def test_follow_redirect_non_root_base_url(): @Request.application def app(request): if request.path == "/redirect": return redirect("done") return Response(request.path) c = Client(app) response = c.get( "/redirect", base_url="http://localhost/other", follow_redirects=True ) assert response.text == "/done" def test_follow_redirect_exhaust_intermediate(): class Middleware: def __init__(self, app): self.app = app self.active = 0 def __call__(self, environ, start_response): # Test client must exhaust response stream, otherwise the # cleanup code that decrements this won't have run by the # time the next request is started. assert not self.active self.active += 1 try: yield from self.app(environ, start_response) finally: self.active -= 1 app = Middleware(redirect_with_get_app) client = Client(Middleware(redirect_with_get_app)) response = client.get("/", follow_redirects=True, buffered=False) assert response.text == "current url: http://localhost/some/redirect/" assert not app.active def test_redirects_are_tracked(): @Request.application def app(request): if request.path == "/first": return redirect("/second") if request.path == "/second": return redirect("/third") return Response("done") c = Client(app) response = c.get("/first", follow_redirects=True) assert response.text == "done" assert len(response.history) == 2 assert response.history[-1].request.path == "/second" assert response.history[-1].status_code == 302 assert response.history[-1].location == "/third" assert len(response.history[-1].history) == 1 assert response.history[-1].history[-1] is response.history[-2] assert response.history[-2].request.path == "/first" assert response.history[-2].status_code == 302 assert response.history[-2].location == "/second" assert len(response.history[-2].history) == 0 def test_cookie_across_redirect(): @Request.application def app(request): if request.path == "/": return Response(request.cookies.get("auth", "out")) if request.path == "/in": rv = redirect("/") rv.set_cookie("auth", "in") return rv if request.path == "/out": rv = redirect("/") rv.delete_cookie("auth") return rv c = Client(app) assert c.get("/").text == "out" assert c.get("/in", follow_redirects=True).text == "in" assert c.get("/").text == "in" assert c.get("/out", follow_redirects=True).text == "out" assert c.get("/").text == "out" def test_path_info_script_name_unquoting(): def test_app(environ, start_response): start_response("200 OK", [("Content-Type", "text/plain")]) return [f"{environ['PATH_INFO']}\n{environ['SCRIPT_NAME']}"] c = Client(test_app) resp = c.get("/foo%40bar") assert resp.text == "/foo@bar\n" c = Client(test_app) resp = c.get("/foo%40bar", "http://localhost/bar%40baz") assert resp.text == "/foo@bar\n/bar@baz" def test_multi_value_submit(): c = Client(multi_value_post_app) data = {"field": ["val1", "val2"]} resp = c.post("/", data=data) assert resp.status_code == 200 c = Client(multi_value_post_app) data = MultiDict({"field": ["val1", "val2"]}) resp = c.post("/", data=data) assert resp.status_code == 200 def test_iri_support(): b = EnvironBuilder("/föö-bar", base_url="http://☃.net/") assert b.path == "/f%C3%B6%C3%B6-bar" assert b.base_url == "http://xn--n3h.net/" @pytest.mark.parametrize("buffered", (True, False)) @pytest.mark.parametrize("iterable", (True, False)) def test_run_wsgi_apps(buffered, iterable): leaked_data = [] def simple_app(environ, start_response): start_response("200 OK", [("Content-Type", "text/html")]) return ["Hello World!"] def yielding_app(environ, start_response): start_response("200 OK", [("Content-Type", "text/html")]) yield "Hello " yield "World!" def late_start_response(environ, start_response): yield "Hello " yield "World" start_response("200 OK", [("Content-Type", "text/html")]) yield "!" def depends_on_close(environ, start_response): leaked_data.append("harhar") start_response("200 OK", [("Content-Type", "text/html")]) class Rv: def __iter__(self): yield "Hello " yield "World" yield "!" def close(self): assert leaked_data.pop() == "harhar" return Rv() for app in (simple_app, yielding_app, late_start_response, depends_on_close): if iterable: app = iterable_middleware(app) app_iter, status, headers = run_wsgi_app(app, {}, buffered=buffered) assert status == "200 OK" assert list(headers) == [("Content-Type", "text/html")] assert "".join(app_iter) == "Hello World!" if hasattr(app_iter, "close"): app_iter.close() assert not leaked_data @pytest.mark.parametrize("buffered", (True, False)) @pytest.mark.parametrize("iterable", (True, False)) def test_lazy_start_response_empty_response_app(buffered, iterable): class app: def __init__(self, environ, start_response): self.start_response = start_response def __iter__(self): return self def __next__(self): self.start_response("200 OK", [("Content-Type", "text/html")]) raise StopIteration if iterable: app = iterable_middleware(app) app_iter, status, headers = run_wsgi_app(app, {}, buffered=buffered) assert status == "200 OK" assert list(headers) == [("Content-Type", "text/html")] assert "".join(app_iter) == "" def test_run_wsgi_app_closing_iterator(): got_close = [] class CloseIter: def __init__(self): self.iterated = False def __iter__(self): return self def close(self): got_close.append(None) def __next__(self): if self.iterated: raise StopIteration() self.iterated = True return "bar" def bar(environ, start_response): start_response("200 OK", [("Content-Type", "text/plain")]) return CloseIter() app_iter, status, headers = run_wsgi_app(bar, {}) assert status == "200 OK" assert list(headers) == [("Content-Type", "text/plain")] assert next(app_iter) == "bar" pytest.raises(StopIteration, partial(next, app_iter)) app_iter.close() assert run_wsgi_app(bar, {}, True)[0] == ["bar"] assert len(got_close) == 2 def iterable_middleware(app): """Guarantee that the app returns an iterable""" def inner(environ, start_response): rv = app(environ, start_response) class Iterable: def __iter__(self): return iter(rv) if hasattr(rv, "close"): def close(self): rv.close() return Iterable() return inner def test_multiple_cookies(): @Request.application def test_app(request): response = Response(repr(sorted(request.cookies.items()))) response.set_cookie("test1", b"foo") response.set_cookie("test2", b"bar") return response client = Client(test_app) resp = client.get("/") assert resp.text == "[]" resp = client.get("/") assert resp.text == repr([("test1", "foo"), ("test2", "bar")]) def test_correct_open_invocation_on_redirect(): class MyClient(Client): counter = 0 def open(self, *args, **kwargs): self.counter += 1 env = kwargs.setdefault("environ_overrides", {}) env["werkzeug._foo"] = self.counter return Client.open(self, *args, **kwargs) @Request.application def test_app(request): return Response(str(request.environ["werkzeug._foo"])) c = MyClient(test_app, response_wrapper=Response) assert c.get("/").text == "1" assert c.get("/").text == "2" assert c.get("/").text == "3" def test_correct_encoding(): req = Request.from_values("/\N{SNOWMAN}", "http://example.com/foo") assert req.script_root == "/foo" assert req.path == "/\N{SNOWMAN}" def test_full_url_requests_with_args(): base = "http://example.com/" @Request.application def test_app(request): return Response(request.args["x"]) client = Client(test_app) resp = client.get("/?x=42", base) assert resp.text == "42" resp = client.get("http://www.example.com/?x=23", base) assert resp.text == "23" def test_delete_requests_with_form(): @Request.application def test_app(request): return Response(request.form.get("x", None)) client = Client(test_app) resp = client.delete("/", data={"x": 42}) assert resp.text == "42" def test_post_with_file_descriptor(tmpdir): c = Client(Response()) f = tmpdir.join("some-file.txt") f.write("foo") with open(f.strpath) as data: resp = c.post("/", data=data) assert resp.status_code == 200 with open(f.strpath, mode="rb") as data: resp = c.post("/", data=data) assert resp.status_code == 200 def test_content_type(): @Request.application def test_app(request): return Response(request.content_type) client = Client(test_app) resp = client.get("/", data=b"testing", mimetype="text/css") assert resp.text == "text/css; charset=utf-8" resp = client.get("/", data=b"testing", mimetype="application/octet-stream") assert resp.text == "application/octet-stream" def test_raw_request_uri(): @Request.application def app(request): path_info = request.path request_uri = request.environ["REQUEST_URI"] return Response("\n".join((path_info, request_uri))) client = Client(app) response = client.get("/hello%2fworld") data = response.text assert data == "/hello/world\n/hello%2fworld" response = client.get("/?a=b") assert response.text == "/\n/?a=b" response = client.get("/%3f?") # escaped ? in path, and empty query string assert response.text == "/?\n/%3f?" def no_response_headers_app(environ, start_response): """A WSGI application which returns a resposne with no headers.""" response = Response("Response") response.headers.clear() return response(environ, start_response) def test_no_content_type_header_addition(): c = Client(no_response_headers_app) response = c.open() assert response.headers == Headers([("Content-Length", "8")])