mirror of
https://github.com/optim-enterprises-bv/siembol.git
synced 2025-11-02 03:18:09 +00:00
siembol alerting: sampling rule protection messages (#576)
* siembol alerting: sampling rule protection messages * fixing sampling and adding test * increasing app version
This commit is contained in:
@@ -11,7 +11,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>alerting</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -35,7 +35,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol-common</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>alerting</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -23,7 +23,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>alerting-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>alerting</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -51,7 +51,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>alerting-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
|
||||
@@ -68,13 +68,20 @@ public class AlertingKafkaWriterBolt extends KafkaWriterBoltBase {
|
||||
AlertingResult matchesInfo = ruleProtection.incrementRuleMatches(match.getFullRuleName());
|
||||
int hourlyMatches = matchesInfo.getAttributes().getHourlyMatches();
|
||||
int dailyMatches = matchesInfo.getAttributes().getDailyMatches();
|
||||
int hourlyMatchesDiffWithMax = hourlyMatches - match.getMaxHourMatches().intValue();
|
||||
int dailyMatchesDiffWithMax = dailyMatches - match.getMaxDayMatches().intValue();
|
||||
|
||||
if (match.getMaxHourMatches().intValue() < hourlyMatches
|
||||
|| match.getMaxDayMatches().intValue() < dailyMatches) {
|
||||
if (hourlyMatchesDiffWithMax > 0 || dailyMatchesDiffWithMax > 0) {
|
||||
String msg = String.format(RULE_PROTECTION_ERROR_MESSAGE,
|
||||
match.getFullRuleName(), hourlyMatches, dailyMatches, match.getAlertJson());
|
||||
LOG.debug(msg);
|
||||
exceptions.add(msg);
|
||||
|
||||
if ((hourlyMatchesDiffWithMax > 0 && isPowerOfTwo(hourlyMatchesDiffWithMax))
|
||||
|| (dailyMatchesDiffWithMax > 0 && isPowerOfTwo(dailyMatchesDiffWithMax))) {
|
||||
//NOTE: sending message about an alert filtered by rule protection is sampled exponentially
|
||||
exceptions.add(msg);
|
||||
}
|
||||
|
||||
counters.add(SiembolMetrics.ALERTING_ENGINE_RULE_PROTECTION.getMetricName());
|
||||
counters.add(SiembolMetrics.ALERTING_RULE_PROTECTION.getMetricName(match.getRuleName()));
|
||||
continue;
|
||||
@@ -125,4 +132,9 @@ public class AlertingKafkaWriterBolt extends KafkaWriterBoltBase {
|
||||
error.setMessage(errorMsg);
|
||||
return error.toString();
|
||||
}
|
||||
|
||||
private boolean isPowerOfTwo(int n) {
|
||||
return (n & n - 1) == 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -142,7 +142,11 @@ public class AlertingKafkaWriterBoltTest {
|
||||
public void testAlertMessageReachProtectionThreshold() throws Exception {
|
||||
AlertMessage alert = new AlertMessage(AlertingEngineType.SIEMBOL_ALERTS, alertMap, AlertMessageStr);
|
||||
AlertMessages.add(alert);
|
||||
AlertMessages.add(alert);
|
||||
AlertMessages.add(alert);//#1 protection
|
||||
AlertMessages.add(alert);//#2 protection
|
||||
AlertMessages.add(alert);//#3 protection
|
||||
AlertMessages.add(alert);//#4 protection
|
||||
AlertMessages.add(alert);//#5 protection
|
||||
writerBolt.execute(tuple);
|
||||
List<String> outputAlert = kafkaRule.helper().consumeStrings("alerts", 1)
|
||||
.get(10, TimeUnit.SECONDS);
|
||||
@@ -150,10 +154,10 @@ public class AlertingKafkaWriterBoltTest {
|
||||
Assert.assertEquals(1, outputAlert.size());
|
||||
Assert.assertEquals(AlertMessageStr.trim(), outputAlert.get(0).trim());
|
||||
|
||||
List<String> outputExceptions = kafkaRule.helper().consumeStrings("errors", 1)
|
||||
List<String> outputExceptions = kafkaRule.helper().consumeStrings("errors", 3)
|
||||
.get(10, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(outputExceptions);
|
||||
Assert.assertEquals(1, outputExceptions.size());
|
||||
Assert.assertEquals(3, outputExceptions.size());
|
||||
Map<String, Object> parsedException = JSON_MAP_READER.readValue(outputExceptions.get(0));
|
||||
Assert.assertEquals("siembol_alerts", parsedException.get("failed_sensor_type"));
|
||||
Assert.assertEquals("alerting_error", parsedException.get("error_type"));
|
||||
@@ -163,9 +167,9 @@ public class AlertingKafkaWriterBoltTest {
|
||||
metricsTestRegistrarFactory.getCounterValue(SiembolMetrics.ALERTING_ENGINE_MATCHES.getMetricName()));
|
||||
Assert.assertEquals(1, metricsTestRegistrarFactory
|
||||
.getCounterValue(SiembolMetrics.ALERTING_RULE_MATCHES.getMetricName("alert1")));
|
||||
Assert.assertEquals(1, metricsTestRegistrarFactory
|
||||
Assert.assertEquals(5, metricsTestRegistrarFactory
|
||||
.getCounterValue(SiembolMetrics.ALERTING_ENGINE_RULE_PROTECTION.getMetricName()));
|
||||
Assert.assertEquals(1, metricsTestRegistrarFactory
|
||||
Assert.assertEquals(5, metricsTestRegistrarFactory
|
||||
.getCounterValue(SiembolMetrics.ALERTING_RULE_PROTECTION.getMetricName("alert1")));
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modules>
|
||||
<module>alerting-core</module>
|
||||
|
||||
@@ -9,13 +9,13 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>config-editor</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol-common</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>config-editor</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
@@ -56,7 +56,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol-common</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
@@ -67,22 +67,22 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>config-editor-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>config-editor-services</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>config-editor-sync</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>alerting-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
@@ -93,7 +93,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>parsing-app</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
@@ -104,7 +104,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>enriching-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
@@ -115,7 +115,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>responding-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>config-editor</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -41,32 +41,32 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol-common</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>config-editor-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>alerting-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>parsing-app</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>enriching-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>responding-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>config-editor</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -20,17 +20,17 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol-common</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>config-editor-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>parsing-app</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modules>
|
||||
<module>config-editor-core</module>
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
<dependencyManagement>
|
||||
@@ -43,7 +43,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol-common</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>enriching</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -35,12 +35,12 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol-common</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>alerting-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>enriching</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -75,7 +75,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>enriching-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modules>
|
||||
<module>enriching-core</module>
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>parsing</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -39,12 +39,12 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol-common</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>parsing-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>parsing</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -45,7 +45,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol-common</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>joda-time</groupId>
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>parsing</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -75,7 +75,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>parsing-app</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modules>
|
||||
<module>parsing-core</module>
|
||||
|
||||
2
pom.xml
2
pom.xml
@@ -6,7 +6,7 @@
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol</artifactId>
|
||||
<name>siembol</name>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<description>A scalable, advanced security analytics framework based on open-source big data technologies.</description>
|
||||
<inceptionYear>2019</inceptionYear>
|
||||
<url>https://siembol.io/</url>
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modules>
|
||||
<module>responding-core</module>
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>responding</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -35,12 +35,12 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol-common</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>alerting-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.jayway.jsonpath</groupId>
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>responding</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
@@ -51,7 +51,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol-common</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
@@ -62,7 +62,7 @@
|
||||
<dependency>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>responding-core</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
<parent>
|
||||
<groupId>uk.co.gresearch.siembol</groupId>
|
||||
<artifactId>siembol</artifactId>
|
||||
<version>2.4.2-SNAPSHOT</version>
|
||||
<version>2.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
||||
@@ -65,12 +65,17 @@ public abstract class KafkaWriterBoltBase extends BaseRichBolt {
|
||||
}
|
||||
|
||||
protected void writeMessages(List<KafkaWriterMessage> messages, List<String> countersNames, KafkaWriterAnchor anchor) {
|
||||
anchor.acquire(messages.size());
|
||||
List<SiembolCounter> siembolCounters = countersNames.stream()
|
||||
.map(x -> metricsRegistrar.registerCounter(x))
|
||||
.collect(Collectors.toList());
|
||||
anchor.addSiembolCounters(siembolCounters);
|
||||
messages.forEach(x -> writeMessage(x, anchor));
|
||||
|
||||
if (messages.isEmpty()) {
|
||||
acknowledgeWithoutWriting(anchor);
|
||||
} else {
|
||||
anchor.acquire(messages.size());
|
||||
messages.forEach(x -> writeMessage(x, anchor));
|
||||
}
|
||||
}
|
||||
|
||||
private Callback createProducerCallback(final KafkaWriterAnchor anchor) {
|
||||
@@ -89,6 +94,13 @@ public abstract class KafkaWriterBoltBase extends BaseRichBolt {
|
||||
};
|
||||
}
|
||||
|
||||
private void acknowledgeWithoutWriting(KafkaWriterAnchor anchor) {
|
||||
anchor.incrementSiembolCounters();
|
||||
synchronized (collector) {
|
||||
collector.ack(anchor.getTuple());
|
||||
}
|
||||
}
|
||||
|
||||
private void writeMessage(KafkaWriterMessage message, KafkaWriterAnchor anchor) {
|
||||
try {
|
||||
var callBack = createProducerCallback(anchor);
|
||||
|
||||
Reference in New Issue
Block a user