Files
patroni/tests/test_utils.py
Alexander Kukushkin 76d1b4cfd8 Minor fixes (#808)
* Use `shutil.move` instead of `os.replace`, which is available only from 3.3
*  Introduce standby-leader health-check and consul service
* Improve unit tests, some lines were not covered
* rename `assertEquals` -> `assertEqual`, due to deprecation warning
2018-09-19 16:32:33 +02:00

59 lines
1.6 KiB
Python

import unittest
from mock import Mock, patch
from patroni.exceptions import PatroniException
from patroni.utils import Retry, RetryFailedError, polling_loop
class TestUtils(unittest.TestCase):
    """Tests for free-standing helpers imported from ``patroni.utils``."""

    def test_polling_loop(self):
        """Check what ``polling_loop`` yields for a very short run.

        With both the positional argument and ``interval`` set to 0.001 the
        generator is expected to produce exactly one item, ``0``.
        """
        iterations = list(polling_loop(0.001, interval=0.001))
        self.assertEqual(iterations, [0])
@patch('time.sleep', Mock())
class TestRetrySleeper(unittest.TestCase):
    """Tests for ``Retry``: attempt counting, delay growth, deadline and copy.

    ``time.sleep`` is replaced by a ``Mock`` for every test method of this
    class via the class-level ``patch`` decorator.
    """

    @staticmethod
    def _fail(times=1):
        """Build a callable that fails a fixed number of times.

        :param times: how many initial calls raise ``PatroniException``.
        :returns: a zero-argument function; each of its first `times` calls
            raises ``PatroniException('Failed!')``, every later call returns
            ``None`` without raising.
        """
        # Mutable container so the closure can update the counter in place
        # (no ``nonlocal`` needed).
        scope = dict(times=0)

        def inner():
            # Only fail while the failure budget has not been used up.
            if scope['times'] < times:
                scope['times'] += 1
                raise PatroniException('Failed!')

        return inner

    def test_reset(self):
        """``reset()`` puts ``_attempts`` back to 0 after one retried call."""
        retry = Retry(delay=0, max_tries=2)
        retry(self._fail())
        self.assertEqual(retry._attempts, 1)
        retry.reset()
        self.assertEqual(retry._attempts, 0)

    def test_too_many_tries(self):
        """A callable that keeps failing ends in ``RetryFailedError``."""
        retry = Retry(delay=0)
        self.assertRaises(RetryFailedError, retry, self._fail(times=999))
        self.assertEqual(retry._attempts, 1)

    def test_maximum_delay(self):
        """After ten failures ``_cur_delay`` is below 4000 and is a ``float``."""
        retry = Retry(delay=10, max_tries=100)
        retry(self._fail(times=10))
        # assertLess reports both operands on failure, so no custom message.
        self.assertLess(retry._cur_delay, 4000)
        # gevent's sleep function is picky about the type
        self.assertIs(type(retry._cur_delay), float)

    def test_deadline(self):
        """A very small ``deadline`` makes a failing call raise ``RetryFailedError``."""
        retry = Retry(deadline=0.0001)
        self.assertRaises(RetryFailedError, retry, self._fail(times=100))

    def test_copy(self):
        """``copy()`` keeps the very same ``sleep_func`` object."""
        def _sleep(t):
            pass

        retry = Retry(sleep_func=_sleep)
        rcopy = retry.copy()
        self.assertIs(rcopy.sleep_func, _sleep)