Siembol-monitoring: heartbeat (#673)

This commit is contained in:
Celie Valentiny
2022-06-17 12:37:56 +01:00
committed by GitHub
parent 49515f2b80
commit b7924bcb5c
59 changed files with 1213 additions and 81 deletions

View File

@@ -70,6 +70,11 @@ jobs:
with:
name: storm-topology-manager
path: deployment/storm-topology-manager/target/storm-topology-manager-*.jar
- name: Upload siembol-monitoring jar
uses: actions/upload-artifact@v3
with:
name: siembol-monitoring
path: deployment/siembol-monitoring/target/siembol-monitoring-*.jar
build-docker-storm:
runs-on: ubuntu-latest
@@ -133,7 +138,7 @@ jobs:
needs: build-java
strategy:
matrix:
component: [config-editor-rest, responding-stream, storm-topology-manager]
component: [config-editor-rest, responding-stream, storm-topology-manager, siembol-monitoring]
fail-fast: false
steps:
- name: Checkout
@@ -341,7 +346,7 @@ jobs:
environment: release
strategy:
matrix:
component: [alerting-storm, enriching-storm, parsing-storm, config-editor-rest, responding-stream, storm-topology-manager, config-editor-ui]
component: [alerting-storm, enriching-storm, parsing-storm, config-editor-rest, responding-stream, storm-topology-manager, config-editor-ui, siembol-monitoring]
fail-fast: false
steps:
- name: Login to DockerHub

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -35,7 +35,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -23,7 +23,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<artifactId>jackson-databind</artifactId>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -51,7 +51,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<modules>
<module>alerting-core</module>

View File

@@ -9,13 +9,13 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import uk.co.gresearch.siembol.common.constants.ServiceType;
import uk.co.gresearch.siembol.configeditor.common.*;
import uk.co.gresearch.siembol.configeditor.configstore.ConfigStore;
import uk.co.gresearch.siembol.configeditor.configstore.ConfigStoreImpl;

View File

@@ -1,7 +1,7 @@
package uk.co.gresearch.siembol.configeditor.serviceaggregator;
import uk.co.gresearch.siembol.configeditor.common.ConfigSchemaService;
import uk.co.gresearch.siembol.configeditor.common.ServiceType;
import uk.co.gresearch.siembol.common.constants.ServiceType;
import uk.co.gresearch.siembol.configeditor.configstore.ConfigStore;
import static uk.co.gresearch.siembol.configeditor.model.ConfigEditorResult.StatusCode.OK;

View File

@@ -8,7 +8,7 @@ import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import uk.co.gresearch.siembol.configeditor.common.AuthorisationProvider;
import uk.co.gresearch.siembol.configeditor.common.ConfigSchemaService;
import uk.co.gresearch.siembol.configeditor.common.ServiceType;
import uk.co.gresearch.siembol.common.constants.ServiceType;
import uk.co.gresearch.siembol.configeditor.common.UserInfo;
import uk.co.gresearch.siembol.configeditor.configstore.ConfigStore;
import uk.co.gresearch.siembol.configeditor.model.ConfigEditorAttributes;

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencyManagement>
<dependencies>
@@ -56,7 +56,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -67,22 +67,22 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-services</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-sync</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -93,7 +93,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -104,7 +104,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -115,7 +115,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -17,7 +17,7 @@ import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactoryImpl;
import uk.co.gresearch.siembol.configeditor.common.AuthorisationProvider;
import uk.co.gresearch.siembol.configeditor.common.ConfigEditorUtils;
import uk.co.gresearch.siembol.configeditor.common.ConfigSchemaService;
import uk.co.gresearch.siembol.configeditor.common.ServiceType;
import uk.co.gresearch.siembol.common.constants.ServiceType;
import uk.co.gresearch.siembol.configeditor.sync.common.ConfigServiceHelper;
import uk.co.gresearch.siembol.configeditor.sync.service.*;
import uk.co.gresearch.siembol.configeditor.model.ConfigEditorUiLayout;

View File

@@ -6,7 +6,7 @@ import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnector;
import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactory;
import uk.co.gresearch.siembol.configeditor.common.ConfigInfoProvider;
import uk.co.gresearch.siembol.configeditor.common.ConfigInfoType;
import uk.co.gresearch.siembol.configeditor.common.ServiceType;
import uk.co.gresearch.siembol.common.constants.ServiceType;
import uk.co.gresearch.siembol.configeditor.configinfo.AdminConfigInfoProvider;
import uk.co.gresearch.siembol.configeditor.model.ConfigEditorResult;
import uk.co.gresearch.siembol.configeditor.rest.common.ConfigEditorConfigurationProperties;

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -41,32 +41,32 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@@ -2,7 +2,7 @@ package uk.co.gresearch.siembol.configeditor.service.common;
import uk.co.gresearch.siembol.configeditor.common.ConfigInfoProvider;
import uk.co.gresearch.siembol.configeditor.common.ConfigSchemaService;
import uk.co.gresearch.siembol.configeditor.common.ServiceType;
import uk.co.gresearch.siembol.common.constants.ServiceType;
import uk.co.gresearch.siembol.configeditor.configinfo.JsonRuleConfigInfoProvider;
import uk.co.gresearch.siembol.configeditor.model.ConfigEditorUiLayout;
import uk.co.gresearch.siembol.configeditor.service.alerts.AlertingRuleSchemaService;

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -20,17 +20,17 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>

View File

@@ -1,7 +1,7 @@
package uk.co.gresearch.siembol.configeditor.sync.common;
import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnector;
import uk.co.gresearch.siembol.configeditor.common.ServiceType;
import uk.co.gresearch.siembol.common.constants.ServiceType;
import java.util.Optional;

View File

@@ -4,7 +4,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import uk.co.gresearch.siembol.common.model.StormTopologyDto;
import uk.co.gresearch.siembol.configeditor.common.ServiceType;
import uk.co.gresearch.siembol.common.constants.ServiceType;
import uk.co.gresearch.siembol.configeditor.sync.actions.*;
import uk.co.gresearch.siembol.configeditor.sync.common.ConfigServiceHelper;
import uk.co.gresearch.siembol.configeditor.model.*;

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<modules>
<module>config-editor-core</module>

View File

@@ -0,0 +1,30 @@
management.endpoint.health.show-details=always
management.endpoints.web.exposure.include=info,prometheus,health
management.endpoints.web.path-mapping.prometheus=metrics
management.endpoints.web.base-path=
server.port=8083
info.env.java.vendor=${java.specification.vendor}
info.env.java.vm-name=${java.vm.name}
info.env.java.runtime-version=${java.runtime.version}
info.env.java.app.description=Siembol monitoring
logging.level.org.apache.http=INFO
logging.level.org.apache.http.wire=INFO
logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO
logging.level.org.springframework.web=INFO
logging.level.org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor=INFO
siembol-monitoring.heartbeat-properties.heartbeat-interval-seconds=60
siembol-monitoring.heartbeat-properties.heartbeat-consumer.enabled-services=parsingapp,enrichment,response
siembol-monitoring.heartbeat-properties.heartbeat-consumer.input-topic=siembol.response.heartbeat
siembol-monitoring.heartbeat-properties.heartbeat-consumer.kafka-properties.[bootstrap.servers]=kafka-0.kafka-headless.siembol.svc.cluster.local:9092
siembol-monitoring.heartbeat-properties.heartbeat-consumer.kafka-properties.[application.id]=siembol.heartbeat.reader
siembol-monitoring.heartbeat-properties.heartbeat-consumer.kafka-properties.[auto.offset.reset]=earliest
siembol-monitoring.heartbeat-properties.heartbeat-consumer.kafka-properties.[security.protocol]=PLAINTEXT
siembol-monitoring.heartbeat-properties.heartbeat-producers.producer1.output-topic=siembol.heartbeat
siembol-monitoring.heartbeat-properties.heartbeat-producers.producer1.kafka-properties.[bootstrap.servers]=kafka-0.kafka-headless.siembol.svc.cluster.local:9092
siembol-monitoring.heartbeat-properties.heartbeat-producers.producer1.kafka-properties.[security.protocol]=PLAINTEXT

View File

@@ -0,0 +1,139 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>siembol-monitoring</artifactId>
<name>siembol-monitoring</name>
<packaging>jar</packaging>
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.5.12-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring_boot_version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
<version>${spring_boot_version}</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor_core_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>${spring_boot_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring_boot_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
<version>${spring_boot_version}</version>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>${byte_buddy_version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>${io_micrometer_version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito_version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito_version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit_version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka_version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka_version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka_version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring_boot_version}</version>
<configuration>
<mainClass>uk.co.gresearch.siembol.deployment.monitoring.application.Application</mainClass>
</configuration>
<executions>
<execution>
<id>build-info</id>
<goals>
<goal>build-info</goal>
</goals>
</execution>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,13 @@
package uk.co.gresearch.siembol.deployment.monitoring.application;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(Application.class);
app.setRegisterShutdownHook(true);
app.run(args);
}
}

View File

@@ -0,0 +1,19 @@
package uk.co.gresearch.siembol.deployment.monitoring.application;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import uk.co.gresearch.siembol.deployment.monitoring.heartbeat.HeartbeatProperties;
@ConfigurationProperties(prefix = "siembol-monitoring")
public class ServiceConfigurationProperties {
@NestedConfigurationProperty
private HeartbeatProperties heartbeatProperties;
public HeartbeatProperties getHeartbeatProperties() {
return heartbeatProperties;
}
public void setHeartbeatProperties(HeartbeatProperties heartbeatProperties) {
this.heartbeatProperties = heartbeatProperties;
}
}

View File

@@ -0,0 +1,49 @@
package uk.co.gresearch.siembol.deployment.monitoring.application;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import uk.co.gresearch.siembol.common.metrics.SiembolMetricsRegistrar;
import uk.co.gresearch.siembol.common.metrics.spring.SpringMetricsRegistrar;
import uk.co.gresearch.siembol.deployment.monitoring.heartbeat.HeartbeatConsumer;
import uk.co.gresearch.siembol.deployment.monitoring.heartbeat.HeartbeatProducerScheduler;
@Configuration
@EnableConfigurationProperties(ServiceConfigurationProperties.class)
public class SiembolMonitoringConfiguration implements DisposableBean {
@Autowired
private ServiceConfigurationProperties properties;
@Autowired
private MeterRegistry springMeterRegistrar;
private HeartbeatConsumer heartbeatConsumer;
@Bean("metricsRegistrar")
SiembolMetricsRegistrar metricsRegistrar() {
return new SpringMetricsRegistrar(springMeterRegistrar);
}
@Bean("heartbeatProducerScheduler")
@DependsOn("metricsRegistrar")
HeartbeatProducerScheduler heartbeatProducerScheduler(@Autowired SiembolMetricsRegistrar metricsRegistrar) {
return new HeartbeatProducerScheduler(properties.getHeartbeatProperties(), metricsRegistrar);
}
@Bean("heartbeatConsumer")
@DependsOn("metricsRegistrar")
HeartbeatConsumer heartbeatConsumer(@Autowired SiembolMetricsRegistrar metricsRegistrar) {
heartbeatConsumer = new HeartbeatConsumer(properties.getHeartbeatProperties().getHeartbeatConsumer(), metricsRegistrar);
return heartbeatConsumer;
}
@Override
public void destroy() {
if (heartbeatConsumer == null) {
return;
}
heartbeatConsumer.close();
}
}

View File

@@ -0,0 +1,35 @@
package uk.co.gresearch.siembol.deployment.monitoring.application;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.ReactiveHealthIndicator;
import org.springframework.boot.actuate.health.SimpleStatusAggregator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import uk.co.gresearch.siembol.deployment.monitoring.heartbeat.HeartbeatConsumer;
import uk.co.gresearch.siembol.deployment.monitoring.heartbeat.HeartbeatProducerScheduler;
@Component
public class SiembolMonitoringHealthIndicator implements ReactiveHealthIndicator {
private final SimpleStatusAggregator statusAggregator = new SimpleStatusAggregator();
@Autowired
private HeartbeatProducerScheduler heartbeatProducerScheduler;
@Autowired
private HeartbeatConsumer heartbeatConsumer;
@Override
public Mono<Health> health() {
return checkDownstreamServiceHealth().onErrorResume(
ex -> Mono.just(new Health.Builder().down(ex).build()));
}
private Mono<Health> checkDownstreamServiceHealth() {
Status status = statusAggregator.getAggregateStatus(
heartbeatProducerScheduler.checkHealth().getStatus(),
heartbeatConsumer.checkHealth().getStatus());
return Mono.just(new Health.Builder().status(status).build());
}
}

View File

@@ -0,0 +1,10 @@
package uk.co.gresearch.siembol.deployment.monitoring.application;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import uk.co.gresearch.siembol.common.authorisation.SiembolUnauthenticatedSecurityAdapter;
@Configuration
@EnableWebSecurity
public class SiembolMonitoringUnauthenticatedSecurityAdapter extends SiembolUnauthenticatedSecurityAdapter {
}

View File

@@ -0,0 +1,124 @@
package uk.co.gresearch.siembol.deployment.monitoring.heartbeat;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import uk.co.gresearch.siembol.common.constants.ServiceType;
import uk.co.gresearch.siembol.common.metrics.SiembolCounter;
import uk.co.gresearch.siembol.common.metrics.SiembolGauge;
import uk.co.gresearch.siembol.common.metrics.SiembolMetrics;
import uk.co.gresearch.siembol.common.metrics.SiembolMetricsRegistrar;
import uk.co.gresearch.siembol.common.utils.KafkaStreamsFactoryImpl;
import uk.co.gresearch.siembol.common.utils.KafkaStreamsFactory;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.*;
public class HeartbeatConsumer {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String INIT_START = "Kafka stream service initialisation started";
private static final String INIT_COMPLETED = "Kafka stream service initialisation completed";
private static final ObjectReader MESSAGE_READER = new ObjectMapper()
.readerFor(HeartbeatProcessedMessage.class);
private final KafkaStreams streams;
private final SiembolGauge totalLatencyGauge;
private final SiembolCounter consumerErrorCount;
private final SiembolCounter consumerMessageRead;
private final List<Pair<ServiceType, SiembolGauge>> servicesMetrics = new ArrayList<>();
public HeartbeatConsumer(HeartbeatConsumerProperties properties, SiembolMetricsRegistrar metricsRegistrar) {
this(properties, metricsRegistrar, new KafkaStreamsFactoryImpl());
}
HeartbeatConsumer(HeartbeatConsumerProperties properties, SiembolMetricsRegistrar metricsRegistrar,
KafkaStreamsFactory streamsFactory) {
consumerErrorCount = metricsRegistrar.registerCounter(SiembolMetrics.HEARTBEAT_CONSUMER_ERROR.name());
consumerMessageRead = metricsRegistrar.registerCounter(SiembolMetrics.HEARTBEAT_MESSAGES_READ.name());
if (properties.getEnabledServices() == null) {
throw new IllegalArgumentException("Missing enabled services for heartbeat consumer.");
}
if (properties.getEnabledServices().contains(ServiceType.PARSING_APP)) {
servicesMetrics.add(Pair.of(ServiceType.PARSING_APP,
metricsRegistrar.registerGauge(SiembolMetrics.HEARTBEAT_LATENCY_PARSING_MS.name())));
}
if (properties.getEnabledServices().contains(ServiceType.ENRICHMENT)) {
servicesMetrics.add(Pair.of(ServiceType.ENRICHMENT,
metricsRegistrar.registerGauge(SiembolMetrics.HEARTBEAT_LATENCY_ENRICHING_MS.name())));
}
if (properties.getEnabledServices().contains(ServiceType.RESPONSE)) {
servicesMetrics.add(Pair.of(ServiceType.RESPONSE,
metricsRegistrar.registerGauge(SiembolMetrics.HEARTBEAT_LATENCY_RESPONDING_MS.name())));
}
totalLatencyGauge = metricsRegistrar.registerGauge(SiembolMetrics.HEARTBEAT_LATENCY_TOTAL_MS.name());
streams = createStreams(streamsFactory, properties);
streams.start();
}
private KafkaStreams createStreams(KafkaStreamsFactory streamsFactory, HeartbeatConsumerProperties properties) {
LOG.info(INIT_START);
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(properties.getInputTopic())
.foreach((key, value) -> this.processMessage(value));
Properties configuration = new Properties();
configuration.putAll(properties.getKafkaProperties());
configuration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
configuration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
Topology topology = builder.build(configuration);
KafkaStreams ret = streamsFactory.createKafkaStreams(topology, configuration);
LOG.info(INIT_COMPLETED);
return ret;
}
private void processMessage(String value) {
try {
var currentTimestamp = Instant.now().toEpochMilli();
HeartbeatProcessedMessage message = MESSAGE_READER.readValue(value);
var lastTimestamp = message.getTimestamp().longValue();
for(var serviceMetric: servicesMetrics) {
switch (serviceMetric.getKey()) {
case PARSING_APP:
serviceMetric.getValue().setValue(message.getParsingTime().longValue() - lastTimestamp);
lastTimestamp = message.getParsingTime().longValue();
break;
case ENRICHMENT:
serviceMetric.getValue().setValue(message.getEnrichingTime().longValue() - lastTimestamp);
lastTimestamp = message.getEnrichingTime().longValue();
break;
case RESPONSE:
serviceMetric.getValue().setValue(message.getResponseTime().longValue() - lastTimestamp);
lastTimestamp = message.getResponseTime().longValue();
}
}
totalLatencyGauge.setValue(currentTimestamp - message.getTimestamp().longValue());
consumerMessageRead.increment();
} catch (Exception e) {
LOG.error("Error reading heartbeat message from kafka: {}", e.toString());
consumerErrorCount.increment();
}
}
public Health checkHealth() {
return streams.state().isRunningOrRebalancing() || streams.state().equals(KafkaStreams.State.CREATED)
? Health.up().build() :
Health.down().build();
}
public void close() {
streams.close();
}
}

View File

@@ -0,0 +1,36 @@
package uk.co.gresearch.siembol.deployment.monitoring.heartbeat;
import java.util.List;
import java.util.Map;
import uk.co.gresearch.siembol.common.constants.ServiceType;
public class HeartbeatConsumerProperties {
private List<ServiceType> enabledServices;
private String inputTopic;
private Map<String, Object> kafkaProperties;
public List<ServiceType> getEnabledServices() {
return enabledServices;
}
public void setEnabledServices(List<ServiceType> enabledServices) {
this.enabledServices = enabledServices;
}
public String getInputTopic() {
return inputTopic;
}
public void setInputTopic(String inputTopic) {
this.inputTopic = inputTopic;
}
public Map<String, Object> getKafkaProperties() {
return kafkaProperties;
}
public void setKafkaProperties(Map<String, Object> kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
}

View File

@@ -0,0 +1,52 @@
package uk.co.gresearch.siembol.deployment.monitoring.heartbeat;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.LinkedHashMap;
import java.util.Map;
public class HeartbeatMessage {
@JsonProperty("event_time")
private String eventTime;
@JsonProperty("siembol_heartbeat")
private Boolean siembolHeartbeat = true;
@JsonProperty("producer_name")
private String producerName;
private Map<String, Object> message = new LinkedHashMap<>();
@JsonAnySetter
void setMessage(String key, Object value) {
message.put(key, value);
}
@JsonAnyGetter
public Map<String, Object> getMessage() {
return message;
}
public Boolean getSiembolHeartbeat() {
return siembolHeartbeat;
}
public void setSiembolHeartbeat(Boolean siembolHeartbeat) {
this.siembolHeartbeat = siembolHeartbeat;
}
public String getEventTime() {
return eventTime;
}
public void setEventTime(String eventTime) {
this.eventTime = eventTime;
}
public String getProducerName() {
return producerName;
}
public void setProducerName(String producerName) {
this.producerName = producerName;
}
}

View File

@@ -0,0 +1,50 @@
package uk.co.gresearch.siembol.deployment.monitoring.heartbeat;
import com.fasterxml.jackson.annotation.JsonProperty;
import uk.co.gresearch.siembol.common.constants.SiembolConstants;
public class HeartbeatProcessedMessage extends HeartbeatMessage {
@JsonProperty(SiembolConstants.TIMESTAMP)
private Number timestamp;
@JsonProperty(SiembolConstants.PARSING_TIME)
private Number parsingTime;
@JsonProperty(SiembolConstants.ENRICHING_TIME)
private Number enrichingTime;
@JsonProperty(SiembolConstants.RESPONSE_TIME)
private Number responseTime;
public Number getTimestamp() {
return timestamp;
}
public void setTimestamp(Number timestamp) {
this.timestamp = timestamp;
}
public Number getParsingTime() {
return parsingTime;
}
public void setParsingTime(Number parsingTime) {
this.parsingTime = parsingTime;
}
public Number getEnrichingTime() {
return enrichingTime;
}
public void setEnrichingTime(Number enrichingTime) {
this.enrichingTime = enrichingTime;
}
public Number getResponseTime() {
return responseTime;
}
public void setResponseTime(Number responseTime) {
this.responseTime = responseTime;
}
}

View File

@@ -0,0 +1,96 @@
package uk.co.gresearch.siembol.deployment.monitoring.heartbeat;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import uk.co.gresearch.siembol.common.metrics.SiembolCounter;
import uk.co.gresearch.siembol.common.metrics.SiembolMetrics;
import uk.co.gresearch.siembol.common.metrics.SiembolMetricsRegistrar;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
public class HeartbeatProducer implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String MISSING_KAFKA_WRITER_PROPS_MSG = "Missing heartbeat kafka producer properties for %s";
private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writer();
private final HeartbeatMessage message = new HeartbeatMessage();
private final AtomicReference<Exception> exception = new AtomicReference<>();
private final String producerName;
private final Producer<String, String> producer;
private final SiembolCounter updateCounter;
private final SiembolCounter errorCounter;
private final String topicName;
public HeartbeatProducer(HeartbeatProducerProperties producerProperties,
String producerName,
Map<String, Object> heartbeatMessageProperties,
SiembolMetricsRegistrar metricsRegistrar) {
this(producerProperties, producerName, heartbeatMessageProperties, metricsRegistrar,
x -> new KafkaProducer<>(x.getKafkaProperties(), new StringSerializer(),
new StringSerializer()) );
}
HeartbeatProducer(HeartbeatProducerProperties producerProperties,
String producerName,
Map<String, Object> heartbeatMessageProperties,
SiembolMetricsRegistrar metricsRegistrar,
Function<HeartbeatProducerProperties, Producer<String, String>> factory) {
this.producerName = producerName;
this.initialiseMessage(heartbeatMessageProperties);
if (producerProperties == null) {
throw new IllegalArgumentException(String.format(MISSING_KAFKA_WRITER_PROPS_MSG,
producerName));
}
producer = factory.apply(producerProperties);
topicName = producerProperties.getOutputTopic();
updateCounter =
metricsRegistrar.registerCounter(SiembolMetrics.HEARTBEAT_MESSAGES_SENT.getMetricName(producerName));
errorCounter =
metricsRegistrar.registerCounter(SiembolMetrics.HEARTBEAT_PRODUCER_ERROR.getMetricName(producerName));
}
private void initialiseMessage(Map<String, Object> messageProperties) {
if (messageProperties != null) {
for (Map.Entry<String, Object> messageEntry : messageProperties.entrySet()) {
this.message.setMessage(messageEntry.getKey(), messageEntry.getValue());
}
}
}
public void sendHeartbeat() {
var now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
this.message.setEventTime(now.toString()); // ISO format
this.message.setProducerName(producerName);
try {
producer.send(new ProducerRecord<>(topicName, OBJECT_WRITER.writeValueAsString(this.message))).get();
updateCounter.increment();
LOG.info("Sent heartbeat with producer {} to topic {}", producerName, topicName);
exception.set(null);
} catch (Exception e) {
LOG.error("Error sending message to kafka with producer {}: {}", producerName, e.toString());
errorCounter.increment();
exception.set(e);
}
}
public void close() {
producer.close();
}
public Health checkHealth() {
return exception.get() == null ? Health.down().withException(exception.get()).build(): Health.up().build();
}
}

View File

@@ -0,0 +1,27 @@
package uk.co.gresearch.siembol.deployment.monitoring.heartbeat;
import java.util.Map;
import java.util.Properties;
public class HeartbeatProducerProperties {
private String outputTopic;
private Map<String, Object> kafkaProperties;
public String getOutputTopic() {
return outputTopic;
}
public void setOutputTopic(String outputTopic) {
this.outputTopic = outputTopic;
}
public Properties getKafkaProperties() {
var props = new Properties();
props.putAll(kafkaProperties);
return props;
}
public void setKafkaProperties(Map<String, Object> kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
}

View File

@@ -0,0 +1,70 @@
package uk.co.gresearch.siembol.deployment.monitoring.heartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import uk.co.gresearch.siembol.common.metrics.SiembolMetricsRegistrar;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
public class HeartbeatProducerScheduler implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, HeartbeatProducer> producerMap = new HashMap<>();
private final int errorThreshold;
public HeartbeatProducerScheduler(HeartbeatProperties properties,
SiembolMetricsRegistrar metricsRegistrar) {
this(properties, Executors.newSingleThreadScheduledExecutor(),
(HeartbeatProducerProperties x, String y) ->
new HeartbeatProducer(x, y, properties.getMessage(), metricsRegistrar));
}
HeartbeatProducerScheduler(HeartbeatProperties properties,
ScheduledExecutorService executorService,
BiFunction<HeartbeatProducerProperties, String, HeartbeatProducer> factory) {
this.errorThreshold = properties.getHeartbeatProducers().size();
for (Map.Entry<String, HeartbeatProducerProperties> producerProperties : properties.getHeartbeatProducers().entrySet()) {
var producerName = producerProperties.getKey();
var producer = createHeartbeatProducer(producerProperties.getValue(), producerName, factory);
producerMap.put(producerName, producer);
executorService.scheduleAtFixedRate(
producer::sendHeartbeat,
properties.getHeartbeatIntervalSeconds(),
properties.getHeartbeatIntervalSeconds(),
TimeUnit.SECONDS);
}
}
public static HeartbeatProducer createHeartbeatProducer(HeartbeatProducerProperties properties,
String producerName,
BiFunction<HeartbeatProducerProperties, String, HeartbeatProducer> factory) {
LOG.info("Initialising producer {}", producerName);
var producer = factory.apply(properties, producerName);
LOG.info("Finished initialising heartbeat producer {}", producerName);
return producer;
}
public Health checkHealth() {
var countErrors = 0;
for (var producer: this.producerMap.values()) {
if (producer.checkHealth().getStatus() == Status.DOWN) {
countErrors ++;
}
}
return countErrors >= errorThreshold? Health.down().build(): Health.up().build();
}
public void close() {
for (var producer: this.producerMap.values()) {
producer.close();
}
}
}

View File

@@ -0,0 +1,48 @@
package uk.co.gresearch.siembol.deployment.monitoring.heartbeat;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.Map;
@ConfigurationProperties(prefix = "heartbeat-properties")
public class HeartbeatProperties {
private int heartbeatIntervalSeconds = 60;
private Map<String, HeartbeatProducerProperties> heartbeatProducers;
private HeartbeatConsumerProperties heartbeatConsumer;
private Map<String, Object> message;
public int getHeartbeatIntervalSeconds() {
return heartbeatIntervalSeconds;
}
public void setHeartbeatIntervalSeconds(int heartbeatIntervalSeconds) {
this.heartbeatIntervalSeconds = heartbeatIntervalSeconds;
}
public Map<String, HeartbeatProducerProperties> getHeartbeatProducers() {
return heartbeatProducers;
}
public void setHeartbeatProducers(Map<String, HeartbeatProducerProperties> heartbeatProducers) {
this.heartbeatProducers = heartbeatProducers;
}
public HeartbeatConsumerProperties getHeartbeatConsumer() {
return heartbeatConsumer;
}
public void setHeartbeatConsumer(HeartbeatConsumerProperties heartbeatConsumer) {
this.heartbeatConsumer = heartbeatConsumer;
}
public Map<String, Object> getMessage() {
return message;
}
public void setMessage(Map<String, Object> message) {
this.message = message;
}
}

View File

@@ -0,0 +1,123 @@
package uk.co.gresearch.siembol.deployment.monitoring.heartbeat;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import uk.co.gresearch.siembol.common.constants.ServiceType;
import uk.co.gresearch.siembol.common.metrics.SiembolMetrics;
import uk.co.gresearch.siembol.common.metrics.test.SiembolMetricsTestRegistrar;
import uk.co.gresearch.siembol.common.testing.TestingDriverKafkaStreamsFactory;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
public class HeartbeatConsumerTest {
private final String heartbeatMessageStr = """
{
"timestamp": 1654095527000,
"siembol_parsing_ts": 1654095527001,
"siembol_enriching_ts": 1654095527020,
"siembol_response_ts": 1654095527031,
"siembol_heartbeat": true,
"source_type": "heartbeat",
"producer_name": "p1",
"event_time": "2022-06-01T14:58:47.000Z"
}
""";
private final String heartbeatMessageWithoutEnrichmentStr = """
{
"timestamp": 1654095527000,
"siembol_parsing_ts": 1654095527001,
"siembol_response_ts": 1654095527031,
"siembol_heartbeat": true,
"source_type": "heartbeat",
"producer_name": "p1",
"event_time": "2022-06-01T14:58:47.000Z"
}
""";
private final String inputTopic = "input";
private SiembolMetricsTestRegistrar metricsTestRegistrar;
private KafkaStreams kafkaStreams;
private TestingDriverKafkaStreamsFactory streamsFactory;
private TopologyTestDriver testDriver;
private TestInputTopic<String, String> testInputTopic;
private MockedStatic<Instant> mockInstant;
private HeartbeatConsumerProperties properties;
@Before
public void setUp() {
metricsTestRegistrar = new SiembolMetricsTestRegistrar();
kafkaStreams = Mockito.mock(KafkaStreams.class);
streamsFactory = new TestingDriverKafkaStreamsFactory(kafkaStreams);
var instant = Instant.parse("2022-06-01T14:58:47.823Z");
mockInstant = Mockito.mockStatic(Instant.class);
mockInstant.when(Instant::now).thenReturn(instant);
properties = new HeartbeatConsumerProperties();
properties.setInputTopic(inputTopic);
properties.setKafkaProperties(new HashMap<>());
properties.setEnabledServices(Arrays.asList(ServiceType.PARSING_APP, ServiceType.ENRICHMENT,
ServiceType.RESPONSE));
}
@After
public void tearDown() {
streamsFactory.close();
mockInstant.close();
}
@Test
public void processingMessageOk() {
new HeartbeatConsumer(properties, metricsTestRegistrar, streamsFactory);
testDriver = streamsFactory.getTestDriver();
testInputTopic = testDriver.createInputTopic(inputTopic, Serdes.String().serializer(),
Serdes.String().serializer());
testInputTopic.pipeInput(heartbeatMessageStr);
Assert.assertEquals(1,
metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_PARSING_MS.name()), 0);
Assert.assertEquals(19,
metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_ENRICHING_MS.name()), 0);
Assert.assertEquals(11,
metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_RESPONDING_MS.name()), 0);
Assert.assertEquals(823,
metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_TOTAL_MS.name()), 0);
Assert.assertEquals(1, metricsTestRegistrar.getCounterValue(SiembolMetrics.HEARTBEAT_MESSAGES_READ.name()));
}
@Test
public void processingError() {
new HeartbeatConsumer(properties, metricsTestRegistrar, streamsFactory);
testDriver = streamsFactory.getTestDriver();
testInputTopic = testDriver.createInputTopic(inputTopic, Serdes.String().serializer(),
Serdes.String().serializer());
testInputTopic.pipeInput("test");
Assert.assertEquals(1, metricsTestRegistrar.getCounterValue(SiembolMetrics.HEARTBEAT_CONSUMER_ERROR.name()));
}
@Test
public void withoutEnrichmentService() {
properties.setEnabledServices(Arrays.asList(ServiceType.PARSING_APP,
ServiceType.RESPONSE));
new HeartbeatConsumer(properties, metricsTestRegistrar, streamsFactory);
testDriver = streamsFactory.getTestDriver();
testInputTopic = testDriver.createInputTopic(inputTopic, Serdes.String().serializer(),
Serdes.String().serializer());
testInputTopic.pipeInput(heartbeatMessageWithoutEnrichmentStr);
Assert.assertEquals(1,
metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_PARSING_MS.name()), 0);
Assert.assertEquals(30,
metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_RESPONDING_MS.name()), 0);
Assert.assertEquals(823,
metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_TOTAL_MS.name()), 0);
}
}

View File

@@ -0,0 +1,89 @@
package uk.co.gresearch.siembol.deployment.monitoring.heartbeat;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.springframework.boot.actuate.health.Health;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.*;
public class HeartbeatProducerSchedulerTest {
private ScheduledExecutorService mockScheduledService;
private final Map<String, HeartbeatProducerProperties> producerPropertiesMap= new HashMap<>();
private final Map<String, Object> heartbeatMessageProperties = new HashMap<>();
private final int heartbeatIntervalSeconds = 10;
private HeartbeatProducer heartbeatProducer;
private final HeartbeatProperties properties = new HeartbeatProperties();
private BiFunction factory;
@Before
public void setUp() {
heartbeatProducer = Mockito.mock(HeartbeatProducer.class);
factory = Mockito.mock(BiFunction.class);
doNothing().when(heartbeatProducer).sendHeartbeat();
when(factory.apply(any(HeartbeatProducerProperties.class), anyString())).thenReturn(heartbeatProducer);
mockScheduledService = mock(ScheduledExecutorService.class);
given(mockScheduledService.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(),
any(TimeUnit.class))).willReturn(mock(ScheduledFuture.class));
var heartbeatProducerProperties1 = new HeartbeatProducerProperties();
var heartbeatProducerProperties2 = new HeartbeatProducerProperties();
producerPropertiesMap.put("p1", heartbeatProducerProperties1);
producerPropertiesMap.put("p2", heartbeatProducerProperties2);
properties.setHeartbeatProducers(producerPropertiesMap);
properties.setMessage(heartbeatMessageProperties);
properties.setHeartbeatIntervalSeconds(heartbeatIntervalSeconds);
}
@Test
public void ok() {
new HeartbeatProducerScheduler(properties, mockScheduledService, factory);
ArgumentCaptor<Runnable> argumentCaptor = ArgumentCaptor.forClass(Runnable.class);
ArgumentCaptor<HeartbeatProducerProperties> producerPropertiesCaptor =
ArgumentCaptor.forClass(HeartbeatProducerProperties.class);
ArgumentCaptor<String> producerNameCaptor = ArgumentCaptor.forClass(String.class);
verify(mockScheduledService, times(2)).scheduleAtFixedRate(
argumentCaptor.capture(),
eq(Long.valueOf(heartbeatIntervalSeconds)),
eq(Long.valueOf(heartbeatIntervalSeconds)),
any(TimeUnit.class));
argumentCaptor.getAllValues().get(0).run();
argumentCaptor.getAllValues().get(1).run();
verify(heartbeatProducer, times(2)).sendHeartbeat();
verify(factory, times(2)).apply(producerPropertiesCaptor.capture(), producerNameCaptor.capture());
List<HeartbeatProducerProperties> producerPropertiesList = producerPropertiesCaptor.getAllValues();
List<String> producerNameList = producerNameCaptor.getAllValues();
assertEquals(Arrays.asList("p1", "p2"), producerNameList);
assertEquals(new ArrayList(producerPropertiesMap.values()), producerPropertiesList);
}
@Test
public void checkHealthUp() {
var heartbeatProducerScheduler = new HeartbeatProducerScheduler(properties, mockScheduledService,
(x, y) -> heartbeatProducer);
when(heartbeatProducer.checkHealth()).thenReturn(Health.up().build(), Health.down().build());
assertEquals(heartbeatProducerScheduler.checkHealth(), Health.up().build());
verify(heartbeatProducer, times(2)).checkHealth();
}
@Test
public void checkHealthDown() {
var heartbeatProducerScheduler = new HeartbeatProducerScheduler(properties, mockScheduledService, (x, y) -> heartbeatProducer);
when(heartbeatProducer.checkHealth()).thenReturn(Health.down().build());
assertEquals(heartbeatProducerScheduler.checkHealth(), Health.down().build());
verify(heartbeatProducer, times(2)).checkHealth();
}
}

View File

@@ -0,0 +1,71 @@
package uk.co.gresearch.siembol.deployment.monitoring.heartbeat;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import uk.co.gresearch.siembol.common.metrics.SiembolMetrics;
import uk.co.gresearch.siembol.common.metrics.test.SiembolMetricsTestRegistrar;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.mockito.MockedStatic;
public class HeartbeatProducerTest {
private SiembolMetricsTestRegistrar metricsTestRegistrar;
private final Map<String, Object> heartbeatMessageProperties = new HashMap<>();
private MockedStatic<Instant> mockInstant;
@Before
public void setUp() {
metricsTestRegistrar = new SiembolMetricsTestRegistrar();
var instant = Instant.parse("2022-05-31T09:10:11.50Z");
mockInstant = Mockito.mockStatic(Instant.class);
mockInstant.when(Instant::now).thenReturn(instant);
heartbeatMessageProperties.put("key", "value");
}
@After
public void tearDown() {
mockInstant.close();
}
@Test
public void sendHeartbeatOk() {
var producerProperties = new HeartbeatProducerProperties();
producerProperties.setOutputTopic("heartbeat");
var producer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());
var heartbeatProducer = new HeartbeatProducer(producerProperties, "p", heartbeatMessageProperties,
metricsTestRegistrar, x -> producer);
heartbeatProducer.sendHeartbeat();
Assert.assertEquals(producer.history().size(), 1);
Assert.assertEquals(producer.history().get(0).topic(), "heartbeat");
Assert.assertEquals(producer.history().get(0).value(), "{\"event_time\":\"2022-05-31T09:10:11.500Z\"," +
"\"siembol_heartbeat\":true," +
"\"producer_name\":\"p\",\"key\":\"value\"}");
heartbeatProducer.sendHeartbeat();
Assert.assertEquals(2,
metricsTestRegistrar.getCounterValue(SiembolMetrics.HEARTBEAT_MESSAGES_SENT.getMetricName("p")));
}
@Test
public void sendHeartbeatError() {
var producer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());
var producerProperties = new HeartbeatProducerProperties();
var heartbeatProducer = new HeartbeatProducer(
producerProperties,
"p",
heartbeatMessageProperties,
metricsTestRegistrar,
x -> producer);
heartbeatProducer.sendHeartbeat();
Assert.assertEquals(1,
metricsTestRegistrar.getCounterValue(SiembolMetrics.HEARTBEAT_PRODUCER_ERROR.getMetricName("p")));
}
}

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencyManagement>
@@ -43,7 +43,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -35,12 +35,12 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -75,7 +75,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<modules>
<module>enriching-core</module>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -39,12 +39,12 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -50,7 +50,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -75,7 +75,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<modules>
<module>parsing-core</module>

View File

@@ -6,7 +6,7 @@
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<name>siembol</name>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<description>A scalable, advanced security analytics framework based on open-source big data technologies.</description>
<inceptionYear>2019</inceptionYear>
<url>https://siembol.io/</url>
@@ -78,6 +78,7 @@
<module>responding</module>
<module>config-editor</module>
<module>deployment/storm-topology-manager</module>
<module>deployment/siembol-monitoring</module>
</modules>
<packaging>pom</packaging>
<distributionManagement>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<modules>
<module>responding-core</module>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -35,12 +35,12 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencyManagement>
<dependencies>
@@ -51,7 +51,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -62,7 +62,7 @@
<dependency>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>

View File

@@ -9,6 +9,8 @@ import reactor.core.publisher.Mono;
import uk.co.gresearch.siembol.common.constants.SiembolMessageFields;
import uk.co.gresearch.siembol.common.error.ErrorMessage;
import uk.co.gresearch.siembol.common.error.ErrorType;
import uk.co.gresearch.siembol.common.utils.KafkaStreamsFactory;
import uk.co.gresearch.siembol.common.utils.KafkaStreamsFactoryImpl;
import uk.co.gresearch.siembol.response.stream.rest.application.ResponseConfigurationProperties;
import uk.co.gresearch.siembol.response.common.RespondingResult;
import uk.co.gresearch.siembol.response.common.RespondingResultAttributes;

View File

@@ -15,6 +15,7 @@ import org.springframework.boot.actuate.health.Status;
import reactor.core.publisher.Mono;
import uk.co.gresearch.siembol.common.error.ErrorMessage;
import uk.co.gresearch.siembol.common.error.ErrorType;
import uk.co.gresearch.siembol.common.testing.TestingDriverKafkaStreamsFactory;
import uk.co.gresearch.siembol.response.stream.rest.application.ResponseConfigurationProperties;
import uk.co.gresearch.siembol.response.common.RespondingResult;
import uk.co.gresearch.siembol.response.common.RespondingResultAttributes;

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId>
<version>2.5.11-SNAPSHOT</version>
<version>2.5.12-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
@@ -190,6 +190,16 @@
<version>${io_micrometer_version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka_version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka_version}</version>
</dependency>
</dependencies>
<build>
</build>

View File

@@ -1,4 +1,4 @@
package uk.co.gresearch.siembol.configeditor.common;
package uk.co.gresearch.siembol.common.constants;
public enum ServiceType {
RESPONSE("response"),

View File

@@ -2,4 +2,16 @@ package uk.co.gresearch.siembol.common.constants;
public class SiembolConstants {
public static final int MAX_SIZE_CONFIG_UPDATE_LOG = 100;
public static final String TIMESTAMP = "timestamp";
public static final String PARSING_TIME = "siembol_parsing_ts";
public static final String ENRICHING_TIME = "siembol_enriching_ts";
public static final String RESPONSE_TIME = "siembol_response_ts";
public static final String SRC_ADDR = "ip_src_addr";
public static final String SRC_PORT = "ip_src_port";
public static final String DST_ADDR = "ip_dst_addr";
public static final String DST_PORT = "ip_dst_port";
public static final String PROTOCOL = "protocol";
public static final String ORIGINAL = "original_string";
public static final String GUID = "guid";
public static final String SENSOR_TYPE = "source_type";
}

View File

@@ -6,18 +6,18 @@ import java.util.Set;
import java.util.stream.Collectors;
public enum SiembolMessageFields {
SRC_ADDR("ip_src_addr"),
SRC_PORT("ip_src_port"),
DST_ADDR("ip_dst_addr"),
DST_PORT("ip_dst_port"),
PROTOCOL("protocol"),
TIMESTAMP("timestamp"),
ORIGINAL("original_string"),
GUID("guid"),
SENSOR_TYPE("source_type"),
PARSING_TIME("siembol_parsing_ts"),
ENRICHING_TIME("siembol_enriching_ts"),
RESPONSE_TIME("siembol_response_ts");
SRC_ADDR(SiembolConstants.SRC_ADDR),
SRC_PORT(SiembolConstants.SRC_PORT),
DST_ADDR(SiembolConstants.DST_ADDR),
DST_PORT(SiembolConstants.DST_PORT),
PROTOCOL(SiembolConstants.PROTOCOL),
ORIGINAL(SiembolConstants.ORIGINAL),
GUID(SiembolConstants.GUID),
SENSOR_TYPE(SiembolConstants.SENSOR_TYPE),
TIMESTAMP(SiembolConstants.TIMESTAMP),
PARSING_TIME(SiembolConstants.PARSING_TIME),
ENRICHING_TIME(SiembolConstants.ENRICHING_TIME),
RESPONSE_TIME(SiembolConstants.RESPONSE_TIME);
private final String name;
SiembolMessageFields(String name) {

View File

@@ -54,7 +54,16 @@ public enum SiembolMetrics {
SIEMBOL_SYNC_ADMIN_CONFIG("siembol_counter_sync_admin_config_%s"),
SIEMBOL_SYNC_RULES_VERSION("siembol_counter_sync_config_version_%s"),
CONFIG_EDITOR_REST_RELEASE_PR_SERVICE("siembol_counter_release_pr_%s"),
CONFIG_EDITOR_REST_ADMIN_CONFIG_PR_SERVICE("siembol_counter_admin_counfig_pr_%s");
CONFIG_EDITOR_REST_ADMIN_CONFIG_PR_SERVICE("siembol_counter_admin_counfig_pr_%s"),
HEARTBEAT_MESSAGES_SENT("siembol_counter_hearbeat_messages_sent_%s"),
HEARTBEAT_MESSAGES_READ("siembol_counter_hearbeat_messages_read"),
HEARTBEAT_LATENCY_PARSING_MS("siembol_gauge_hearbeat_latency_ms_parsing"),
HEARTBEAT_LATENCY_ENRICHING_MS("siembol_gauge_hearbeat_latency_ms_enriching"),
HEARTBEAT_LATENCY_RESPONDING_MS("siembol_gauge_hearbeat_latency_ms_responding"),
HEARTBEAT_LATENCY_TOTAL_MS("siembol_gauge_hearbeat_latency_ms_total"),
HEARTBEAT_PRODUCER_ERROR("siembol_counter_heartbeat_producer_error_%s"),
HEARTBEAT_CONSUMER_ERROR("siembol_counter_heartbeat_consumer_error");
private final String formatStringName;

View File

@@ -39,4 +39,11 @@ public class SiembolMetricsTestRegistrar implements SiembolMetricsRegistrar {
}
return countersMap.get(name).getValue();
}
public double getGaugeValue(String name) {
if (!gaugesMap.containsKey(name)) {
throw new IllegalArgumentException(String.format(METRIC_DOES_NOT_EXIST_MSG, name));
}
return gaugesMap.get(name).getValue();
}
}

View File

@@ -1,8 +1,9 @@
package uk.co.gresearch.siembol.response.stream.ruleservice;
package uk.co.gresearch.siembol.common.testing;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import uk.co.gresearch.siembol.common.utils.KafkaStreamsFactory;
import java.io.Closeable;
import java.util.Properties;

View File

@@ -1,4 +1,4 @@
package uk.co.gresearch.siembol.response.stream.ruleservice;
package uk.co.gresearch.siembol.common.utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
@@ -8,3 +8,4 @@ import java.util.Properties;
public interface KafkaStreamsFactory {
KafkaStreams createKafkaStreams(Topology topology, Properties properties);
}

View File

@@ -1,4 +1,4 @@
package uk.co.gresearch.siembol.response.stream.ruleservice;
package uk.co.gresearch.siembol.common.utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
@@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
public class KafkaStreamsFactoryImpl implements KafkaStreamsFactory {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String UNCAUGHT_EXCEPTION = "Uncaught exception in siembol response kafka streams";