Siembol Response: improve initialisation (#713)

This commit is contained in:
Celie Valentiny
2022-07-05 15:32:15 +01:00
committed by GitHub
parent 7ff6b8fbae
commit 63fdffa288
31 changed files with 145 additions and 75 deletions

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -35,7 +35,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -23,7 +23,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<artifactId>jackson-databind</artifactId>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -51,7 +51,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<modules>
<module>alerting-core</module>

View File

@@ -9,13 +9,13 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencyManagement>
<dependencies>
@@ -56,7 +56,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -67,22 +67,22 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-services</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-sync</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -93,7 +93,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -104,7 +104,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -115,7 +115,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -41,32 +41,32 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -20,17 +20,17 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<modules>
<module>config-editor-core</module>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencyManagement>
@@ -37,7 +37,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencyManagement>
@@ -43,7 +43,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -35,12 +35,12 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -75,7 +75,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<modules>
<module>enriching-core</module>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -39,12 +39,12 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -50,7 +50,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -75,7 +75,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<modules>
<module>parsing-core</module>

View File

@@ -6,7 +6,7 @@
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<name>siembol</name>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<description>A scalable, advanced security analytics framework based on open-source big data technologies.</description>
<inceptionYear>2019</inceptionYear>
<url>https://siembol.io/</url>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<modules>
<module>responding-core</module>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -35,12 +35,12 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencyManagement>
<dependencies>
@@ -51,7 +51,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -62,7 +62,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>

View File

@@ -68,6 +68,7 @@ public class ApplicationConfiguration implements DisposableBean {
streamService = properties.getInactiveStreamService()
? new InactiveRulesService()
: new KafkaStreamRulesService(rulesProvider, properties);
streamService.initialise();
return streamService;
}
@@ -78,7 +79,7 @@ public class ApplicationConfiguration implements DisposableBean {
@Autowired SiembolMetricsRegistrar metricsRegistrar) throws Exception {
return properties.getInactiveStreamService()
? () -> null :
new ZooKeeperRulesProvider(properties.getZookeperAttributes(), respondingCompiler, metricsRegistrar);
new ZooKeeperRulesProvider(properties.getZooKeeperAttributes(), respondingCompiler, metricsRegistrar);
}
@Override

View File

@@ -16,9 +16,10 @@ public class ResponseConfigurationProperties {
private Map<String, Object> streamConfig;
private Boolean inactiveStreamService = false;
@NestedConfigurationProperty
private ZooKeeperAttributesDto zookeperAttributes;
private ZooKeeperAttributesDto zooKeeperAttributes;
@NestedConfigurationProperty
private ProvidedEvaluatorsProperties evaluatorsProperties;
private int initialisationSleepTimeMs = 1000;
public String getInputTopic() {
return inputTopic;
@@ -36,12 +37,12 @@ public class ResponseConfigurationProperties {
this.errorTopic = errorTopic;
}
public ZooKeeperAttributesDto getZookeperAttributes() {
return zookeperAttributes;
public ZooKeeperAttributesDto getZooKeeperAttributes() {
return zooKeeperAttributes;
}
public void setZookeperAttributes(ZooKeeperAttributesDto zookeperAttributes) {
this.zookeperAttributes = zookeperAttributes;
public void setZooKeeperAttributes(ZooKeeperAttributesDto zooKeeperAttributes) {
this.zooKeeperAttributes = zooKeeperAttributes;
}
public Boolean getInactiveStreamService() {
@@ -67,4 +68,12 @@ public class ResponseConfigurationProperties {
public void setEvaluatorsProperties(ProvidedEvaluatorsProperties evaluatorsProperties) {
this.evaluatorsProperties = evaluatorsProperties;
}
public int getInitialisationSleepTimeMs() {
return initialisationSleepTimeMs;
}
public void setInitialisationSleepTimeMs(int initialisationSleepTimeMs) {
this.initialisationSleepTimeMs = initialisationSleepTimeMs;
}
}

View File

@@ -20,6 +20,7 @@ import uk.co.gresearch.siembol.response.common.ResponseEvaluationResult;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import static uk.co.gresearch.siembol.response.common.RespondingResult.StatusCode.ERROR;
import static uk.co.gresearch.siembol.response.common.RespondingResult.StatusCode.OK;
@@ -28,8 +29,13 @@ public class KafkaStreamRulesService implements RulesService {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String INIT_START = "Kafka stream service initialisation started";
private static final String INIT_COMPLETED = "Kafka stream service initialisation completed";
private static final String RULES_INIT_COMPLETED = "Response rules have been initialised";
private static final String RULES_NOT_INITIALISED = "Response rules have not been initialised yet";
private static final String RULES_INIT_ERROR = "Error during waiting for initialisation";
private final KafkaStreams streams;
private final RulesProvider rulesProvider;
private final int initSleepTimeMs;
public KafkaStreamRulesService(RulesProvider rulesProvider,
ResponseConfigurationProperties properties) {
@@ -40,7 +46,28 @@ public class KafkaStreamRulesService implements RulesService {
ResponseConfigurationProperties properties,
KafkaStreamsFactory kafkaStreamsFactory) {
this.rulesProvider = rulesProvider;
this.initSleepTimeMs = properties.getInitialisationSleepTimeMs();
streams = createStreams(kafkaStreamsFactory, properties);
}
public void initialise() {
if (rulesProvider.isInitialised()) {
initialiseRules();
return;
}
CompletableFuture.runAsync(this::initialiseRules);
}
private void initialiseRules() {
while (!rulesProvider.isInitialised()) {
try {
LOG.warn(RULES_NOT_INITIALISED);
Thread.sleep(initSleepTimeMs);
} catch (InterruptedException e) {
LOG.error(RULES_INIT_ERROR);
}
}
LOG.info(RULES_INIT_COMPLETED);
streams.start();
}
@@ -105,7 +132,7 @@ public class KafkaStreamRulesService implements RulesService {
@Override
public Mono<Health> checkHealth() {
return Mono.just(streams.state().isRunningOrRebalancing() || streams.state().equals(KafkaStreams.State.CREATED)
return Mono.just(!this.rulesProvider.isInitialised() || streams.state().isRunningOrRebalancing() || streams.state().equals(KafkaStreams.State.CREATED)
? Health.up().build() :
Health.down().build());
}

View File

@@ -4,4 +4,8 @@ import uk.co.gresearch.siembol.response.engine.ResponseEngine;
public interface RulesProvider {
ResponseEngine getEngine();
default boolean isInitialised() {
return getEngine() != null;
}
}

View File

@@ -8,4 +8,5 @@ public interface RulesService {
RespondingResult getRulesMetadata();
Mono<Health> checkHealth();
default void close() {}
default void initialise() {};
}

View File

@@ -25,11 +25,11 @@ public class ZooKeeperRulesProvider implements RulesProvider {
private static final String COMPILE_RULES_ERROR_MSG_FORMAT =
"Compilation of response rules has failed with error message: {}";
private static final String UPDATE_EXCEPTION_LOG = "Exception during response engine update: {}";
private static final String ERROR_INIT_MESSAGE = "Response exception: Response rules initialisation error";
private static final String INIT_START = "Response application initialisation start";
private static final String INIT_COMPLETED = "Response application initialisation completed";
private static final String PARSERS_UPDATE_START = "Response rules update start";
private static final String PARSERS_UPDATE_COMPLETED = "Response rules update completed";
private static final String RESPONSE_RULES_NOT_INITIALISED = "Response rules have not been loaded into zookeeper during initialisation";
private final AtomicReference<ResponseEngine> currentEngine = new AtomicReference<>();
private final ZooKeeperConnector zooKeeperConnector;
@@ -38,14 +38,14 @@ public class ZooKeeperRulesProvider implements RulesProvider {
private final SiembolCounter updateErrorCounter;
public ZooKeeperRulesProvider(ZooKeeperAttributesDto zookeperAttributes,
public ZooKeeperRulesProvider(ZooKeeperAttributesDto zooKeeperAttributes,
RespondingCompiler respondingCompiler,
SiembolMetricsRegistrar metricsRegistrar) throws Exception {
this(new ZooKeeperConnectorFactoryImpl(), zookeperAttributes, respondingCompiler, metricsRegistrar);
this(new ZooKeeperConnectorFactoryImpl(), zooKeeperAttributes, respondingCompiler, metricsRegistrar);
}
ZooKeeperRulesProvider(ZooKeeperConnectorFactory factory,
ZooKeeperAttributesDto zookeperAttributes,
ZooKeeperAttributesDto zooKeeperAttributes,
RespondingCompiler respondingCompiler,
SiembolMetricsRegistrar metricsRegistrar) throws Exception {
LOG.info(INIT_START);
@@ -53,11 +53,11 @@ public class ZooKeeperRulesProvider implements RulesProvider {
this.updateCounter = metricsRegistrar.registerCounter(SiembolMetrics.RESPONSE_RULES_UPDATE.getMetricName());
this.updateErrorCounter = metricsRegistrar.registerCounter(SiembolMetrics.RESPONSE_RULES_ERROR_UPDATE.getMetricName());
this.respondingCompiler = respondingCompiler;
zooKeeperConnector = factory.createZookeeperConnector(zookeperAttributes);
zooKeeperConnector = factory.createZookeeperConnector(zooKeeperAttributes);
updateRules();
if (currentEngine.get() == null) {
throw new IllegalStateException(ERROR_INIT_MESSAGE);
LOG.warn(RESPONSE_RULES_NOT_INITIALISED);
}
zooKeeperConnector.addCacheListener(this::updateRules);
LOG.info(INIT_COMPLETED);
@@ -74,7 +74,6 @@ public class ZooKeeperRulesProvider implements RulesProvider {
LOG.error(COMPILE_RULES_ERROR_MSG_FORMAT, result.getAttributes().getMessage());
return;
}
currentEngine.set(result.getAttributes().getResponseEngine());
updateCounter.increment();
LOG.info(PARSERS_UPDATE_COMPLETED);

View File

@@ -68,11 +68,13 @@ public class KafkaStreamRuleServiceTest {
streamsFactory = new TestingDriverKafkaStreamsFactory(kafkaStreams);
when(rulesProvider.getEngine()).thenReturn(rulesEngine);
when(rulesProvider.isInitialised()).thenReturn(true);
ResponseConfigurationProperties properties = new ResponseConfigurationProperties();
properties.setInputTopic(inputTopic);
properties.setErrorTopic(errorTopic);
properties.setStreamConfig(new HashMap<>());
properties.getStreamConfig().put("application.id", "siembol-response-" + UUID.randomUUID());
properties.setInitialisationSleepTimeMs(0);
streamService = new KafkaStreamRulesService(rulesProvider, properties, streamsFactory);
testDriver = streamsFactory.getTestDriver();
testInputTopic = testDriver.createInputTopic(inputTopic, Serdes.String().serializer(),
@@ -135,29 +137,54 @@ public class KafkaStreamRuleServiceTest {
@Test
public void testHealthUpCreated() {
when(rulesProvider.isInitialised()).thenReturn(true);
when(kafkaStreams.state()).thenReturn(KafkaStreams.State.CREATED);
Mono<Health> health = streamService.checkHealth();
Assert.assertEquals(Status.UP, Objects.requireNonNull(health.block()).getStatus());
}
@Test
public void testHealthUpRunning() {
public void healthUpRunning() {
when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING);
Mono<Health> health = streamService.checkHealth();
Assert.assertEquals(Status.UP, Objects.requireNonNull(health.block()).getStatus());
}
@Test
public void testHealthUpRebalancing() {
public void healthUpRebalancing() {
when(kafkaStreams.state()).thenReturn(KafkaStreams.State.REBALANCING);
Mono<Health> health = streamService.checkHealth();
Assert.assertEquals(Status.UP, Objects.requireNonNull(health.block()).getStatus());
}
@Test
public void testHealthDownError() {
public void healthDownError() {
when(kafkaStreams.state()).thenReturn(KafkaStreams.State.ERROR);
Mono<Health> health = streamService.checkHealth();
Assert.assertEquals(Status.DOWN, Objects.requireNonNull(health.block()).getStatus());
}
@Test
public void healthUpNotInitialised() {
when(rulesProvider.isInitialised()).thenReturn(false);
Mono<Health> health = streamService.checkHealth();
Assert.assertEquals(Status.UP, Objects.requireNonNull(health.block()).getStatus());
}
@Test
public void initialiseRulesSleepOnce() throws InterruptedException {
when(rulesProvider.isInitialised()).thenReturn(false, false, true);
streamService.initialise();
Thread.sleep(100);
verify(rulesProvider, times(3)).isInitialised();
verify(kafkaStreams, times(1)).start();
}
@Test
public void initialiseRulesNoSleep() {
streamService.initialise();
verify(rulesProvider, times(2)).isInitialised();
verify(kafkaStreams, times(1)).start();
}
}

View File

@@ -5,6 +5,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.springframework.boot.actuate.health.Health;
import uk.co.gresearch.siembol.common.metrics.SiembolMetrics;
import uk.co.gresearch.siembol.common.metrics.SiembolMetricsRegistrar;
import uk.co.gresearch.siembol.common.metrics.test.SiembolMetricsTestRegistrar;
@@ -101,13 +102,14 @@ public class ZooKeeperRulesProviderTest {
Assert.assertEquals(testingRules, engine.getRulesMetadata().getAttributes().getJsonRules());
}
@Test(expected = java.lang.IllegalStateException.class)
@Test
public void testInvalidRulesInit() throws Exception {
when(rulesZooKeeperConnector.getData()).thenReturn("INVALID");
rulesProvider = new ZooKeeperRulesProvider(zooKeeperConnectorFactory,
zooKeeperAttributes,
compiler,
cachedMetricsRegistrar);
Assert.assertFalse(rulesProvider.isInitialised());
}
@Test

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.6.1-SNAPSHOT</version>
<version>2.6.2-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>