Files
patroni/tests/test_async_executor.py
Alexander Kukushkin 0e01bb33bb Improve patronictl reinit (#576)
Make it possible to cancel a running task when you want to reinitialize a replica.
There are two possible ways to trigger it:
1. patronictl will ask whether you want to cancel the already running task if an attempt to trigger reinitialization has failed
2. if you use the `--force` argument with `patronictl reinit`
2018-01-04 10:31:44 +01:00

33 lines
756 B
Python

import unittest
from mock import Mock, patch
from patroni.async_executor import AsyncExecutor, CriticalTask
from threading import Thread
class TestAsyncExecutor(unittest.TestCase):
    """Smoke tests that drive AsyncExecutor's public methods with mocks.

    No test asserts on a return value; each one passes as long as the calls
    it makes complete without raising.
    """

    def setUp(self):
        # Build a fresh executor for every test from two mock collaborators.
        first_collaborator = Mock()
        second_collaborator = Mock()
        self.a = AsyncExecutor(first_collaborator, second_collaborator)

    def test_run_async(self):
        job = Mock(return_value=True)
        # Thread.start is replaced by a mock for the duration of the call
        # (presumably so run_async does not launch a real thread -- confirm
        # against AsyncExecutor.run_async).
        with patch.object(Thread, 'start', Mock()):
            self.a.run_async(job)

    def test_run(self):
        # There is no assertRaises: the test passes only if run() does not
        # let the exception raised by the callable propagate.
        failing_job = Mock(side_effect=Exception())
        self.a.run(failing_job)

    def test_cancel(self):
        executor = self.a
        # First cancel() happens before anything has been scheduled.
        executor.cancel()
        # Second cancel() happens after schedule('foo').
        executor.schedule('foo')
        executor.cancel()
        # Finally run() is invoked with a plain mock callable.
        executor.run(Mock())
class TestCriticalTask(unittest.TestCase):
    """Checks the result of CriticalTask.cancel() after complete() was called."""

    def test_completed_task(self):
        task = CriticalTask()
        task.complete(1)
        cancel_result = task.cancel()
        # cancel() on a task that was already completed must be falsy.
        self.assertFalse(cancel_result)