diff --git a/splitio/client/__init__.py b/splitio/client/__init__.py index 6a207a7b..e1093fae 100644 --- a/splitio/client/__init__.py +++ b/splitio/client/__init__.py @@ -2,10 +2,10 @@ from harness_commons.engine.impressions.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode from harness_commons.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter, PluggableSenderAdapter, RedisSenderAdapterAsync, \ InMemorySenderAdapterAsync, PluggableSenderAdapterAsync -from splitio.tasks.unique_keys_sync import UniqueKeysSyncTask, ClearFilterSyncTask, UniqueKeysSyncTaskAsync, ClearFilterSyncTaskAsync +from harness_commons.tasks.unique_keys_sync import UniqueKeysSyncTask, ClearFilterSyncTask, UniqueKeysSyncTaskAsync, ClearFilterSyncTaskAsync from harness_commons.sync.unique_keys import UniqueKeysSynchronizer, ClearFilterSynchronizer, UniqueKeysSynchronizerAsync, ClearFilterSynchronizerAsync from harness_commons.sync.impression import ImpressionsCountSynchronizer, ImpressionsCountSynchronizerAsync -from splitio.tasks.impressions_sync import ImpressionsCountSyncTask, ImpressionsCountSyncTaskAsync +from harness_commons.tasks.impressions_sync import ImpressionsCountSyncTask, ImpressionsCountSyncTaskAsync def set_classes(storage_mode, impressions_mode, api_adapter, imp_counter, unique_keys_tracker, prefix=None): """ @@ -27,10 +27,10 @@ def set_classes(storage_mode, impressions_mode, api_adapter, imp_counter, unique :return: tuple of classes instances. :rtype: (harness_commons.sync.unique_keys.UniqueKeysSynchronizer, harness_commons.sync.unique_keys.ClearFilterSynchronizer, - splitio.tasks.unique_keys_sync.UniqueKeysTask, - splitio.tasks.unique_keys_sync.ClearFilterTask, + harness_commons.tasks.unique_keys_sync.UniqueKeysTask, + harness_commons.tasks.unique_keys_sync.ClearFilterTask, harness_commons.sync.impressions_sync.ImpressionsCountSynchronizer, - splitio.tasks.impressions_sync.ImpressionsCountSyncTask, + harness_commons.tasks.impressions_sync.ImpressionsCountSyncTask, splitio.engine.impressions.strategies.StrategyNoneMode/splitio.engine.impressions.strategies.StrategyDebugMode/splitio.engine.impressions.strategies.StrategyOptimizedMode) """ unique_keys_synchronizer = None @@ -92,10 +92,10 @@ def set_classes_async(storage_mode, impressions_mode, api_adapter, imp_counter, :return: tuple of classes instances. :rtype: (harness_commons.sync.unique_keys.UniqueKeysSynchronizerAsync, harness_commons.sync.unique_keys.ClearFilterSynchronizerAsync, - splitio.tasks.unique_keys_sync.UniqueKeysTaskAsync, - splitio.tasks.unique_keys_sync.ClearFilterTaskAsync, + harness_commons.tasks.unique_keys_sync.UniqueKeysTaskAsync, + harness_commons.tasks.unique_keys_sync.ClearFilterTaskAsync, harness_commons.sync.impressions_sync.ImpressionsCountSynchronizerAsync, - splitio.tasks.impressions_sync.ImpressionsCountSyncTaskAsync, + harness_commons.tasks.impressions_sync.ImpressionsCountSyncTaskAsync, splitio.engine.impressions.strategies.StrategyNoneMode/splitio.engine.impressions.strategies.StrategyDebugMode/splitio.engine.impressions.strategies.StrategyOptimizedMode) """ unique_keys_synchronizer = None diff --git a/splitio/client/client.py b/splitio/client/client.py index 011f618c..42af3ab1 100644 --- a/splitio/client/client.py +++ b/splitio/client/client.py @@ -12,7 +12,7 @@ from harness_commons.models.events import Event, EventWrapper, SdkEvent from harness_commons.models.telemetry import get_latency_bucket_index, MethodExceptionsAndLatencies from splitio.optional.loaders import asyncio -from splitio.util.time import get_current_epoch_time_ms, utctime_ms +from harness_commons.util.time import get_current_epoch_time_ms, utctime_ms _LOGGER = logging.getLogger(__name__) diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 9735992d..fc8d3d3e 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -59,15 +59,15 @@ from harness_commons.api.events import EventsAPI, EventsAPIAsync from harness_commons.api.auth import AuthAPI, AuthAPIAsync from harness_commons.api.telemetry import TelemetryAPI, TelemetryAPIAsync -from splitio.util.time import get_current_epoch_time_ms +from harness_commons.util.time import get_current_epoch_time_ms from splitio.spec import SPEC_VERSION # Tasks from splitio.tasks.split_sync import SplitSynchronizationTask, SplitSynchronizationTaskAsync -from splitio.tasks.segment_sync import SegmentSynchronizationTask, SegmentSynchronizationTaskAsync -from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask,\ +from harness_commons.tasks.segment_sync import SegmentSynchronizationTask, SegmentSynchronizationTaskAsync +from harness_commons.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask,\ ImpressionsCountSyncTaskAsync, ImpressionsSyncTaskAsync -from splitio.tasks.events_sync import EventsSyncTask, EventsSyncTaskAsync +from harness_commons.tasks.events_sync import EventsSyncTask, EventsSyncTaskAsync from harness_commons.tasks.telemetry_sync import TelemetrySyncTask, TelemetrySyncTaskAsync # Synchronizer diff --git a/splitio/tasks/__init__.py b/splitio/tasks/__init__.py deleted file mode 100644 index 10c405e5..00000000 --- a/splitio/tasks/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Split synchronization tasks module.""" - -import abc - - -class BaseSynchronizationTask(object): - """Syncrhonization task interface.""" - - __metadata__ = abc.ABCMeta - - @abc.abstractmethod - def start(self): - """Start the task.""" - pass - - @abc.abstractmethod - def stop(self, event=None): - """ - Stop the task if running. - - Optionally accept an event to be set when the task finally stops. - - :param event: Event to be set as soon as the task finishes. - :type event: Threading.Event - """ - pass - - @abc.abstractmethod - def is_running(self): - """Return true if the task is running, false otherwise.""" - pass diff --git a/splitio/tasks/events_sync.py b/splitio/tasks/events_sync.py deleted file mode 100644 index 242b8e78..00000000 --- a/splitio/tasks/events_sync.py +++ /dev/null @@ -1,76 +0,0 @@ -"""Events syncrhonization task.""" -import logging - -from splitio.tasks import BaseSynchronizationTask -from splitio.tasks.util.asynctask import AsyncTask, AsyncTaskAsync - - -_LOGGER = logging.getLogger(__name__) - - -class EventsSyncTaskBase(BaseSynchronizationTask): - """Events synchronization task base uses an asynctask.AsyncTask to send events.""" - - def start(self): - """Start executing the events synchronization task.""" - self._task.start() - - def stop(self, event=None): - """Stop executing the events synchronization task.""" - pass - - def flush(self): - """Flush events in storage.""" - _LOGGER.debug('Forcing flush execution for events') - self._task.force_execution() - - def is_running(self): - """ - Return whether the task is running or not. - - :return: True if the task is running. False otherwise. - :rtype: bool - """ - return self._task.running() - - -class EventsSyncTask(EventsSyncTaskBase): - """Events synchronization task uses an asynctask.AsyncTask to send events.""" - - def __init__(self, synchronize_events, period): - """ - Class constructor. - - :param synchronize_events: Events Api object to send data to the backend - :type synchronize_events: harness_commons.api.events.EventsAPI - :param period: How many seconds to wait between subsequent event pushes to the BE. - :type period: int - - """ - self._period = period - self._task = AsyncTask(synchronize_events, self._period, on_stop=synchronize_events) - - def stop(self, event=None): - """Stop executing the events synchronization task.""" - self._task.stop(event) - - -class EventsSyncTaskAsync(EventsSyncTaskBase): - """Events synchronization task uses an asynctask.AsyncTaskAsync to send events.""" - - def __init__(self, synchronize_events, period): - """ - Class constructor. - - :param synchronize_events: Events Api object to send data to the backend - :type synchronize_events: harness_commons.api.events.EventsAPIAsync - :param period: How many seconds to wait between subsequent event pushes to the BE. - :type period: int - - """ - self._period = period - self._task = AsyncTaskAsync(synchronize_events, self._period, on_stop=synchronize_events) - - async def stop(self, event=None): - """Stop executing the events synchronization task.""" - await self._task.stop(True) diff --git a/splitio/tasks/impressions_sync.py b/splitio/tasks/impressions_sync.py deleted file mode 100644 index 195bdbdf..00000000 --- a/splitio/tasks/impressions_sync.py +++ /dev/null @@ -1,141 +0,0 @@ -"""Impressions syncrhonization task.""" -import logging - -from splitio.tasks import BaseSynchronizationTask -from splitio.tasks.util.asynctask import AsyncTask, AsyncTaskAsync - - -_LOGGER = logging.getLogger(__name__) - - -class ImpressionsSyncTaskBase(BaseSynchronizationTask): - """Impressions synchronization task uses an asynctask.AsyncTask to send impressions.""" - - def start(self): - """Start executing the impressions synchronization task.""" - self._task.start() - - def stop(self, event=None): - """Stop executing the impressions synchronization task.""" - pass - - def is_running(self): - """ - Return whether the task is running or not. - - :return: True if the task is running. False otherwise. - :rtype: bool - """ - return self._task.running() - - def flush(self): - """Flush impressions in storage.""" - _LOGGER.debug('Forcing flush execution for impressions') - self._task.force_execution() - - -class ImpressionsSyncTask(ImpressionsSyncTaskBase): - """Impressions synchronization task uses an asynctask.AsyncTask to send impressions.""" - - def __init__(self, synchronize_impressions, period): - """ - Class constructor. - - :param synchronize_impressions: sender - :type synchronize_impressions: func - :param period: How many seconds to wait between subsequent impressions pushes to the BE. - :type period: int - - """ - self._period = period - self._task = AsyncTask(synchronize_impressions, self._period, - on_stop=synchronize_impressions) - - def stop(self, event=None): - """Stop executing the impressions synchronization task.""" - self._task.stop(event) - - -class ImpressionsSyncTaskAsync(ImpressionsSyncTaskBase): - """Impressions synchronization task uses an asynctask.AsyncTask to send impressions.""" - - def __init__(self, synchronize_impressions, period): - """ - Class constructor. - - :param synchronize_impressions: sender - :type synchronize_impressions: func - :param period: How many seconds to wait between subsequent impressions pushes to the BE. - :type period: int - - """ - self._period = period - self._task = AsyncTaskAsync(synchronize_impressions, self._period, - on_stop=synchronize_impressions) - - async def stop(self, event=None): - """Stop executing the impressions synchronization task.""" - await self._task.stop(True) - - -class ImpressionsCountSyncTaskBase(BaseSynchronizationTask): - """Impressions synchronization task uses an asynctask.AsyncTask to send impressions.""" - - _PERIOD = 1800 # 30 * 60 # 30 minutes - - def start(self): - """Start executing the impressions synchronization task.""" - self._task.start() - - def stop(self, event=None): - """Stop executing the impressions synchronization task.""" - pass - - def is_running(self): - """ - Return whether the task is running or not. - - :return: True if the task is running. False otherwise. - :rtype: bool - """ - return self._task.running() - - def flush(self): - """Flush impressions in storage.""" - self._task.force_execution() - - -class ImpressionsCountSyncTask(ImpressionsCountSyncTaskBase): - """Impressions synchronization task uses an asynctask.AsyncTask to send impressions.""" - - def __init__(self, synchronize_counters): - """ - Class constructor. - - :param synchronize_counters: Handler - :type synchronize_counters: func - - """ - self._task = AsyncTask(synchronize_counters, self._PERIOD, on_stop=synchronize_counters) - - def stop(self, event=None): - """Stop executing the impressions synchronization task.""" - self._task.stop(event) - - -class ImpressionsCountSyncTaskAsync(ImpressionsCountSyncTaskBase): - """Impressions synchronization task uses an asynctask.AsyncTask to send impressions.""" - - def __init__(self, synchronize_counters): - """ - Class constructor. - - :param synchronize_counters: Handler - :type synchronize_counters: func - - """ - self._task = AsyncTaskAsync(synchronize_counters, self._PERIOD, on_stop=synchronize_counters) - - async def stop(self): - """Stop executing the impressions synchronization task.""" - await self._task.stop(True) diff --git a/splitio/tasks/segment_sync.py b/splitio/tasks/segment_sync.py deleted file mode 100644 index 55238634..00000000 --- a/splitio/tasks/segment_sync.py +++ /dev/null @@ -1,65 +0,0 @@ -"""Segment syncrhonization module.""" - -import logging -from splitio.tasks import BaseSynchronizationTask -from splitio.tasks.util import asynctask - - -_LOGGER = logging.getLogger(__name__) - - -class SegmentSynchronizationTaskBase(BaseSynchronizationTask): - """Segment Syncrhonization base class.""" - - def start(self): - """Start segment synchronization.""" - self._task.start() - - def stop(self, event=None): - """Stop segment synchronization.""" - pass - - def is_running(self): - """ - Return whether the task is running or not. - - :return: True if the task is running. False otherwise. - :rtype: bool - """ - return self._task.running() - - -class SegmentSynchronizationTask(SegmentSynchronizationTaskBase): - """Segment Syncrhonization class.""" - - def __init__(self, synchronize_segments, period): - """ - Clas constructor. - - :param synchronize_segments: handler for syncing segments - :type synchronize_segments: func - - """ - self._task = asynctask.AsyncTask(synchronize_segments, period, on_init=None) - - def stop(self, event=None): - """Stop segment synchronization.""" - self._task.stop(event) - - -class SegmentSynchronizationTaskAsync(SegmentSynchronizationTaskBase): - """Segment Syncrhonization async class.""" - - def __init__(self, synchronize_segments, period): - """ - Clas constructor. - - :param synchronize_segments: handler for syncing segments - :type synchronize_segments: func - - """ - self._task = asynctask.AsyncTaskAsync(synchronize_segments, period, on_init=None) - - async def stop(self): - """Stop segment synchronization.""" - await self._task.stop(True) diff --git a/splitio/tasks/split_sync.py b/splitio/tasks/split_sync.py index 0752bdbc..2267b1b3 100644 --- a/splitio/tasks/split_sync.py +++ b/splitio/tasks/split_sync.py @@ -1,8 +1,8 @@ """Split Synchronization task.""" import logging -from splitio.tasks import BaseSynchronizationTask -from splitio.tasks.util.asynctask import AsyncTask, AsyncTaskAsync +from harness_commons.tasks import BaseSynchronizationTask +from harness_commons.tasks.util.asynctask import AsyncTask, AsyncTaskAsync _LOGGER = logging.getLogger(__name__) @@ -32,17 +32,17 @@ def is_running(self): class SplitSynchronizationTask(SplitSynchronizationTaskBase): """Split Synchronization task class.""" - def __init__(self, synchronize_splits, period): + def __init__(self, synchronize_definitions, period): """ Class constructor. - :param synchronize_splits: Handler - :type synchronize_splits: func + :param synchronize_definitions: Handler + :type synchronize_definitions: func :param period: Period of task :type period: int """ self._period = period - self._task = AsyncTask(synchronize_splits, period, on_init=None) + self._task = AsyncTask(synchronize_definitions, period, on_init=None) def stop(self, event=None): """Stop the task. Accept an optional event to set when the task has finished.""" @@ -52,17 +52,17 @@ def stop(self, event=None): class SplitSynchronizationTaskAsync(SplitSynchronizationTaskBase): """Split Synchronization async task class.""" - def __init__(self, synchronize_splits, period): + def __init__(self, synchronize_definitions, period): """ Class constructor. - :param synchronize_splits: Handler - :type synchronize_splits: func + :param synchronize_definitions: Handler + :type synchronize_definitions: func :param period: Period of task :type period: int """ self._period = period - self._task = AsyncTaskAsync(synchronize_splits, period, on_init=None) + self._task = AsyncTaskAsync(synchronize_definitions, period, on_init=None) async def stop(self, event=None): """Stop the task. Accept an optional event to set when the task has finished.""" diff --git a/splitio/tasks/unique_keys_sync.py b/splitio/tasks/unique_keys_sync.py deleted file mode 100644 index 9ba81253..00000000 --- a/splitio/tasks/unique_keys_sync.py +++ /dev/null @@ -1,137 +0,0 @@ -"""Impressions syncrhonization task.""" -import logging - -from splitio.tasks import BaseSynchronizationTask -from splitio.tasks.util.asynctask import AsyncTask, AsyncTaskAsync - - -_LOGGER = logging.getLogger(__name__) -_UNIQUE_KEYS_SYNC_PERIOD = 15 * 60 # 15 minutes -_CLEAR_FILTER_SYNC_PERIOD = 60 * 60 * 24 # 24 hours - - -class UniqueKeysSyncTaskBase(BaseSynchronizationTask): - """Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs.""" - - def start(self): - """Start executing the unique keys synchronization task.""" - self._task.start() - - def stop(self, event=None): - """Stop executing the unique keys synchronization task.""" - pass - - def is_running(self): - """ - Return whether the task is running or not. - - :return: True if the task is running. False otherwise. - :rtype: bool - """ - return self._task.running() - - def flush(self): - """Flush unique keys.""" - _LOGGER.debug('Forcing flush execution for unique keys') - self._task.force_execution() - - -class UniqueKeysSyncTask(UniqueKeysSyncTaskBase): - """Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs.""" - - def __init__(self, synchronize_unique_keys, period = _UNIQUE_KEYS_SYNC_PERIOD): - """ - Class constructor. - - :param synchronize_unique_keys: sender - :type synchronize_unique_keys: func - :param period: How many seconds to wait between subsequent unique keys pushes to the BE. - :type period: int - """ - self._task = AsyncTask(synchronize_unique_keys, period, - on_stop=synchronize_unique_keys) - - def stop(self, event=None): - """Stop executing the unique keys synchronization task.""" - self._task.stop(event) - - -class UniqueKeysSyncTaskAsync(UniqueKeysSyncTaskBase): - """Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs.""" - - def __init__(self, synchronize_unique_keys, period = _UNIQUE_KEYS_SYNC_PERIOD): - """ - Class constructor. - - :param synchronize_unique_keys: sender - :type synchronize_unique_keys: func - :param period: How many seconds to wait between subsequent unique keys pushes to the BE. - :type period: int - """ - self._task = AsyncTaskAsync(synchronize_unique_keys, period, - on_stop=synchronize_unique_keys) - - async def stop(self): - """Stop executing the unique keys synchronization task.""" - await self._task.stop(True) - - -class ClearFilterSyncTaskBase(BaseSynchronizationTask): - """Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs.""" - - def start(self): - """Start executing the unique keys synchronization task.""" - self._task.start() - - def stop(self, event=None): - """Stop executing the unique keys synchronization task.""" - pass - - def is_running(self): - """ - Return whether the task is running or not. - - :return: True if the task is running. False otherwise. - :rtype: bool - """ - return self._task.running() - - -class ClearFilterSyncTask(ClearFilterSyncTaskBase): - """Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs.""" - - def __init__(self, clear_filter, period = _CLEAR_FILTER_SYNC_PERIOD): - """ - Class constructor. - - :param synchronize_unique_keys: sender - :type synchronize_unique_keys: func - :param period: How many seconds to wait between subsequent clearing of bloom filter - :type period: int - """ - self._task = AsyncTask(clear_filter, period, - on_stop=clear_filter) - - def stop(self, event=None): - """Stop executing the unique keys synchronization task.""" - self._task.stop(event) - - -class ClearFilterSyncTaskAsync(ClearFilterSyncTaskBase): - """Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs.""" - - def __init__(self, clear_filter, period = _CLEAR_FILTER_SYNC_PERIOD): - """ - Class constructor. - - :param synchronize_unique_keys: sender - :type synchronize_unique_keys: func - :param period: How many seconds to wait between subsequent clearing of bloom filter - :type period: int - """ - self._task = AsyncTaskAsync(clear_filter, period, - on_stop=clear_filter) - - async def stop(self): - """Stop executing the unique keys synchronization task.""" - await self._task.stop(True) diff --git a/splitio/tasks/util/__init__.py b/splitio/tasks/util/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/splitio/tasks/util/asynctask.py b/splitio/tasks/util/asynctask.py deleted file mode 100644 index a772b2d7..00000000 --- a/splitio/tasks/util/asynctask.py +++ /dev/null @@ -1,320 +0,0 @@ -"""Asynchronous tasks that can be controlled.""" -import threading -import logging -import queue -from splitio.optional.loaders import asyncio - -__TASK_STOP__ = 0 -__TASK_FORCE_RUN__ = 1 - -_LOGGER = logging.getLogger(__name__) - -def _safe_run(func): - """ - Execute a function wrapped in a try-except block. - - If anything goes wrong returns false instead of propagating the exception. - - :param func: Function to be executed, receives no arguments and it's return - value is ignored. - """ - try: - func() - return True - except Exception: # pylint: disable=broad-except - # Catch any exception that might happen to avoid the periodic task - # from ending and allowing for a recovery, as well as preventing - # an exception from propagating and breaking the main thread - _LOGGER.error('Something went wrong when running passed function.') - _LOGGER.debug('Original traceback:', exc_info=True) - return False - -async def _safe_run_async(func): - """ - Execute a function wrapped in a try-except block. - - If anything goes wrong returns false instead of propagating the exception. - - :param func: Function to be executed, receives no arguments and it's return - value is ignored. - """ - try: - await func() - return True - except Exception: # pylint: disable=broad-except - # Catch any exception that might happen to avoid the periodic task - # from ending and allowing for a recovery, as well as preventing - # an exception from propagating and breaking the main thread - _LOGGER.error('Something went wrong when running passed function.') - _LOGGER.debug('Original traceback:', exc_info=True) - return False - - -class AsyncTask(object): # pylint: disable=too-many-instance-attributes - """ - Asyncrhonous controllable task class. - - This class creates is used to wrap around a function to treat it as a - periodic task. This task can be stopped, it's execution can be forced, and - it's status (whether it's running or not) can be obtained from the task - object. - It also allows for "on init" and "on stop" functions to be passed. - """ - - def __init__(self, main, period, on_init=None, on_stop=None): - """ - Class constructor. - - :param main: Main function to be executed periodically - :type main: callable - :param period: How many seconds to wait between executions - :type period: int - :param on_init: Function to be executed ONCE before the main one - :type on_init: callable - :param on_stop: Function to be executed ONCE after the task has finished - :type on_stop: callable - """ - self._on_init = on_init - self._main = main - self._on_stop = on_stop - self._period = period - self._messages = queue.Queue() - self._running = False - self._thread = None - self._stop_event = None - - def _execution_wrapper(self): - """ - Execute user defined function in separate thread. - - It will execute the "on init" hook is available. If an exception is - raised it will abort execution, otherwise it will enter an infinite - loop in which the main function is executed every seconds. - After stop has been called the "on stop" hook will be invoked if - available. - - All custom functions are run within a _safe_run() function which - prevents exceptions from being propagated. - """ - try: - if self._on_init is not None: - if not _safe_run(self._on_init): - _LOGGER.error("Error running task initialization function, aborting execution") - self._running = False - return - self._running = True - while True: - try: - msg = self._messages.get(True, self._period) - if msg == __TASK_STOP__: - _LOGGER.debug("Stop signal received. finishing task execution") - break - elif msg == __TASK_FORCE_RUN__: - _LOGGER.debug("Force execution signal received. Running now") - if not _safe_run(self._main): - _LOGGER.error("An error occurred when executing the task. " - "Retrying after period expires") - continue - except queue.Empty: - # If no message was received, the timeout has expired - # and we're ready for a new execution - pass - - if not _safe_run(self._main): - _LOGGER.error( - "An error occurred when executing the task. " - "Retrying after period expires" - ) - finally: - self._cleanup() - - def _cleanup(self): - """Execute on_stop callback, set event if needed, update status.""" - if self._on_stop is not None: - if not _safe_run(self._on_stop): - _LOGGER.error("An error occurred when executing the task's OnStop hook. ") - - self._running = False - - if self._stop_event is not None: - self._stop_event.set() - - def start(self): - """Start the async task.""" - if self._running: - _LOGGER.warning("Task is already running. Ignoring .start() call") - return - - # Start execution - self._thread = threading.Thread(target=self._execution_wrapper, - name='AsyncTask::' + getattr(self._main, '__name__', 'N/S'), daemon=True) - try: - self._thread.start() - - except RuntimeError: - _LOGGER.error("Couldn't create new thread for async task") - _LOGGER.debug('Error: ', exc_info=True) - - def stop(self, event=None): - """ - Send a signal to the thread in order to stop it. If the task is not running do nothing. - - Optionally accept an event to be set upon task completion. - - :param event: Event to set when the task completes. - :type event: threading.Event - """ - if event is not None: - self._stop_event = event - - if not self._running: - if self._stop_event is not None: - event.set() - return - - # Queue is of infinite size, should not raise an exception - self._messages.put(__TASK_STOP__, False) - - def force_execution(self): - """Force an execution of the task without waiting for the period to end.""" - if not self._running: - return - # Queue is of infinite size, should not raise an exception - self._messages.put(__TASK_FORCE_RUN__, False) - - def running(self): - """Return whether the task is running or not.""" - return self._running - - -class AsyncTaskAsync(object): # pylint: disable=too-many-instance-attributes - """ - Asyncrhonous controllable task async class. - - This class creates is used to wrap around a function to treat it as a - periodic task. This task can be stopped, it's execution can be forced, and - it's status (whether it's running or not) can be obtained from the task - object. - It also allows for "on init" and "on stop" functions to be passed. - """ - - - def __init__(self, main, period, on_init=None, on_stop=None): - """ - Class constructor. - - :param main: Main function to be executed periodically - :type main: callable - :param period: How many seconds to wait between executions - :type period: int - :param on_init: Function to be executed ONCE before the main one - :type on_init: callable - :param on_stop: Function to be executed ONCE after the task has finished - :type on_stop: callable - """ - self._on_init = on_init - self._main = main - self._on_stop = on_stop - self._period = period - self._messages = asyncio.Queue() - self._running = False - self._completion_event = asyncio.Event() - self._sleep_task = None - - async def _execution_wrapper(self): - """ - Execute user defined function in separate thread. - - It will execute the "on init" hook is available. If an exception is - raised it will abort execution, otherwise it will enter an infinite - loop in which the main function is executed every seconds. - After stop has been called the "on stop" hook will be invoked if - available. - - All custom functions are run within a _safe_run() function which - prevents exceptions from being propagated. - """ - try: - if self._on_init is not None: - if not await _safe_run_async(self._on_init): - _LOGGER.error("Error running task initialization function, aborting execution") - self._running = False - return - self._running = True - - while self._running: - try: - msg = await asyncio.wait_for(self._messages.get(), timeout=self._period) - if msg == __TASK_STOP__: - _LOGGER.debug("Stop signal received. finishing task execution") - break - elif msg == __TASK_FORCE_RUN__: - _LOGGER.debug("Force execution signal received. Running now") - if not await _safe_run_async(self._main): - _LOGGER.error("An error occurred when executing the task. " - "Retrying after period expires") - continue - except asyncio.QueueEmpty: - # If no message was received, the timeout has expired - # and we're ready for a new execution - pass - except asyncio.CancelledError: - break - except asyncio.TimeoutError: - pass - - if not await _safe_run_async(self._main): - _LOGGER.error( - "An error occurred when executing the task. " - "Retrying after period expires" - ) - finally: - await self._cleanup() - - async def _cleanup(self): - """Execute on_stop callback, set event if needed, update status.""" - if self._on_stop is not None: - if not await _safe_run_async(self._on_stop): - _LOGGER.error("An error occurred when executing the task's OnStop hook. ") - - self._running = False - self._completion_event.set() - _LOGGER.debug("AsyncTask finished") - - def start(self): - """Start the async task.""" - if self._running: - _LOGGER.warning("Task is already running. Ignoring .start() call") - return - # Start execution - self._completion_event.clear() - asyncio.get_running_loop().create_task(self._execution_wrapper()) - - async def stop(self, wait_for_completion=False): - """ - Send a signal to the thread in order to stop it. If the task is not running do nothing. - - Optionally accept an event to be set upon task completion. - - :param event: Event to set when the task completes. - :type event: threading.Event - """ - if not self._running: - return - - # Queue is of infinite size, should not raise an exception - self._messages.put_nowait(__TASK_STOP__) - - if wait_for_completion: - await self._completion_event.wait() - - def force_execution(self): - """Force an execution of the task without waiting for the period to end.""" - if not self._running: - return - # Queue is of infinite size, should not raise an exception - self._messages.put_nowait(__TASK_FORCE_RUN__) - - def running(self): - """Return whether the task is running or not.""" - return self._running diff --git a/splitio/tasks/util/workerpool.py b/splitio/tasks/util/workerpool.py deleted file mode 100644 index 8d6c6e53..00000000 --- a/splitio/tasks/util/workerpool.py +++ /dev/null @@ -1,229 +0,0 @@ -"""Worker pool module.""" - -import logging -from threading import Thread, Event -import queue - -from splitio.optional.loaders import asyncio - -_LOGGER = logging.getLogger(__name__) - -class WorkerPool(object): - """Worker pool class to implement single producer/multiple consumer.""" - - def __init__(self, worker_count, worker_func): - """ - Class constructor. - - :param worker_count: Number of workers for the pool. - :type worker_func: Function to be executed by the workers whenever a messages is fetched. - """ - self._failed = False - self._incoming = queue.Queue() - self._should_be_working = [True for _ in range(0, worker_count)] - self._worker_events = [Event() for _ in range(0, worker_count)] - self._threads = [ - Thread(target=self._wrapper, args=(i, worker_func), name="pool_worker_%d" % i) - for i in range(0, worker_count) - ] - for thread in self._threads: - thread.daemon = True - - def start(self): - """Start the workers.""" - for thread in self._threads: - thread.start() - - @staticmethod - def _safe_run(func, message): - """ - Execute the user funcion for a given message without raising exceptions. - - :param func: User defined function. - :type func: callable - :param message: Message fetched from the queue. - :param message: object - - :return True if no everything goes well. False otherwise. - :rtype bool - """ - try: - func(message) - return True - except Exception: # pylint: disable=broad-except - _LOGGER.error("Something went wrong when processing message %s", message) - _LOGGER.debug('Original traceback: ', exc_info=True) - return False - - def _wrapper(self, worker_number, func): - """ - Fetch message, execute tasks, and acknowledge results. - - :param worker_number: # (id) of worker whose function will be executed. - :type worker_number: int - :param func: User defined function. - :type func: callable. - """ - while self._should_be_working[worker_number]: - try: - message = self._incoming.get(True, 0.5) - - # For some reason message can be None in python2 implementation of queue. - # This method must be both ignored and acknowledged with .task_done() - # otherwise .join() will halt. - if message is None: - _LOGGER.debug('spurious message received. acking and ignoring.') - self._incoming.task_done() - continue - - # If the task is successfully executed, the ack is done AFTERWARDS, - # to avoid race conditions on SDK initialization. - _LOGGER.debug("processing message '%s'", message) - ok = self._safe_run(func, message) # pylint: disable=invalid-name - if not ok: - self._failed = True - _LOGGER.error( - ("Something went wrong during the execution, " - "removing message \"%s\" from queue."), - message - ) - self._incoming.task_done() - except queue.Empty: - # No message was fetched, just keep waiting. - pass - - # Set my flag indicating that i have finished - self._worker_events[worker_number].set() - - def submit_work(self, message): - """ - Add a new message to the work-queue. - - :param message: New message to add. - :type message: object. - """ - self._incoming.put(message) - _LOGGER.debug('queued message %s for processing.', message) - - def wait_for_completion(self): - """Block until the work queue is empty.""" - _LOGGER.debug('waiting for all messages to be processed.') - self._incoming.join() - _LOGGER.debug('all messages processed.') - old = self._failed - self._failed = False - return old - - def stop(self, event=None): - """Stop all worker nodes.""" - async_stop = Thread(target=self._wait_workers_shutdown, args=(event,), daemon=True) - async_stop.start() - - def _wait_workers_shutdown(self, event): - """ - Wait until all workers have finished, and set the event. - - :param event: Event to set as soon as all the workers have shut down. - :type event: threading.Event - """ - self.wait_for_completion() - for index, _ in enumerate(self._should_be_working): - self._should_be_working[index] = False - - if event is not None: - for worker_event in self._worker_events: - worker_event.wait() - event.set() - - -class WorkerPoolAsync(object): - """Worker pool async class to implement single producer/multiple consumer.""" - - _abort = object() - - def __init__(self, worker_count, worker_func): - """ - Class constructor. - - :param worker_count: Number of workers for the pool. - :type worker_func: Function to be executed by the workers whenever a messages is fetched. - """ - self._semaphore = asyncio.Semaphore(worker_count) - self._queue = asyncio.Queue() - self._handler = worker_func - self._aborted = False - - async def _schedule_work(self): - """wrap the message handler execution.""" - while True: - message = await self._queue.get() - if message == self._abort: - self._aborted = True - return - asyncio.get_running_loop().create_task(self._do_work(message)) - - async def _do_work(self, message): - """process a single message.""" - try: - await self._semaphore.acquire() # wait until "there's a free worker" - if self._aborted: # check in case the pool was shutdown while we were waiting for a worker - return - await self._handler(message._message) - except Exception: - _LOGGER.error("Something went wrong when processing message %s", message) - _LOGGER.debug('Original traceback: ', exc_info=True) - message._failed = True - message._complete.set() - self._semaphore.release() # signal worker is idle - - def start(self): - """Start the workers.""" - asyncio.get_running_loop().create_task(self._schedule_work()) - - async def submit_work(self, jobs): - """ - Add a new message to the work-queue. - - :param message: New message to add. - :type message: object. - """ - self.jobs = jobs - if len(jobs) == 1: - wrapped = TaskCompletionWraper(next(i for i in jobs)) - await self._queue.put(wrapped) - return wrapped - - tasks = [TaskCompletionWraper(job) for job in jobs] - for w in tasks: - await self._queue.put(w) - - return BatchCompletionWrapper(tasks) - - async def stop(self, event=None): - """abort all execution (except currently running handlers).""" - await self._queue.put(self._abort) - - -class TaskCompletionWraper: - """Task completion class""" - def __init__(self, message): - self._message = message - self._complete = asyncio.Event() - self._failed = False - - async def await_completion(self): - await self._complete.wait() - return not self._failed - - def _mark_as_complete(self): - self._complete.set() - - -class BatchCompletionWrapper: - """Batch completion class""" - def __init__(self, tasks): - self._tasks = tasks - - async def await_completion(self): - await asyncio.gather(*[task.await_completion() for task in self._tasks]) - return not any(task._failed for task in self._tasks) diff --git a/splitio/util/__init__.py b/splitio/util/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/splitio/util/backoff.py b/splitio/util/backoff.py deleted file mode 100644 index f1e47324..00000000 --- a/splitio/util/backoff.py +++ /dev/null @@ -1,36 +0,0 @@ -"""Exponential Backoff duration calculator.""" - - -class Backoff(object): - """Backoff duration calculator.""" - - MAX_ALLOWED_WAIT = 30 * 60 # half an hour - - def __init__(self, base=1, max_allowed=MAX_ALLOWED_WAIT): - """ - Class constructor. - - :param base: basic unit to be multiplied on each iteration (seconds) - :param base: float - - :param max_allowed: max seconds to wait - :param max_allowed: int - """ - self._base = base - self._max_allowed = max_allowed - self._attempt = 0 - - def get(self): - """ - Return the current time to wait and pre-calculate the next one. - - :returns: time to wait until next retry. - :rtype: float - """ - to_return = min(self._base * (2 ** self._attempt), self._max_allowed) - self._attempt += 1 - return to_return - - def reset(self): - """Reset the attempt count.""" - self._attempt = 0 diff --git a/splitio/util/decorators.py b/splitio/util/decorators.py deleted file mode 100644 index e0775dea..00000000 --- a/splitio/util/decorators.py +++ /dev/null @@ -1,15 +0,0 @@ -"""Misc decorators.""" -from abc import abstractmethod - - -def abstract_property(func): - """ - Abstract property decorator. - - :param func: method to decorate - :type func: callable - - :returns: decorated function - :rtype: callable - """ - return property(abstractmethod(func)) diff --git a/splitio/util/storage_helper.py b/splitio/util/storage_helper.py deleted file mode 100644 index e7ec12ed..00000000 --- a/splitio/util/storage_helper.py +++ /dev/null @@ -1,199 +0,0 @@ -"""Storage Helper.""" -import logging -from splitio.models import splits - -_LOGGER = logging.getLogger(__name__) - -def update_definition_storage(feature_flag_storage, feature_flags, change_number, clear_storage=False): - """ - Update feature flag storage from given list of feature flags while checking the flag set logic - - :param feature_flag_storage: Feature flag storage instance - :type feature_flag_storage: splitio.storage.inmemory.InMemorySplitStorage - :param feature_flag: Feature flag instance to validate. - :type feature_flag: splitio.models.splits.Split - :param: last change number - :type: int - - :return: segments list from feature flags list - :rtype: list(str) - """ - segment_list = set() - to_add = [] - to_delete = [] - if clear_storage: - feature_flag_storage.clear() - - for feature_flag in feature_flags: - if feature_flag_storage.flag_set_filter.intersect(feature_flag.sets) and feature_flag.status == splits.Status.ACTIVE: - to_add.append(feature_flag) - segment_list.update(set(feature_flag.get_segment_names())) - else: - if feature_flag_storage.get(feature_flag.name) is not None: - to_delete.append(feature_flag.name) - - feature_flag_storage.update(to_add, to_delete, change_number) - return segment_list - -def update_rule_based_segment_storage(rule_based_segment_storage, rule_based_segments, change_number, clear_storage=False): - """ - Update rule based segment storage from given list of rule based segments - - :param rule_based_segment_storage: rule based segment storage instance - :type rule_based_segment_storage: splitio.storage.RuleBasedSegmentStorage - :param rule_based_segments: rule based segment instance to validate. - :type rule_based_segments: harness_commons.models.rule_based_segments.RuleBasedSegment - :param: last change number - :type: int - - :return: segments list from excluded segments list - :rtype: list(str) - """ - if clear_storage: - rule_based_segment_storage.clear() - - segment_list = set() - to_add = [] - to_delete = [] - for rule_based_segment in rule_based_segments: - if rule_based_segment.status == splits.Status.ACTIVE: - to_add.append(rule_based_segment) - segment_list.update(set(rule_based_segment.excluded.get_excluded_standard_segments())) - segment_list.update(rule_based_segment.get_condition_segment_names()) - else: - if rule_based_segment_storage.get(rule_based_segment.name) is not None: - to_delete.append(rule_based_segment.name) - - rule_based_segment_storage.update(to_add, to_delete, change_number) - return segment_list - -def get_standard_segment_names_in_rbs_storage(rule_based_segment_storage): - """ - Retrieve a list of all standard segments names. - - :return: Set of segment names. - :rtype: Set(str) - """ - segment_list = set() - for rb_segment in rule_based_segment_storage.get_segment_names(): - rb_segment_obj = rule_based_segment_storage.get(rb_segment) - segment_list.update(set(rb_segment_obj.excluded.get_excluded_standard_segments())) - segment_list.update(rb_segment_obj.get_condition_segment_names()) - - return segment_list - -async def update_definition_storage_async(feature_flag_storage, feature_flags, change_number, clear_storage=False): - """ - Update feature flag storage from given list of feature flags while checking the flag set logic - - :param feature_flag_storage: Feature flag storage instance - :type feature_flag_storage: splitio.storage.inmemory.InMemorySplitStorage - :param feature_flag: Feature flag instance to validate. - :type feature_flag: splitio.models.splits.Split - :param: last change number - :type: int - - :return: segments list from feature flags list - :rtype: list(str) - """ - if clear_storage: - await feature_flag_storage.clear() - - segment_list = set() - to_add = [] - to_delete = [] - for feature_flag in feature_flags: - if feature_flag_storage.flag_set_filter.intersect(feature_flag.sets) and feature_flag.status == splits.Status.ACTIVE: - to_add.append(feature_flag) - segment_list.update(set(feature_flag.get_segment_names())) - else: - if await feature_flag_storage.get(feature_flag.name) is not None: - to_delete.append(feature_flag.name) - - await feature_flag_storage.update(to_add, to_delete, change_number) - return segment_list - -async def update_rule_based_segment_storage_async(rule_based_segment_storage, rule_based_segments, change_number, clear_storage=False): - """ - Update rule based segment storage from given list of rule based segments - - :param rule_based_segment_storage: rule based segment storage instance - :type rule_based_segment_storage: splitio.storage.RuleBasedSegmentStorage - :param rule_based_segments: rule based segment instance to validate. - :type rule_based_segments: harness_commons.models.rule_based_segments.RuleBasedSegment - :param: last change number - :type: int - - :return: segments list from excluded segments list - :rtype: list(str) - """ - if clear_storage: - await rule_based_segment_storage.clear() - - segment_list = set() - to_add = [] - to_delete = [] - for rule_based_segment in rule_based_segments: - if rule_based_segment.status == splits.Status.ACTIVE: - to_add.append(rule_based_segment) - segment_list.update(set(rule_based_segment.excluded.get_excluded_standard_segments())) - segment_list.update(rule_based_segment.get_condition_segment_names()) - else: - if await rule_based_segment_storage.get(rule_based_segment.name) is not None: - to_delete.append(rule_based_segment.name) - - await rule_based_segment_storage.update(to_add, to_delete, change_number) - return segment_list - -async def get_standard_segment_names_in_rbs_storage_async(rule_based_segment_storage): - """ - Retrieve a list of all standard segments names. - - :return: Set of segment names. - :rtype: Set(str) - """ - segment_list = set() - segment_names = await rule_based_segment_storage.get_segment_names() - for rb_segment in segment_names: - rb_segment_obj = await rule_based_segment_storage.get(rb_segment) - segment_list.update(set(rb_segment_obj.excluded.get_excluded_standard_segments())) - segment_list.update(rb_segment_obj.get_condition_segment_names()) - - return segment_list - -def get_valid_flag_sets(flag_sets, flag_set_filter): - """ - Check each flag set in given array, return it if exist in a given config flag set array, if config array is empty return all - - :param flag_sets: Flag sets array - :type flag_sets: list(str) - :param config_flag_sets: Config flag sets array - :type config_flag_sets: list(str) - - :return: array of flag sets - :rtype: list(str) - """ - sets_to_fetch = [] - for flag_set in flag_sets: - if not flag_set_filter.set_exist(flag_set) and flag_set_filter.should_filter: - _LOGGER.warning("Flag set %s is not part of the configured flag set list, ignoring the request." % (flag_set)) - continue - sets_to_fetch.append(flag_set) - - return sets_to_fetch - -def combine_valid_flag_sets(result_sets): - """ - Check each flag set in given array of sets, combine all flag sets in one unique set - - :param result_sets: Flag sets set - :type flag_sets: list(set) - - :return: flag sets set - :rtype: set - """ - to_return = set() - for result_set in result_sets: - if isinstance(result_set, set) and len(result_set) > 0: - to_return.update(result_set) - return to_return \ No newline at end of file diff --git a/splitio/util/threadutil.py b/splitio/util/threadutil.py deleted file mode 100644 index 184f7186..00000000 --- a/splitio/util/threadutil.py +++ /dev/null @@ -1,51 +0,0 @@ -"""Threading utilities.""" -from threading import Event, Condition - - -class EventGroup(object): - """EventGroup that can be waited with an OR condition.""" - - class Event(Event): # pylint:disable=too-few-public-methods - """Threading event meant to be used in an group.""" - - def __init__(self, shared_condition): - """ - Construct an event. - - :param shared_condition: shared condition varaible. - :type shared_condition: threading.Condition - """ - Event.__init__(self) - self._shared_cond = shared_condition - - def set(self): - """Set the event.""" - Event.set(self) - with self._shared_cond: - self._shared_cond.notify() - - def __init__(self): - """Construct an event group.""" - self._cond = Condition() - - def make_event(self): - """ - Make a new event associated to this waitable group. - - :returns: an event that can be awaited as part of a group - :rtype: EventGroup.Event - """ - return EventGroup.Event(self._cond) - - def wait(self, timeout=None): - """ - Wait until one of the events is triggered. - - :param timeout: how many seconds to wait. None means forever. - :type timeout: int - - :returns: True if the condition was notified within the specified timeout. False otherwise. - :rtype: bool - """ - with self._cond: - return self._cond.wait(timeout) diff --git a/splitio/util/time.py b/splitio/util/time.py deleted file mode 100644 index 62743327..00000000 --- a/splitio/util/time.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Utilities.""" -from datetime import datetime -import time - -EPOCH_DATETIME = datetime(1970, 1, 1) - -def utctime(): - """ - Return the utc time in nanoseconds. - - :returns: utc time in nanoseconds. - :rtype: float - """ - return (datetime.utcnow() - EPOCH_DATETIME).total_seconds() - - -def utctime_ms(): - """ - Return the utc time in milliseconds. - - :returns: utc time in milliseconds. - :rtype: int - """ - return int(utctime() * 1000) - -def get_current_epoch_time_ms(): - """ - Get current epoch time in milliseconds - - :return: epoch time - :rtype: int - """ - return int(round(time.time() * 1000)) \ No newline at end of file diff --git a/tests/client/test_factory.py b/tests/client/test_factory.py index 33e866eb..8601f2cc 100644 --- a/tests/client/test_factory.py +++ b/tests/client/test_factory.py @@ -37,7 +37,7 @@ from splitio.sync.split import SplitSynchronizer, SplitSynchronizerAsync from harness_commons.sync.segment import SegmentSynchronizer, SegmentSynchronizerAsync from splitio.storage.adapters.redis import RedisAdapter, RedisPipelineAdapter -from splitio.tasks.util import asynctask +from harness_commons.tasks.util import asynctask from tests.storage.test_pluggable import StorageMockAdapter, StorageMockAdapterAsync from tests.integration import splits_json diff --git a/tests/tasks/test_events_sync.py b/tests/tasks/test_events_sync.py deleted file mode 100644 index 2528e2f2..00000000 --- a/tests/tasks/test_events_sync.py +++ /dev/null @@ -1,89 +0,0 @@ -"""Impressions synchronization task test module.""" - -import threading -import time -import pytest - -from harness_commons.api.client import HttpResponse -from splitio.tasks import events_sync -from harness_commons.storage import EventStorage -from harness_commons.models.events import Event -from harness_commons.api.events import EventsAPI -from harness_commons.sync.event import EventSynchronizer, EventSynchronizerAsync -from splitio.optional.loaders import asyncio - - -class EventsSyncTests(object): - """Impressions Syncrhonization task test cases.""" - - def test_normal_operation(self, mocker): - """Test that the task works properly under normal circumstances.""" - storage = mocker.Mock(spec=EventStorage) - events = [ - Event('key1', 'user', 'purchase', 5.3, 123456, None), - Event('key2', 'user', 'purchase', 5.3, 123456, None), - Event('key3', 'user', 'purchase', 5.3, 123456, None), - Event('key4', 'user', 'purchase', 5.3, 123456, None), - Event('key5', 'user', 'purchase', 5.3, 123456, None), - ] - - storage.pop_many.return_value = events - api = mocker.Mock(spec=EventsAPI) - api.flush_events.return_value = HttpResponse(200, '', {}) - event_synchronizer = EventSynchronizer(api, storage, 5) - task = events_sync.EventsSyncTask(event_synchronizer.synchronize_events, 1) - task.start() - time.sleep(2) - assert task.is_running() - assert storage.pop_many.mock_calls[0] == mocker.call(5) - assert api.flush_events.mock_calls[0] == mocker.call(events) - stop_event = threading.Event() - calls_now = len(api.flush_events.mock_calls) - task.stop(stop_event) - stop_event.wait(5) - assert stop_event.is_set() - assert len(api.flush_events.mock_calls) > calls_now - - -class EventsSyncAsyncTests(object): - """Impressions Syncrhonization task async test cases.""" - - @pytest.mark.asyncio - async def test_normal_operation(self, mocker): - """Test that the task works properly under normal circumstances.""" - self.events = [ - Event('key1', 'user', 'purchase', 5.3, 123456, None), - Event('key2', 'user', 'purchase', 5.3, 123456, None), - Event('key3', 'user', 'purchase', 5.3, 123456, None), - Event('key4', 'user', 'purchase', 5.3, 123456, None), - Event('key5', 'user', 'purchase', 5.3, 123456, None), - ] - storage = mocker.Mock(spec=EventStorage) - self.called = False - async def pop_many(*args): - self.called = True - return self.events - storage.pop_many = pop_many - - api = mocker.Mock(spec=EventsAPI) - self.flushed_events = None - self.count = 0 - async def flush_events(events): - self.count += 1 - self.flushed_events = events - return HttpResponse(200, '', {}) - api.flush_events = flush_events - - event_synchronizer = EventSynchronizerAsync(api, storage, 5) - task = events_sync.EventsSyncTaskAsync(event_synchronizer.synchronize_events, 1) - task.start() - await asyncio.sleep(2) - - assert task.is_running() - assert self.called - assert self.flushed_events == self.events - - calls_now = self.count - await task.stop() - assert not task.is_running() - assert self.count > calls_now diff --git a/tests/tasks/test_impressions_sync.py b/tests/tasks/test_impressions_sync.py deleted file mode 100644 index ecb7b99c..00000000 --- a/tests/tasks/test_impressions_sync.py +++ /dev/null @@ -1,172 +0,0 @@ -"""Impressions synchronization task test module.""" - -import threading -import time -import pytest - -from harness_commons.api.client import HttpResponse -from splitio.tasks import impressions_sync -from harness_commons.storage import ImpressionStorage -from harness_commons.models.impressions import Impression -from harness_commons.api.impressions import ImpressionsAPI -from harness_commons.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer, ImpressionSynchronizerAsync, ImpressionsCountSynchronizerAsync -from harness_commons.engine.impressions.manager import Counter -from splitio.optional.loaders import asyncio - -class ImpressionsSyncTaskTests(object): - """Impressions Syncrhonization task test cases.""" - - def test_normal_operation(self, mocker): - """Test that the task works properly under normal circumstances.""" - storage = mocker.Mock(spec=ImpressionStorage) - impressions = [ - Impression('key1', 'split1', 'on', 'l1', 123456, 'b1', 321654, None, None), - Impression('key2', 'split1', 'on', 'l1', 123456, 'b1', 321654, None, None), - Impression('key3', 'split2', 'off', 'l1', 123456, 'b1', 321654, None, None), - Impression('key4', 'split2', 'on', 'l1', 123456, 'b1', 321654, None, None), - Impression('key5', 'split3', 'off', 'l1', 123456, 'b1', 321654, None, None) - ] - storage.pop_many.return_value = impressions - api = mocker.Mock(spec=ImpressionsAPI) - api.flush_impressions.return_value = HttpResponse(200, '', {}) - impression_synchronizer = ImpressionSynchronizer(api, storage, 5) - task = impressions_sync.ImpressionsSyncTask( - impression_synchronizer.synchronize_impressions, - 1 - ) - task.start() - time.sleep(2) - assert task.is_running() - assert storage.pop_many.mock_calls[0] == mocker.call(5) - assert api.flush_impressions.mock_calls[0] == mocker.call(impressions) - stop_event = threading.Event() - calls_now = len(api.flush_impressions.mock_calls) - task.stop(stop_event) - stop_event.wait(5) - assert stop_event.is_set() - assert len(api.flush_impressions.mock_calls) > calls_now - - -class ImpressionsSyncTaskAsyncTests(object): - """Impressions Syncrhonization task test cases.""" - - @pytest.mark.asyncio - async def test_normal_operation(self, mocker): - """Test that the task works properly under normal circumstances.""" - storage = mocker.Mock(spec=ImpressionStorage) - impressions = [ - Impression('key1', 'split1', 'on', 'l1', 123456, 'b1', 321654, None, None), - Impression('key2', 'split1', 'on', 'l1', 123456, 'b1', 321654, None, None), - Impression('key3', 'split2', 'off', 'l1', 123456, 'b1', 321654, None, None), - Impression('key4', 'split2', 'on', 'l1', 123456, 'b1', 321654, None, None), - Impression('key5', 'split3', 'off', 'l1', 123456, 'b1', 321654, None, None) - ] - self.pop_called = 0 - async def pop_many(*args): - self.pop_called += 1 - return impressions - storage.pop_many = pop_many - - api = mocker.Mock(spec=ImpressionsAPI) - self.flushed = None - self.called = 0 - async def flush_impressions(imps): - self.called += 1 - self.flushed = imps - return HttpResponse(200, '', {}) - api.flush_impressions = flush_impressions - - impression_synchronizer = ImpressionSynchronizerAsync(api, storage, 5) - task = impressions_sync.ImpressionsSyncTaskAsync( - impression_synchronizer.synchronize_impressions, - 1 - ) - task.start() - await asyncio.sleep(2) - assert task.is_running() - assert self.pop_called == 1 - assert self.flushed == impressions - - calls_now = self.called - await task.stop() - assert self.called > calls_now - - -class ImpressionsCountSyncTaskTests(object): - """Impressions Syncrhonization task test cases.""" - - def test_normal_operation(self, mocker): - """Test that the task works properly under normal circumstances.""" - counter = mocker.Mock(spec=Counter) - - counters = [ - Counter.CountPerFeature('f1', 123, 2), - Counter.CountPerFeature('f2', 123, 123), - Counter.CountPerFeature('f1', 456, 111), - Counter.CountPerFeature('f2', 456, 222) - ] - - counter.pop_all.return_value = counters - api = mocker.Mock(spec=ImpressionsAPI) - api.flush_counters.return_value = HttpResponse(200, '', {}) - impressions_sync.ImpressionsCountSyncTask._PERIOD = 1 - impression_synchronizer = ImpressionsCountSynchronizer(api, counter) - task = impressions_sync.ImpressionsCountSyncTask( - impression_synchronizer.synchronize_counters - ) - task.start() - time.sleep(2) - assert task.is_running() - assert counter.pop_all.mock_calls[0] == mocker.call() - assert api.flush_counters.mock_calls[0] == mocker.call(counters) - stop_event = threading.Event() - calls_now = len(api.flush_counters.mock_calls) - task.stop(stop_event) - stop_event.wait(5) - assert stop_event.is_set() - assert len(api.flush_counters.mock_calls) > calls_now - - -class ImpressionsCountSyncTaskAsyncTests(object): - """Impressions Syncrhonization task test cases.""" - - @pytest.mark.asyncio - async def test_normal_operation(self, mocker): - """Test that the task works properly under normal circumstances.""" - counter = mocker.Mock(spec=Counter) - counters = [ - Counter.CountPerFeature('f1', 123, 2), - Counter.CountPerFeature('f2', 123, 123), - Counter.CountPerFeature('f1', 456, 111), - Counter.CountPerFeature('f2', 456, 222) - ] - self._pop_called = 0 - def pop_all(): - self._pop_called += 1 - return counters - counter.pop_all = pop_all - - api = mocker.Mock(spec=ImpressionsAPI) - self.flushed = None - self.called = 0 - async def flush_counters(imps): - self.called += 1 - self.flushed = imps - return HttpResponse(200, '', {}) - api.flush_counters = flush_counters - - impressions_sync.ImpressionsCountSyncTaskAsync._PERIOD = 1 - impression_synchronizer = ImpressionsCountSynchronizerAsync(api, counter) - task = impressions_sync.ImpressionsCountSyncTaskAsync( - impression_synchronizer.synchronize_counters - ) - task.start() - await asyncio.sleep(2) - assert task.is_running() - - assert self._pop_called == 1 - assert self.flushed == counters - - calls_now = self.called - await task.stop() - assert self.called > calls_now diff --git a/tests/tasks/test_segment_sync.py b/tests/tasks/test_segment_sync.py deleted file mode 100644 index 4c04d68e..00000000 --- a/tests/tasks/test_segment_sync.py +++ /dev/null @@ -1,374 +0,0 @@ -"""Split syncrhonization task test module.""" - -import threading -import time -import pytest - -from harness_commons.api.commons import FetchOptions -from splitio.tasks import segment_sync -from harness_commons.storage import DefinitionStorage, SegmentStorage, RuleBasedSegmentsStorage -from splitio.models.splits import Split -from harness_commons.models.segments import Segment -from harness_commons.models.grammar.condition import Condition -from harness_commons.models.grammar.matchers import UserDefinedSegmentMatcher -from harness_commons.sync.segment import SegmentSynchronizer, SegmentSynchronizerAsync -from splitio.optional.loaders import asyncio - -class SegmentSynchronizationTests(object): - """Split synchronization task test cases.""" - - def test_normal_operation(self, mocker): - """Test the normal operation flow.""" - split_storage = mocker.Mock(spec=DefinitionStorage) - split_storage.get_segment_names.return_value = ['segmentA', 'segmentB', 'segmentC'] - rbs_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) - rbs_storage.get_segment_names.return_value = [] - - # Setup a mocked segment storage whose changenumber returns -1 on first fetch and - # 123 afterwards. - storage = mocker.Mock(spec=SegmentStorage) - - def change_number_mock(segment_name): - if segment_name == 'segmentA' and change_number_mock._count_a == 0: - change_number_mock._count_a = 1 - return -1 - if segment_name == 'segmentB' and change_number_mock._count_b == 0: - change_number_mock._count_b = 1 - return -1 - if segment_name == 'segmentC' and change_number_mock._count_c == 0: - change_number_mock._count_c = 1 - return -1 - return 123 - change_number_mock._count_a = 0 - change_number_mock._count_b = 0 - change_number_mock._count_c = 0 - storage.get_change_number.side_effect = change_number_mock - - # Setup a mocked segment api to return segments mentioned before. - def fetch_segment_mock(segment_name, change_number, fetch_options): - if segment_name == 'segmentA' and fetch_segment_mock._count_a == 0: - fetch_segment_mock._count_a = 1 - return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], - 'since': -1, 'till': 123} - if segment_name == 'segmentB' and fetch_segment_mock._count_b == 0: - fetch_segment_mock._count_b = 1 - return {'name': 'segmentB', 'added': ['key4', 'key5', 'key6'], 'removed': [], - 'since': -1, 'till': 123} - if segment_name == 'segmentC' and fetch_segment_mock._count_c == 0: - fetch_segment_mock._count_c = 1 - return {'name': 'segmentC', 'added': ['key7', 'key8', 'key9'], 'removed': [], - 'since': -1, 'till': 123} - return {'added': [], 'removed': [], 'since': 123, 'till': 123} - fetch_segment_mock._count_a = 0 - fetch_segment_mock._count_b = 0 - fetch_segment_mock._count_c = 0 - - api = mocker.Mock() - fetch_options = FetchOptions(True, None, None, None, None) - api.fetch_segment.side_effect = fetch_segment_mock - - segments_synchronizer = SegmentSynchronizer(api, split_storage, storage, rbs_storage) - task = segment_sync.SegmentSynchronizationTask(segments_synchronizer.synchronize_segments, - 0.5) - task.start() - time.sleep(0.7) - - assert task.is_running() - - stop_event = threading.Event() - task.stop(stop_event) - stop_event.wait() - assert not task.is_running() - - api_calls = [call for call in api.fetch_segment.mock_calls] - assert mocker.call('segmentA', -1, fetch_options) in api_calls - assert mocker.call('segmentB', -1, fetch_options) in api_calls - assert mocker.call('segmentC', -1, fetch_options) in api_calls - assert mocker.call('segmentA', 123, fetch_options) in api_calls - assert mocker.call('segmentB', 123, fetch_options) in api_calls - assert mocker.call('segmentC', 123, fetch_options) in api_calls - - segment_put_calls = storage.put.mock_calls - segments_to_validate = set(['segmentA', 'segmentB', 'segmentC']) - for call in segment_put_calls: - _, positional_args, _ = call - segment = positional_args[0] - assert isinstance(segment, Segment) - assert segment.name in segments_to_validate - segments_to_validate.remove(segment.name) - - def test_that_errors_dont_stop_task(self, mocker): - """Test that if fetching segments fails at some_point, the task will continue running.""" - split_storage = mocker.Mock(spec=DefinitionStorage) - split_storage.get_segment_names.return_value = ['segmentA', 'segmentB', 'segmentC'] - rbs_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) - rbs_storage.get_segment_names.return_value = [] - - # Setup a mocked segment storage whose changenumber returns -1 on first fetch and - # 123 afterwards. - storage = mocker.Mock(spec=SegmentStorage) - - def change_number_mock(segment_name): - if segment_name == 'segmentA' and change_number_mock._count_a == 0: - change_number_mock._count_a = 1 - return -1 - if segment_name == 'segmentB' and change_number_mock._count_b == 0: - change_number_mock._count_b = 1 - return -1 - if segment_name == 'segmentC' and change_number_mock._count_c == 0: - change_number_mock._count_c = 1 - return -1 - return 123 - change_number_mock._count_a = 0 - change_number_mock._count_b = 0 - change_number_mock._count_c = 0 - storage.get_change_number.side_effect = change_number_mock - - # Setup a mocked segment api to return segments mentioned before. - def fetch_segment_mock(segment_name, change_number, fetch_options): - if segment_name == 'segmentA' and fetch_segment_mock._count_a == 0: - fetch_segment_mock._count_a = 1 - return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], - 'since': -1, 'till': 123} - if segment_name == 'segmentB' and fetch_segment_mock._count_b == 0: - fetch_segment_mock._count_b = 1 - raise Exception("some exception") - if segment_name == 'segmentC' and fetch_segment_mock._count_c == 0: - fetch_segment_mock._count_c = 1 - return {'name': 'segmentC', 'added': ['key7', 'key8', 'key9'], 'removed': [], - 'since': -1, 'till': 123} - return {'added': [], 'removed': [], 'since': 123, 'till': 123} - fetch_segment_mock._count_a = 0 - fetch_segment_mock._count_b = 0 - fetch_segment_mock._count_c = 0 - - api = mocker.Mock() - fetch_options = FetchOptions(True, None, None, None, None) - api.fetch_segment.side_effect = fetch_segment_mock - - segments_synchronizer = SegmentSynchronizer(api, split_storage, storage, rbs_storage) - task = segment_sync.SegmentSynchronizationTask(segments_synchronizer.synchronize_segments, - 0.5) - task.start() - time.sleep(0.7) - - assert task.is_running() - - stop_event = threading.Event() - task.stop(stop_event) - stop_event.wait() - assert not task.is_running() - - api_calls = [call for call in api.fetch_segment.mock_calls] - assert mocker.call('segmentA', -1, fetch_options) in api_calls - assert mocker.call('segmentB', -1, fetch_options) in api_calls - assert mocker.call('segmentC', -1, fetch_options) in api_calls - assert mocker.call('segmentA', 123, fetch_options) in api_calls - assert mocker.call('segmentC', 123, fetch_options) in api_calls - - segment_put_calls = storage.put.mock_calls - segments_to_validate = set(['segmentA', 'segmentB', 'segmentC']) - for call in segment_put_calls: - _, positional_args, _ = call - segment = positional_args[0] - assert isinstance(segment, Segment) - assert segment.name in segments_to_validate - segments_to_validate.remove(segment.name) - - -class SegmentSynchronizationAsyncTests(object): - """Split synchronization async task test cases.""" - - @pytest.mark.asyncio - async def test_normal_operation(self, mocker): - """Test the normal operation flow.""" - split_storage = mocker.Mock(spec=DefinitionStorage) - async def get_segment_names(): - return ['segmentA', 'segmentB', 'segmentC'] - split_storage.get_segment_names = get_segment_names - - rbs_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) - async def get_segment_names_rbs(): - return [] - rbs_storage.get_segment_names = get_segment_names_rbs - - # Setup a mocked segment storage whose changenumber returns -1 on first fetch and - # 123 afterwards. - storage = mocker.Mock(spec=SegmentStorage) - - async def change_number_mock(segment_name): - if segment_name == 'segmentA' and change_number_mock._count_a == 0: - change_number_mock._count_a = 1 - return -1 - if segment_name == 'segmentB' and change_number_mock._count_b == 0: - change_number_mock._count_b = 1 - return -1 - if segment_name == 'segmentC' and change_number_mock._count_c == 0: - change_number_mock._count_c = 1 - return -1 - return 123 - change_number_mock._count_a = 0 - change_number_mock._count_b = 0 - change_number_mock._count_c = 0 - storage.get_change_number = change_number_mock - - self.segments = [] - async def put(segment): - self.segments.append(segment) - storage.put = put - - async def update(*arg): - pass - storage.update = update - - # Setup a mocked segment api to return segments mentioned before. - self.segment_name = [] - self.change_number = [] - self.fetch_options = [] - async def fetch_segment_mock(segment_name, change_number, fetch_options): - self.segment_name.append(segment_name) - self.change_number.append(change_number) - self.fetch_options.append(fetch_options) - if segment_name == 'segmentA' and fetch_segment_mock._count_a == 0: - fetch_segment_mock._count_a = 1 - return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], - 'since': -1, 'till': 123} - if segment_name == 'segmentB' and fetch_segment_mock._count_b == 0: - fetch_segment_mock._count_b = 1 - return {'name': 'segmentB', 'added': ['key4', 'key5', 'key6'], 'removed': [], - 'since': -1, 'till': 123} - if segment_name == 'segmentC' and fetch_segment_mock._count_c == 0: - fetch_segment_mock._count_c = 1 - return {'name': 'segmentC', 'added': ['key7', 'key8', 'key9'], 'removed': [], - 'since': -1, 'till': 123} - return {'added': [], 'removed': [], 'since': 123, 'till': 123} - fetch_segment_mock._count_a = 0 - fetch_segment_mock._count_b = 0 - fetch_segment_mock._count_c = 0 - - api = mocker.Mock() - fetch_options = FetchOptions(True, None, None, None, None) - api.fetch_segment = fetch_segment_mock - - segments_synchronizer = SegmentSynchronizerAsync(api, split_storage, storage, rbs_storage) - task = segment_sync.SegmentSynchronizationTaskAsync(segments_synchronizer.synchronize_segments, - 0.5) - task.start() - await asyncio.sleep(0.7) - assert task.is_running() - - await task.stop() - assert not task.is_running() - - api_calls = [] - for i in range(6): - api_calls.append((self.segment_name[i], self.change_number[i], self.fetch_options[i])) - - assert ('segmentA', -1, FetchOptions(True, None, None, None, None)) in api_calls - assert ('segmentA', 123, FetchOptions(True, None, None, None, None)) in api_calls - assert ('segmentB', -1, FetchOptions(True, None, None, None, None)) in api_calls - assert ('segmentB', 123, FetchOptions(True, None, None, None, None)) in api_calls - assert ('segmentC', -1, FetchOptions(True, None, None, None, None)) in api_calls - assert ('segmentC', 123, FetchOptions(True, None, None, None, None)) in api_calls - - segments_to_validate = set(['segmentA', 'segmentB', 'segmentC']) - for segment in self.segments: - assert isinstance(segment, Segment) - assert segment.name in segments_to_validate - segments_to_validate.remove(segment.name) - - @pytest.mark.asyncio - async def test_that_errors_dont_stop_task(self, mocker): - """Test that if fetching segments fails at some_point, the task will continue running.""" - split_storage = mocker.Mock(spec=DefinitionStorage) - async def get_segment_names(): - return ['segmentA', 'segmentB', 'segmentC'] - split_storage.get_segment_names = get_segment_names - - rbs_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) - async def get_segment_names_rbs(): - return [] - rbs_storage.get_segment_names = get_segment_names_rbs - - # Setup a mocked segment storage whose changenumber returns -1 on first fetch and - # 123 afterwards. - storage = mocker.Mock(spec=SegmentStorage) - - async def change_number_mock(segment_name): - if segment_name == 'segmentA' and change_number_mock._count_a == 0: - change_number_mock._count_a = 1 - return -1 - if segment_name == 'segmentB' and change_number_mock._count_b == 0: - change_number_mock._count_b = 1 - return -1 - if segment_name == 'segmentC' and change_number_mock._count_c == 0: - change_number_mock._count_c = 1 - return -1 - return 123 - change_number_mock._count_a = 0 - change_number_mock._count_b = 0 - change_number_mock._count_c = 0 - storage.get_change_number = change_number_mock - - self.segments = [] - async def put(segment): - self.segments.append(segment) - storage.put = put - - async def update(*arg): - pass - storage.update = update - - # Setup a mocked segment api to return segments mentioned before. - self.segment_name = [] - self.change_number = [] - self.fetch_options = [] - async def fetch_segment_mock(segment_name, change_number, fetch_options): - self.segment_name.append(segment_name) - self.change_number.append(change_number) - self.fetch_options.append(fetch_options) - if segment_name == 'segmentA' and fetch_segment_mock._count_a == 0: - fetch_segment_mock._count_a = 1 - return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], - 'since': -1, 'till': 123} - if segment_name == 'segmentB' and fetch_segment_mock._count_b == 0: - fetch_segment_mock._count_b = 1 - raise Exception("some exception") - if segment_name == 'segmentC' and fetch_segment_mock._count_c == 0: - fetch_segment_mock._count_c = 1 - return {'name': 'segmentC', 'added': ['key7', 'key8', 'key9'], 'removed': [], - 'since': -1, 'till': 123} - return {'added': [], 'removed': [], 'since': 123, 'till': 123} - fetch_segment_mock._count_a = 0 - fetch_segment_mock._count_b = 0 - fetch_segment_mock._count_c = 0 - - api = mocker.Mock() - fetch_options = FetchOptions(True, None, None, None, None) - api.fetch_segment = fetch_segment_mock - - segments_synchronizer = SegmentSynchronizerAsync(api, split_storage, storage, rbs_storage) - task = segment_sync.SegmentSynchronizationTaskAsync(segments_synchronizer.synchronize_segments, - 0.5) - task.start() - await asyncio.sleep(0.7) - assert task.is_running() - - await task.stop() - assert not task.is_running() - - api_calls = [] - for i in range(5): - api_calls.append((self.segment_name[i], self.change_number[i], self.fetch_options[i])) - - assert ('segmentA', -1, FetchOptions(True, None, None, None, None)) in api_calls - assert ('segmentA', 123, FetchOptions(True, None, None, None, None)) in api_calls - assert ('segmentB', -1, FetchOptions(True, None, None, None, None)) in api_calls - assert ('segmentC', -1, FetchOptions(True, None, None, None, None)) in api_calls - assert ('segmentC', 123, FetchOptions(True, None, None, None, None)) in api_calls - - segments_to_validate = set(['segmentA', 'segmentB', 'segmentC']) - for segment in self.segments: - assert isinstance(segment, Segment) - assert segment.name in segments_to_validate - segments_to_validate.remove(segment.name) diff --git a/tests/tasks/test_split_sync.py b/tests/tasks/test_split_sync.py index 017ad7f3..3f075b57 100644 --- a/tests/tasks/test_split_sync.py +++ b/tests/tasks/test_split_sync.py @@ -9,6 +9,7 @@ from harness_commons.storage import DefinitionStorage, RuleBasedSegmentsStorage from splitio.models.splits import Split from splitio.sync.split import SplitSynchronizer, SplitSynchronizerAsync +from splitio.tasks.split_sync import SplitSynchronizationTask, SplitSynchronizationTaskAsync from splitio.optional.loaders import asyncio splits = [{ @@ -46,7 +47,6 @@ ] }] - class SplitSynchronizationTests(object): """Split synchronization task test cases.""" @@ -98,7 +98,7 @@ def get_changes(*args, **kwargs): fetch_options = FetchOptions(True) api.fetch_definitions.side_effect = get_changes split_synchronizer = SplitSynchronizer(api, storage, rbs_storage) - task = split_sync.SplitSynchronizationTask(split_synchronizer.synchronize_definitions, 0.5) + task = SplitSynchronizationTask(split_synchronizer.synchronize_definitions, 0.5) task.start() time.sleep(0.7) assert task.is_running() @@ -106,7 +106,6 @@ def get_changes(*args, **kwargs): task.stop(stop_event) stop_event.wait() assert not task.is_running() - print(api.fetch_definitions.mock_calls) assert api.fetch_definitions.mock_calls[0][1][0] == -1 assert api.fetch_definitions.mock_calls[0][1][2].cache_control_headers == True assert api.fetch_definitions.mock_calls[1][1][0] == 123 @@ -136,7 +135,7 @@ def run(x): storage.get_change_number.return_value = -1 split_synchronizer = SplitSynchronizer(api, storage, rbs_storage) - task = split_sync.SplitSynchronizationTask(split_synchronizer.synchronize_definitions, 0.5) + task = SplitSynchronizationTask(split_synchronizer.synchronize_definitions, 0.5) task.start() time.sleep(0.1) assert task.is_running() @@ -212,7 +211,7 @@ async def rbs_update(split, deleted, change_number): fetch_options = FetchOptions(True) split_synchronizer = SplitSynchronizerAsync(api, storage, rbs_storage) - task = split_sync.SplitSynchronizationTaskAsync(split_synchronizer.synchronize_definitions, 0.5) + task = SplitSynchronizationTaskAsync(split_synchronizer.synchronize_definitions, 0.5) task.start() await asyncio.sleep(2) assert task.is_running() @@ -247,7 +246,7 @@ async def get_change_number(): storage.get_change_number = get_change_number split_synchronizer = SplitSynchronizerAsync(api, storage, rbs_storage) - task = split_sync.SplitSynchronizationTaskAsync(split_synchronizer.synchronize_definitions, 0.5) + task = SplitSynchronizationTaskAsync(split_synchronizer.synchronize_definitions, 0.5) task.start() await asyncio.sleep(0.1) assert task.is_running() diff --git a/tests/tasks/test_unique_keys_sync.py b/tests/tasks/test_unique_keys_sync.py deleted file mode 100644 index 57856c89..00000000 --- a/tests/tasks/test_unique_keys_sync.py +++ /dev/null @@ -1,102 +0,0 @@ -"""Impressions synchronization task test module.""" -import asyncio -import threading -import time -import pytest - -from harness_commons.api.client import HttpResponse -from splitio.tasks.unique_keys_sync import UniqueKeysSyncTask, ClearFilterSyncTask,\ - ClearFilterSyncTaskAsync, UniqueKeysSyncTaskAsync -from harness_commons.api.telemetry import TelemetryAPI -from harness_commons.sync.unique_keys import UniqueKeysSynchronizer, ClearFilterSynchronizer,\ - UniqueKeysSynchronizerAsync, ClearFilterSynchronizerAsync -from harness_commons.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync - - -class UniqueKeysSyncTests(object): - """Unique Keys Syncrhonization task test cases.""" - - def test_normal_operation(self, mocker): - """Test that the task works properly under normal circumstances.""" - api = mocker.Mock(spec=TelemetryAPI) - api.record_unique_keys.return_value = HttpResponse(200, '', {}) - - unique_keys_tracker = UniqueKeysTracker() - unique_keys_tracker.track("key1", "split1") - unique_keys_tracker.track("key2", "split1") - - unique_keys_sync = UniqueKeysSynchronizer(mocker.Mock(), unique_keys_tracker) - task = UniqueKeysSyncTask(unique_keys_sync.send_all, 1) - task.start() - time.sleep(2) - assert task.is_running() - assert api.record_unique_keys.mock_calls == mocker.call() - stop_event = threading.Event() - task.stop(stop_event) - stop_event.wait(5) - assert stop_event.is_set() - -class ClearFilterSyncTests(object): - """Clear Filter Syncrhonization task test cases.""" - - def test_normal_operation(self, mocker): - """Test that the task works properly under normal circumstances.""" - - unique_keys_tracker = UniqueKeysTracker() - unique_keys_tracker.track("key1", "split1") - unique_keys_tracker.track("key2", "split1") - - clear_filter_sync = ClearFilterSynchronizer(unique_keys_tracker) - task = ClearFilterSyncTask(clear_filter_sync.clear_all, 1) - task.start() - time.sleep(2) - assert task.is_running() - assert not unique_keys_tracker._filter.contains("split1key1") - assert not unique_keys_tracker._filter.contains("split1key2") - stop_event = threading.Event() - task.stop(stop_event) - stop_event.wait(5) - assert stop_event.is_set() - -class UniqueKeysSyncAsyncTests(object): - """Unique Keys Syncrhonization task test cases.""" - - @pytest.mark.asyncio - async def test_normal_operation(self, mocker): - """Test that the task works properly under normal circumstances.""" - api = mocker.Mock(spec=TelemetryAPI) - api.record_unique_keys.return_value = HttpResponse(200, '', {}) - - unique_keys_tracker = UniqueKeysTrackerAsync() - await unique_keys_tracker.track("key1", "split1") - await unique_keys_tracker.track("key2", "split1") - - unique_keys_sync = UniqueKeysSynchronizerAsync(mocker.Mock(), unique_keys_tracker) - task = UniqueKeysSyncTaskAsync(unique_keys_sync.send_all, 1) - task.start() - await asyncio.sleep(2) - assert task.is_running() - assert api.record_unique_keys.mock_calls == mocker.call() - await task.stop() - assert not task.is_running() - -class ClearFilterSyncTests(object): - """Clear Filter Syncrhonization task test cases.""" - - @pytest.mark.asyncio - async def test_normal_operation(self, mocker): - """Test that the task works properly under normal circumstances.""" - - unique_keys_tracker = UniqueKeysTrackerAsync() - await unique_keys_tracker.track("key1", "split1") - await unique_keys_tracker.track("key2", "split1") - - clear_filter_sync = ClearFilterSynchronizerAsync(unique_keys_tracker) - task = ClearFilterSyncTaskAsync(clear_filter_sync.clear_all, 1) - task.start() - await asyncio.sleep(2) - assert task.is_running() - assert not unique_keys_tracker._filter.contains("split1key1") - assert not unique_keys_tracker._filter.contains("split1key2") - await task.stop() - assert not task.is_running() diff --git a/tests/tasks/util/test_asynctask.py b/tests/tasks/util/test_asynctask.py deleted file mode 100644 index b587b9c5..00000000 --- a/tests/tasks/util/test_asynctask.py +++ /dev/null @@ -1,257 +0,0 @@ -"""Asynctask test module.""" - -import time -import threading -import pytest - -from splitio.tasks.util import asynctask -from splitio.optional.loaders import asyncio - -class AsyncTaskTests(object): - """AsyncTask test cases.""" - - def test_default_task_flow(self, mocker): - """Test the default execution flow of an asynctask.""" - main_func = mocker.Mock() - on_init = mocker.Mock() - on_stop = mocker.Mock() - on_stop_event = threading.Event() - - task = asynctask.AsyncTask(main_func, 0.5, on_init, on_stop) - task.start() - time.sleep(1) - assert task.running() - task.stop(on_stop_event) - on_stop_event.wait() - - assert on_stop_event.is_set() - assert 0 < len(main_func.mock_calls) <= 2 - assert len(on_init.mock_calls) == 1 - assert len(on_stop.mock_calls) == 1 - assert not task.running() - - def test_main_exception_skips_iteration(self, mocker): - """Test that an exception in the main func only skips current iteration.""" - def raise_exception(): - raise Exception('something') - main_func = mocker.Mock() - main_func.side_effect = raise_exception - on_init = mocker.Mock() - on_stop = mocker.Mock() - on_stop_event = threading.Event() - - task = asynctask.AsyncTask(main_func, 0.1, on_init, on_stop) - task.start() - time.sleep(1) - assert task.running() - task.stop(on_stop_event) - on_stop_event.wait() - - assert on_stop_event.is_set() - assert 9 <= len(main_func.mock_calls) <= 10 - assert len(on_init.mock_calls) == 1 - assert len(on_stop.mock_calls) == 1 - assert not task.running() - - def test_on_init_failure_aborts_task(self, mocker): - """Test that if the on_init callback fails, the task never runs.""" - def raise_exception(): - raise Exception('something') - main_func = mocker.Mock() - on_init = mocker.Mock() - on_init.side_effect = raise_exception - on_stop = mocker.Mock() - on_stop_event = threading.Event() - - task = asynctask.AsyncTask(main_func, 0.1, on_init, on_stop) - task.start() - time.sleep(0.5) - assert not task.running() # Since on_init fails, task never starts - task.stop(on_stop_event) - on_stop_event.wait(1) - - assert on_stop_event.is_set() - assert on_init.mock_calls == [mocker.call()] - assert on_stop.mock_calls == [mocker.call()] - assert main_func.mock_calls == [] - assert not task.running() - - def test_on_stop_failure_ends_gacefully(self, mocker): - """Test that if the on_init callback fails, the task never runs.""" - def raise_exception(): - raise Exception('something') - main_func = mocker.Mock() - on_init = mocker.Mock() - on_stop = mocker.Mock() - on_stop.side_effect = raise_exception - on_stop_event = threading.Event() - - task = asynctask.AsyncTask(main_func, 0.1, on_init, on_stop) - task.start() - time.sleep(1) - task.stop(on_stop_event) - on_stop_event.wait(1) - - assert on_stop_event.is_set() - assert on_init.mock_calls == [mocker.call()] - assert on_stop.mock_calls == [mocker.call()] - assert 9 <= len(main_func.mock_calls) <= 10 - - def test_force_run(self, mocker): - """Test that if the on_init callback fails, the task never runs.""" - main_func = mocker.Mock() - on_init = mocker.Mock() - on_stop = mocker.Mock() - on_stop_event = threading.Event() - - task = asynctask.AsyncTask(main_func, 5, on_init, on_stop) - task.start() - time.sleep(1) - assert task.running() - task.force_execution() - task.force_execution() - task.stop(on_stop_event) - on_stop_event.wait(1) - - assert on_stop_event.is_set() - assert on_init.mock_calls == [mocker.call()] - assert on_stop.mock_calls == [mocker.call()] - assert len(main_func.mock_calls) == 2 - assert not task.running() - - -class AsyncTaskAsyncTests(object): - """AsyncTask test cases.""" - - @pytest.mark.asyncio - async def test_default_task_flow(self, mocker): - """Test the default execution flow of an asynctask.""" - self.main_called = 0 - async def main_func(): - self.main_called += 1 - - self.init_called = 0 - async def on_init(): - self.init_called += 1 - - self.stop_called = 0 - async def on_stop(): - self.stop_called += 1 - - task = asynctask.AsyncTaskAsync(main_func, 0.5, on_init, on_stop) - task.start() - await asyncio.sleep(1) - assert task.running() - await task.stop(True) - - assert 0 < self.main_called <= 2 - assert self.init_called == 1 - assert self.stop_called == 1 - assert not task.running() - - @pytest.mark.asyncio - async def test_main_exception_skips_iteration(self, mocker): - """Test that an exception in the main func only skips current iteration.""" - self.main_called = 0 - async def raise_exception(): - self.main_called += 1 - raise Exception('something') - main_func = raise_exception - - self.init_called = 0 - async def on_init(): - self.init_called += 1 - - self.stop_called = 0 - async def on_stop(): - self.stop_called += 1 - - task = asynctask.AsyncTaskAsync(main_func, 0.1, on_init, on_stop) - task.start() - await asyncio.sleep(1) - assert task.running() - await task.stop(True) - - assert 9 <= self.main_called <= 10 - assert self.init_called == 1 - assert self.stop_called == 1 - assert not task.running() - - @pytest.mark.asyncio - async def test_on_init_failure_aborts_task(self, mocker): - """Test that if the on_init callback fails, the task never runs.""" - self.main_called = 0 - async def main_func(): - self.main_called += 1 - - self.init_called = 0 - async def on_init(): - self.init_called += 1 - raise Exception('something') - - self.stop_called = 0 - async def on_stop(): - self.stop_called += 1 - - task = asynctask.AsyncTaskAsync(main_func, 0.1, on_init, on_stop) - task.start() - await asyncio.sleep(0.5) - assert not task.running() # Since on_init fails, task never starts - await task.stop(True) - - assert self.init_called == 1 - assert self.stop_called == 1 - assert self.main_called == 0 - assert not task.running() - - @pytest.mark.asyncio - async def test_on_stop_failure_ends_gacefully(self, mocker): - """Test that if the on_init callback fails, the task never runs.""" - self.main_called = 0 - async def main_func(): - self.main_called += 1 - - self.init_called = 0 - async def on_init(): - self.init_called += 1 - - self.stop_called = 0 - async def on_stop(): - self.stop_called += 1 - raise Exception('something') - - task = asynctask.AsyncTaskAsync(main_func, 0.1, on_init, on_stop) - task.start() - await asyncio.sleep(1) - await task.stop(True) - assert 9 <= self.main_called <= 10 - assert self.init_called == 1 - assert self.stop_called == 1 - - @pytest.mark.asyncio - async def test_force_run(self, mocker): - """Test that if the on_init callback fails, the task never runs.""" - self.main_called = 0 - async def main_func(): - self.main_called += 1 - - self.init_called = 0 - async def on_init(): - self.init_called += 1 - - self.stop_called = 0 - async def on_stop(): - self.stop_called += 1 - - task = asynctask.AsyncTaskAsync(main_func, 5, on_init, on_stop) - task.start() - await asyncio.sleep(1) - assert task.running() - task.force_execution() - task.force_execution() - await task.stop(True) - - assert self.main_called == 2 - assert self.init_called == 1 - assert self.stop_called == 1 - assert not task.running() diff --git a/tests/tasks/util/test_workerpool.py b/tests/tasks/util/test_workerpool.py deleted file mode 100644 index 2f7a8e71..00000000 --- a/tests/tasks/util/test_workerpool.py +++ /dev/null @@ -1,151 +0,0 @@ -"""Workerpool test module.""" -# pylint: disable=no-self-use,too-few-public-methods,missing-docstring -import time -import threading -import pytest - -from splitio.tasks.util import workerpool -from splitio.optional.loaders import asyncio - -class WorkerPoolTests(object): - """Worker pool test cases.""" - - def test_normal_operation(self, mocker): - """Test normal opeation works properly.""" - worker_func = mocker.Mock() - wpool = workerpool.WorkerPool(10, worker_func) - wpool.start() - for num in range(0, 100): - wpool.submit_work(str(num)) - - stop_event = threading.Event() - wpool.stop(stop_event) - stop_event.wait(5) - assert stop_event.is_set() - - calls = worker_func.mock_calls - for num in range(0, 100): - assert mocker.call(str(num)) in calls - - def test_fail_in_msg_doesnt_break(self): - """Test that if a message cannot be parsed it is ignored and others are processed.""" - class Worker(object): #pylint: disable= - def __init__(self): - self.worked = set() - - def do_work(self, work): - if work == '55': - raise Exception('something') - self.worked.add(work) - - worker = Worker() - wpool = workerpool.WorkerPool(50, worker.do_work) - wpool.start() - for num in range(0, 100): - wpool.submit_work(str(num)) - - stop_event = threading.Event() - wpool.stop(stop_event) - stop_event.wait(5) - assert stop_event.is_set() - - for num in range(0, 100): - if num != 55: - assert str(num) in worker.worked - else: - assert str(num) not in worker.worked - - def test_msg_acked_after_processed(self): - """Test that events are only set after all the work in the pipeline is done.""" - class Worker(object): - def __init__(self): - self.worked = set() - - def do_work(self, work): - self.worked.add(work) - time.sleep(0.02) # will wait 2 seconds in total for 100 elements - - worker = Worker() - wpool = workerpool.WorkerPool(50, worker.do_work) - wpool.start() - for num in range(0, 100): - wpool.submit_work(str(num)) - - wpool.wait_for_completion() - assert len(worker.worked) == 100 - - -class WorkerPoolAsyncTests(object): - """Worker pool async test cases.""" - - @pytest.mark.asyncio - async def test_normal_operation(self, mocker): - """Test normal opeation works properly.""" - self.calls = 0 - calls = [] - async def worker_func(num): - self.calls += 1 - calls.append(num) - - wpool = workerpool.WorkerPoolAsync(10, worker_func) - wpool.start() - jobs = [] - for num in range(0, 11): - jobs.append(str(num)) - - task = await wpool.submit_work(jobs) - assert await task.await_completion() - await wpool.stop() - for num in range(0, 11): - assert str(num) in calls - - @pytest.mark.asyncio - async def test_fail_in_msg_doesnt_break(self): - """Test that if a message cannot be parsed it is ignored and others are processed.""" - class Worker(object): #pylint: disable= - def __init__(self): - self.worked = set() - - async def do_work(self, work): - if work == '55': - raise Exception('something') - self.worked.add(work) - - worker = Worker() - wpool = workerpool.WorkerPoolAsync(50, worker.do_work) - wpool.start() - jobs = [] - for num in range(0, 100): - jobs.append(str(num)) - task = await wpool.submit_work(jobs) - - assert not await task.await_completion() - await wpool.stop() - - for num in range(0, 100): - if num != 55: - assert str(num) in worker.worked - else: - assert str(num) not in worker.worked - - @pytest.mark.asyncio - async def test_msg_acked_after_processed(self): - """Test that events are only set after all the work in the pipeline is done.""" - class Worker(object): - def __init__(self): - self.worked = set() - - async def do_work(self, work): - self.worked.add(work) - await asyncio.sleep(0.02) # will wait 2 seconds in total for 100 elements - - worker = Worker() - wpool = workerpool.WorkerPoolAsync(50, worker.do_work) - wpool.start() - jobs = [] - for num in range(0, 100): - jobs.append(str(num)) - task = await wpool.submit_work(jobs) - assert await task.await_completion() - await wpool.stop() - assert len(worker.worked) == 100 diff --git a/tests/util/test_backoff.py b/tests/util/test_backoff.py deleted file mode 100644 index 5fffbc33..00000000 --- a/tests/util/test_backoff.py +++ /dev/null @@ -1,45 +0,0 @@ -"""Backoff unit tests.""" -from splitio.util.backoff import Backoff - - -class BackOffTests(object): # pylint:disable=too-few-public-methods - """Backoff test cases.""" - - def test_basic_functionality(self): # pylint:disable=no-self-use - """Test basic working.""" - backoff = Backoff() - assert backoff.get() == 1 - assert backoff.get() == 2 - assert backoff.get() == 4 - assert backoff.get() == 8 - assert backoff.get() == 16 - assert backoff.get() == 32 - assert backoff.get() == 64 - assert backoff.get() == 128 - assert backoff.get() == 256 - assert backoff.get() == 512 - assert backoff.get() == 1024 - - # assert that it's limited to 30 minutes - assert backoff.get() == 1800 - assert backoff.get() == 1800 - assert backoff.get() == 1800 - assert backoff.get() == 1800 - - # assert that resetting begins on 1 - backoff.reset() - assert backoff.get() == 1 - assert backoff.get() == 2 - assert backoff.get() == 4 - assert backoff.get() == 8 - assert backoff.get() == 16 - assert backoff.get() == 32 - assert backoff.get() == 64 - assert backoff.get() == 128 - assert backoff.get() == 256 - assert backoff.get() == 512 - assert backoff.get() == 1024 - assert backoff.get() == 1800 - assert backoff.get() == 1800 - assert backoff.get() == 1800 - assert backoff.get() == 1800 diff --git a/tests/util/test_storage_helper.py b/tests/util/test_storage_helper.py deleted file mode 100644 index 3c99a7b0..00000000 --- a/tests/util/test_storage_helper.py +++ /dev/null @@ -1,340 +0,0 @@ -"""Storage Helper tests.""" -import pytest -import queue -import asyncio - -from harness_commons.util.storage_helper import update_definition_storage, get_valid_flag_sets, combine_valid_flag_sets, \ - update_rule_based_segment_storage, update_rule_based_segment_storage_async, update_definition_storage_async, \ - get_standard_segment_names_in_rbs_storage_async, get_standard_segment_names_in_rbs_storage -from splitio.storage.inmemory import InMemorySplitStorage, InMemorySplitStorageAsync -from harness_commons.storage.inmemmory import InMemoryRuleBasedSegmentStorage, InMemoryRuleBasedSegmentStorageAsync -from splitio.models import splits -from harness_commons.models import rule_based_segments -from harness_commons.storage import FlagSetsFilter -from tests.sync.test_splits_synchronizer import splits_raw as split_sample - -class StorageHelperTests(object): - - rbs = rule_based_segments.from_raw({ - "changeNumber": 123, - "name": "sample_rule_based_segment", - "status": "ACTIVE", - "trafficTypeName": "user", - "excluded":{ - "keys":["mauro@split.io","gaston@split.io"], - "segments":[{"name":"excluded_segment", "type": "standard"}] - }, - "conditions": [ - {"matcherGroup": { - "combiner": "AND", - "matchers": [ - { - "matcherType": "IN_SEGMENT", - "negate": False, - "userDefinedSegmentMatcherData": { - "segmentName": "employees" - }, - "whitelistMatcherData": None - } - ] - }, - } - ] - }) - - def test_update_definition_storage(self, mocker): - storage = mocker.Mock(spec=InMemorySplitStorage) - split = splits.from_raw(split_sample[0]) - - self.added = [] - self.deleted = [] - self.change_number = 0 - def update(to_add, to_delete, change_number): - self.added = to_add - self.deleted = to_delete - self.change_number = change_number - storage.update = update - - def is_flag_set_exist(flag_set): - return False - storage.is_flag_set_exist = is_flag_set_exist - - class flag_set_filter(): - def should_filter(): - return False - def intersect(sets): - return True - storage.flag_set_filter = flag_set_filter - storage.flag_set_filter.flag_sets = {} - - self.clear = 0 - def clear(): - self.clear += 1 - storage.clear = clear - - update_definition_storage(storage, [split], 123, True) - assert self.added[0] == split - assert self.deleted == [] - assert self.change_number == 123 - assert self.clear == 1 - - class flag_set_filter2(): - def should_filter(): - return True - def intersect(sets): - return False - storage.flag_set_filter = flag_set_filter2 - storage.flag_set_filter.flag_sets = set({'set1', 'set2'}) - - self.clear = 0 - update_definition_storage(storage, [split], 123) - assert self.added == [] - assert self.deleted[0] == split.name - assert self.clear == 0 - - class flag_set_filter3(): - def should_filter(): - return True - def intersect(sets): - return True - storage.flag_set_filter = flag_set_filter3 - storage.flag_set_filter.flag_sets = set({'set1', 'set2'}) - - def is_flag_set_exist2(flag_set): - return True - storage.is_flag_set_exist = is_flag_set_exist2 - update_definition_storage(storage, [split], 123) - assert self.added[0] == split - assert self.deleted == [] - - split_json = split_sample[0] - split_json['conditions'].append({ - "matcherGroup": { - "combiner": "AND", - "matchers": [ - { - "matcherType": "IN_SEGMENT", - "negate": False, - "userDefinedSegmentMatcherData": { - "segmentName": "segment1" - }, - "whitelistMatcherData": None - } - ] - }, - "partitions": [ - { - "treatment": "on", - "size": 30 - }, - { - "treatment": "off", - "size": 70 - } - ] - } - ) - - split = splits.from_raw(split_json) - storage.config_flag_sets_used = 0 - assert update_definition_storage(storage, [split], 123) == {'segment1'} - - def test_get_valid_flag_sets(self): - flag_sets = ['set1', 'set2'] - config_flag_sets = FlagSetsFilter([]) - assert get_valid_flag_sets(flag_sets, config_flag_sets) == ['set1', 'set2'] - - config_flag_sets = FlagSetsFilter(['set1']) - assert get_valid_flag_sets(flag_sets, config_flag_sets) == ['set1'] - - flag_sets = ['set2', 'set3'] - config_flag_sets = FlagSetsFilter(['set1', 'set2']) - assert get_valid_flag_sets(flag_sets, config_flag_sets) == ['set2'] - - flag_sets = ['set3', 'set4'] - config_flag_sets = FlagSetsFilter(['set1', 'set2']) - assert get_valid_flag_sets(flag_sets, config_flag_sets) == [] - - flag_sets = [] - config_flag_sets = FlagSetsFilter(['set1', 'set2']) - assert get_valid_flag_sets(flag_sets, config_flag_sets) == [] - - def test_combine_valid_flag_sets(self): - results_set = [{'set1', 'set2'}, {'set2', 'set3'}] - assert combine_valid_flag_sets(results_set) == {'set1', 'set2', 'set3'} - - results_set = [{}, {'set2', 'set3'}] - assert combine_valid_flag_sets(results_set) == {'set2', 'set3'} - - results_set = ['set1', {'set2', 'set3'}] - assert combine_valid_flag_sets(results_set) == {'set2', 'set3'} - - def test_update_rule_base_segment_storage(self, mocker): - storage = mocker.Mock(spec=InMemoryRuleBasedSegmentStorage) - self.added = [] - self.deleted = [] - self.change_number = 0 - def update(to_add, to_delete, change_number): - self.added = to_add - self.deleted = to_delete - self.change_number = change_number - storage.update = update - - self.clear = 0 - def clear(): - self.clear += 1 - storage.clear = clear - - segments = update_rule_based_segment_storage(storage, [self.rbs], 123) - assert self.added[0] == self.rbs - assert self.deleted == [] - assert self.change_number == 123 - assert segments == {'excluded_segment', 'employees'} - assert self.clear == 0 - - segments = update_rule_based_segment_storage(storage, [self.rbs], 123, True) - assert self.clear == 1 - - def test_get_standard_segment_in_rbs_storage(self, mocker): - events_queue = queue.Queue() - storage = InMemoryRuleBasedSegmentStorage(events_queue) - segments = update_rule_based_segment_storage(storage, [self.rbs], 123) - assert get_standard_segment_names_in_rbs_storage(storage) == {'excluded_segment', 'employees'} - - @pytest.mark.asyncio - async def test_get_standard_segment_in_rbs_storage(self, mocker): - storage = InMemoryRuleBasedSegmentStorageAsync(asyncio.Queue()) - segments = await update_rule_based_segment_storage_async(storage, [self.rbs], 123) - assert await get_standard_segment_names_in_rbs_storage_async(storage) == {'excluded_segment', 'employees'} - - @pytest.mark.asyncio - async def test_update_rule_base_segment_storage_async(self, mocker): - storage = mocker.Mock(spec=InMemoryRuleBasedSegmentStorageAsync) - self.added = [] - self.deleted = [] - self.change_number = 0 - async def update(to_add, to_delete, change_number): - self.added = to_add - self.deleted = to_delete - self.change_number = change_number - storage.update = update - - self.clear = 0 - async def clear(): - self.clear += 1 - storage.clear = clear - - segments = await update_rule_based_segment_storage_async(storage, [self.rbs], 123) - assert self.added[0] == self.rbs - assert self.deleted == [] - assert self.change_number == 123 - assert segments == {'excluded_segment', 'employees'} - - segments = await update_rule_based_segment_storage_async(storage, [self.rbs], 123, True) - assert self.clear == 1 - - @pytest.mark.asyncio - async def test_update_definition_storage_async(self, mocker): - storage = mocker.Mock(spec=InMemorySplitStorageAsync) - split = splits.from_raw(split_sample[0]) - - self.added = [] - self.deleted = [] - self.change_number = 0 - async def get(flag_name): - return None - storage.get = get - - async def update(to_add, to_delete, change_number): - self.added = to_add - self.deleted = to_delete - self.change_number = change_number - storage.update = update - - async def is_flag_set_exist(flag_set): - return False - storage.is_flag_set_exist = is_flag_set_exist - - class flag_set_filter(): - def should_filter(): - return False - def intersect(sets): - return True - storage.flag_set_filter = flag_set_filter - storage.flag_set_filter.flag_sets = {} - - self.clear = 0 - async def clear(): - self.clear += 1 - storage.clear = clear - - await update_definition_storage_async(storage, [split], 123, True) - assert self.added[0] == split - assert self.deleted == [] - assert self.change_number == 123 - assert self.clear == 1 - - class flag_set_filter2(): - def should_filter(): - return True - def intersect(sets): - return False - storage.flag_set_filter = flag_set_filter2 - storage.flag_set_filter.flag_sets = set({'set1', 'set2'}) - - async def get(flag_name): - return split - storage.get = get - - self.clear = 0 - await update_definition_storage_async(storage, [split], 123) - assert self.added == [] - assert self.deleted[0] == split.name - assert self.clear == 0 - - class flag_set_filter3(): - def should_filter(): - return True - def intersect(sets): - return True - storage.flag_set_filter = flag_set_filter3 - storage.flag_set_filter.flag_sets = set({'set1', 'set2'}) - - async def is_flag_set_exist2(flag_set): - return True - storage.is_flag_set_exist = is_flag_set_exist2 - await update_definition_storage_async(storage, [split], 123) - assert self.added[0] == split - assert self.deleted == [] - - split_json = split_sample[0] - split_json['conditions'].append({ - "matcherGroup": { - "combiner": "AND", - "matchers": [ - { - "matcherType": "IN_SEGMENT", - "negate": False, - "userDefinedSegmentMatcherData": { - "segmentName": "segment1" - }, - "whitelistMatcherData": None - } - ] - }, - "partitions": [ - { - "treatment": "on", - "size": 30 - }, - { - "treatment": "off", - "size": 70 - } - ] - } - ) - - split = splits.from_raw(split_json) - storage.config_flag_sets_used = 0 - assert await update_definition_storage_async(storage, [split], 123) == {'segment1'} \ No newline at end of file diff --git a/tests/util/test_threadutil.py b/tests/util/test_threadutil.py deleted file mode 100644 index 7473aa96..00000000 --- a/tests/util/test_threadutil.py +++ /dev/null @@ -1,44 +0,0 @@ -"""threading utilities unit tests.""" - -import time -import threading - -from splitio.util.threadutil import EventGroup - - -class EventGroupTests(object): - """EventGroup class test cases.""" - - def test_basic_functionality(self): - """Test basic functionality.""" - - def fun(event): #pylint:disable=missing-docstring - time.sleep(1) - event.set() - - group = EventGroup() - event1 = group.make_event() - event2 = group.make_event() - - task = threading.Thread(target=fun, args=(event1,)) - task.start() - group.wait(3) - assert event1.is_set() - assert not event2.is_set() - - group = EventGroup() - event1 = group.make_event() - event2 = group.make_event() - - task = threading.Thread(target=fun, args=(event2,)) - task.start() - group.wait(3) - assert not event1.is_set() - assert event2.is_set() - - group = EventGroup() - event1 = group.make_event() - event2 = group.make_event() - group.wait(3) - assert not event1.is_set() - assert not event2.is_set()