python-requests-toolbelt/tests/threaded/test_thread.py

138 lines
4.8 KiB
Python

"""Module containing the tests for requests_toolbelt.threaded.thread."""
try:
import queue # Python 3
except ImportError:
import Queue as queue
import threading
import unittest
import uuid
try:
from unittest import mock
except ImportError:
import mock
import requests.exceptions
from requests_toolbelt.threaded import thread
def _make_mocks():
return (mock.MagicMock() for _ in range(4))
def _initialize_a_session_thread(session=None, job_queue=None,
response_queue=None, exception_queue=None):
if job_queue is None:
job_queue = queue.Queue()
with mock.patch.object(threading, 'Thread') as Thread:
thread_instance = mock.MagicMock()
Thread.return_value = thread_instance
st = thread.SessionThread(
initialized_session=session,
job_queue=job_queue,
response_queue=response_queue,
exception_queue=exception_queue,
)
return (st, thread_instance, Thread)
class TestSessionThread(unittest.TestCase):
"""Tests for requests_toolbelt.threaded.thread.SessionThread."""
def test_thread_initialization(self):
"""Test the way a SessionThread is initialized.
We want to ensure that we creat a thread with a name generated by the
uuid module, and that we pass the right method to use as a target.
"""
with mock.patch.object(uuid, 'uuid4', return_value='test'):
(st, thread_instance, Thread) = _initialize_a_session_thread()
Thread.assert_called_once_with(target=st._make_request, name='test')
assert thread_instance.daemon is True
assert thread_instance._state is 0
thread_instance.start.assert_called_once_with()
def test_is_alive_proxies_to_worker(self):
"""Test that we proxy the is_alive method to the Thread."""
job_queue = queue.Queue()
with mock.patch.object(threading, 'Thread') as Thread:
thread_instance = mock.MagicMock()
Thread.return_value = thread_instance
st = thread.SessionThread(None, job_queue, None, None)
st.is_alive()
thread_instance.is_alive.assert_called_once_with()
def test_join_proxies_to_worker(self):
"""Test that we proxy the join method to the Thread."""
st, thread_instance, _ = _initialize_a_session_thread()
st.join()
thread_instance.join.assert_called_once_with()
def test_handle_valid_request(self):
"""Test that a response is added to the right queue."""
session, job_queue, response_queue, exception_queue = _make_mocks()
response = mock.MagicMock()
session.request.return_value = response
st, _, _ = _initialize_a_session_thread(
session, job_queue, response_queue, exception_queue)
st._handle_request({'method': 'GET', 'url': 'http://example.com'})
session.request.assert_called_once_with(
method='GET',
url='http://example.com'
)
response_queue.put.assert_called_once_with(
({'method': 'GET', 'url': 'http://example.com'}, response)
)
assert exception_queue.put.called is False
assert job_queue.get.called is False
assert job_queue.get_nowait.called is False
assert job_queue.get_nowait.called is False
assert job_queue.task_done.called is True
def test_handle_invalid_request(self):
"""Test that exceptions from requests are added to the right queue."""
session, job_queue, response_queue, exception_queue = _make_mocks()
exception = requests.exceptions.InvalidURL()
def _side_effect(*args, **kwargs):
raise exception
# Make the request raise an exception
session.request.side_effect = _side_effect
st, _, _ = _initialize_a_session_thread(
session, job_queue, response_queue, exception_queue)
st._handle_request({'method': 'GET', 'url': 'http://example.com'})
session.request.assert_called_once_with(
method='GET',
url='http://example.com'
)
exception_queue.put.assert_called_once_with(
({'method': 'GET', 'url': 'http://example.com'}, exception)
)
assert response_queue.put.called is False
assert job_queue.get.called is False
assert job_queue.get_nowait.called is False
assert job_queue.get_nowait.called is False
assert job_queue.task_done.called is True
def test_make_request(self):
"""Test that _make_request exits when the queue is Empty."""
job_queue = next(_make_mocks())
job_queue.get_nowait.side_effect = queue.Empty()
st, _, _ = _initialize_a_session_thread(job_queue=job_queue)
st._make_request()
job_queue.get_nowait.assert_called_once_with()