diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 74a7c891..3ef795d4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -70,6 +70,11 @@ jobs: with: name: storm-topology-manager path: deployment/storm-topology-manager/target/storm-topology-manager-*.jar + - name: Upload siembol-monitoring jar + uses: actions/upload-artifact@v3 + with: + name: siembol-monitoring + path: deployment/siembol-monitoring/target/siembol-monitoring-*.jar build-docker-storm: runs-on: ubuntu-latest @@ -133,7 +138,7 @@ jobs: needs: build-java strategy: matrix: - component: [config-editor-rest, responding-stream, storm-topology-manager] + component: [config-editor-rest, responding-stream, storm-topology-manager, siembol-monitoring] fail-fast: false steps: - name: Checkout @@ -341,7 +346,7 @@ jobs: environment: release strategy: matrix: - component: [alerting-storm, enriching-storm, parsing-storm, config-editor-rest, responding-stream, storm-topology-manager, config-editor-ui] + component: [alerting-storm, enriching-storm, parsing-storm, config-editor-rest, responding-stream, storm-topology-manager, config-editor-ui, siembol-monitoring] fail-fast: false steps: - name: Login to DockerHub diff --git a/alerting/alerting-core/pom.xml b/alerting/alerting-core/pom.xml index a368093b..89ec5519 100644 --- a/alerting/alerting-core/pom.xml +++ b/alerting/alerting-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol alerting - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -35,7 +35,7 @@ uk.co.gresearch.siembol siembol-common - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT junit diff --git a/alerting/alerting-spark/pom.xml b/alerting/alerting-spark/pom.xml index 96047cd9..b1a09995 100644 --- a/alerting/alerting-spark/pom.xml +++ b/alerting/alerting-spark/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol alerting - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -23,7 +23,7 @@ uk.co.gresearch.siembol alerting-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT jackson-databind diff --git a/alerting/alerting-storm/pom.xml b/alerting/alerting-storm/pom.xml index 33d74f0a..7b7ad5ab 100644 --- a/alerting/alerting-storm/pom.xml +++ b/alerting/alerting-storm/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol alerting - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -51,7 +51,7 @@ uk.co.gresearch.siembol alerting-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.slf4j diff --git a/alerting/pom.xml b/alerting/pom.xml index 13929985..1cdba6d3 100644 --- a/alerting/pom.xml +++ b/alerting/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT alerting-core diff --git a/config-editor/config-editor-core/pom.xml b/config-editor/config-editor-core/pom.xml index 194efba9..d282c12a 100644 --- a/config-editor/config-editor-core/pom.xml +++ b/config-editor/config-editor-core/pom.xml @@ -9,13 +9,13 @@ uk.co.gresearch.siembol config-editor - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol siembol-common - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.apache.commons diff --git a/config-editor/config-editor-core/src/main/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorImpl.java b/config-editor/config-editor-core/src/main/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorImpl.java index d46d67a2..864a18bf 100644 --- a/config-editor/config-editor-core/src/main/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorImpl.java +++ b/config-editor/config-editor-core/src/main/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorImpl.java @@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Status; +import uk.co.gresearch.siembol.common.constants.ServiceType; import uk.co.gresearch.siembol.configeditor.common.*; import uk.co.gresearch.siembol.configeditor.configstore.ConfigStore; import uk.co.gresearch.siembol.configeditor.configstore.ConfigStoreImpl; diff --git a/config-editor/config-editor-core/src/main/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorService.java b/config-editor/config-editor-core/src/main/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorService.java index 05a0bc48..ce247870 100644 --- a/config-editor/config-editor-core/src/main/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorService.java +++ b/config-editor/config-editor-core/src/main/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorService.java @@ -1,7 +1,7 @@ package uk.co.gresearch.siembol.configeditor.serviceaggregator; import uk.co.gresearch.siembol.configeditor.common.ConfigSchemaService; -import uk.co.gresearch.siembol.configeditor.common.ServiceType; +import uk.co.gresearch.siembol.common.constants.ServiceType; import uk.co.gresearch.siembol.configeditor.configstore.ConfigStore; import static uk.co.gresearch.siembol.configeditor.model.ConfigEditorResult.StatusCode.OK; diff --git a/config-editor/config-editor-core/src/test/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorImplTest.java b/config-editor/config-editor-core/src/test/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorImplTest.java index 814e0bdb..26ea2362 100644 --- a/config-editor/config-editor-core/src/test/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorImplTest.java +++ b/config-editor/config-editor-core/src/test/java/uk/co/gresearch/siembol/configeditor/serviceaggregator/ServiceAggregatorImplTest.java @@ -8,7 +8,7 @@ import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Status; import uk.co.gresearch.siembol.configeditor.common.AuthorisationProvider; import uk.co.gresearch.siembol.configeditor.common.ConfigSchemaService; -import uk.co.gresearch.siembol.configeditor.common.ServiceType; +import uk.co.gresearch.siembol.common.constants.ServiceType; import uk.co.gresearch.siembol.configeditor.common.UserInfo; import uk.co.gresearch.siembol.configeditor.configstore.ConfigStore; import uk.co.gresearch.siembol.configeditor.model.ConfigEditorAttributes; diff --git a/config-editor/config-editor-rest/pom.xml b/config-editor/config-editor-rest/pom.xml index 01ab3a62..939b2738 100644 --- a/config-editor/config-editor-rest/pom.xml +++ b/config-editor/config-editor-rest/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol config-editor - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -56,7 +56,7 @@ uk.co.gresearch.siembol siembol-common - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.slf4j @@ -67,22 +67,22 @@ uk.co.gresearch.siembol config-editor-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol config-editor-services - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol config-editor-sync - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.slf4j @@ -93,7 +93,7 @@ uk.co.gresearch.siembol parsing-app - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.slf4j @@ -104,7 +104,7 @@ uk.co.gresearch.siembol enriching-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.slf4j @@ -115,7 +115,7 @@ uk.co.gresearch.siembol responding-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.slf4j diff --git a/config-editor/config-editor-rest/src/main/java/uk/co/gresearch/siembol/configeditor/rest/application/ConfigEditorConfiguration.java b/config-editor/config-editor-rest/src/main/java/uk/co/gresearch/siembol/configeditor/rest/application/ConfigEditorConfiguration.java index 02986465..2a7265c7 100644 --- a/config-editor/config-editor-rest/src/main/java/uk/co/gresearch/siembol/configeditor/rest/application/ConfigEditorConfiguration.java +++ b/config-editor/config-editor-rest/src/main/java/uk/co/gresearch/siembol/configeditor/rest/application/ConfigEditorConfiguration.java @@ -17,7 +17,7 @@ import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactoryImpl; import uk.co.gresearch.siembol.configeditor.common.AuthorisationProvider; import uk.co.gresearch.siembol.configeditor.common.ConfigEditorUtils; import uk.co.gresearch.siembol.configeditor.common.ConfigSchemaService; -import uk.co.gresearch.siembol.configeditor.common.ServiceType; +import uk.co.gresearch.siembol.common.constants.ServiceType; import uk.co.gresearch.siembol.configeditor.sync.common.ConfigServiceHelper; import uk.co.gresearch.siembol.configeditor.sync.service.*; import uk.co.gresearch.siembol.configeditor.model.ConfigEditorUiLayout; diff --git a/config-editor/config-editor-rest/src/main/java/uk/co/gresearch/siembol/configeditor/rest/application/ConfigServiceHelperImpl.java b/config-editor/config-editor-rest/src/main/java/uk/co/gresearch/siembol/configeditor/rest/application/ConfigServiceHelperImpl.java index c3d3123c..b9bb4d63 100644 --- a/config-editor/config-editor-rest/src/main/java/uk/co/gresearch/siembol/configeditor/rest/application/ConfigServiceHelperImpl.java +++ b/config-editor/config-editor-rest/src/main/java/uk/co/gresearch/siembol/configeditor/rest/application/ConfigServiceHelperImpl.java @@ -6,7 +6,7 @@ import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnector; import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactory; import uk.co.gresearch.siembol.configeditor.common.ConfigInfoProvider; import uk.co.gresearch.siembol.configeditor.common.ConfigInfoType; -import uk.co.gresearch.siembol.configeditor.common.ServiceType; +import uk.co.gresearch.siembol.common.constants.ServiceType; import uk.co.gresearch.siembol.configeditor.configinfo.AdminConfigInfoProvider; import uk.co.gresearch.siembol.configeditor.model.ConfigEditorResult; import uk.co.gresearch.siembol.configeditor.rest.common.ConfigEditorConfigurationProperties; diff --git a/config-editor/config-editor-services/pom.xml b/config-editor/config-editor-services/pom.xml index 734589b9..e008b8b0 100644 --- a/config-editor/config-editor-services/pom.xml +++ b/config-editor/config-editor-services/pom.xml @@ -10,7 +10,7 @@ uk.co.gresearch.siembol config-editor - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -41,32 +41,32 @@ uk.co.gresearch.siembol siembol-common - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol config-editor-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol parsing-app - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol enriching-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol responding-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT junit diff --git a/config-editor/config-editor-services/src/main/java/uk/co/gresearch/siembol/configeditor/service/common/ConfigEditorServiceFactory.java b/config-editor/config-editor-services/src/main/java/uk/co/gresearch/siembol/configeditor/service/common/ConfigEditorServiceFactory.java index aa6b2fac..54dfdeed 100644 --- a/config-editor/config-editor-services/src/main/java/uk/co/gresearch/siembol/configeditor/service/common/ConfigEditorServiceFactory.java +++ b/config-editor/config-editor-services/src/main/java/uk/co/gresearch/siembol/configeditor/service/common/ConfigEditorServiceFactory.java @@ -2,7 +2,7 @@ package uk.co.gresearch.siembol.configeditor.service.common; import uk.co.gresearch.siembol.configeditor.common.ConfigInfoProvider; import uk.co.gresearch.siembol.configeditor.common.ConfigSchemaService; -import uk.co.gresearch.siembol.configeditor.common.ServiceType; +import uk.co.gresearch.siembol.common.constants.ServiceType; import uk.co.gresearch.siembol.configeditor.configinfo.JsonRuleConfigInfoProvider; import uk.co.gresearch.siembol.configeditor.model.ConfigEditorUiLayout; import uk.co.gresearch.siembol.configeditor.service.alerts.AlertingRuleSchemaService; diff --git a/config-editor/config-editor-sync/pom.xml b/config-editor/config-editor-sync/pom.xml index 421ea39b..70bec0a5 100644 --- a/config-editor/config-editor-sync/pom.xml +++ b/config-editor/config-editor-sync/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol config-editor - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -20,17 +20,17 @@ uk.co.gresearch.siembol siembol-common - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol config-editor-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol parsing-app - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT provided diff --git a/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/common/ConfigServiceHelper.java b/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/common/ConfigServiceHelper.java index 2e724101..85f4ca60 100644 --- a/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/common/ConfigServiceHelper.java +++ b/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/common/ConfigServiceHelper.java @@ -1,7 +1,7 @@ package uk.co.gresearch.siembol.configeditor.sync.common; import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnector; -import uk.co.gresearch.siembol.configeditor.common.ServiceType; +import uk.co.gresearch.siembol.common.constants.ServiceType; import java.util.Optional; diff --git a/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/service/SynchronisationServiceImpl.java b/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/service/SynchronisationServiceImpl.java index 4ae7df61..2dff6c81 100644 --- a/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/service/SynchronisationServiceImpl.java +++ b/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/service/SynchronisationServiceImpl.java @@ -4,7 +4,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.actuate.health.Health; import uk.co.gresearch.siembol.common.model.StormTopologyDto; -import uk.co.gresearch.siembol.configeditor.common.ServiceType; +import uk.co.gresearch.siembol.common.constants.ServiceType; import uk.co.gresearch.siembol.configeditor.sync.actions.*; import uk.co.gresearch.siembol.configeditor.sync.common.ConfigServiceHelper; import uk.co.gresearch.siembol.configeditor.model.*; diff --git a/config-editor/pom.xml b/config-editor/pom.xml index 584729ae..eb34cee2 100644 --- a/config-editor/pom.xml +++ b/config-editor/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT config-editor-core diff --git a/config/siembol-monitoring/application.properties b/config/siembol-monitoring/application.properties new file mode 100644 index 00000000..35aefcea --- /dev/null +++ b/config/siembol-monitoring/application.properties @@ -0,0 +1,30 @@ +management.endpoint.health.show-details=always +management.endpoints.web.exposure.include=info,prometheus,health +management.endpoints.web.path-mapping.prometheus=metrics +management.endpoints.web.base-path= + +server.port=8083 + +info.env.java.vendor=${java.specification.vendor} +info.env.java.vm-name=${java.vm.name} +info.env.java.runtime-version=${java.runtime.version} +info.env.java.app.description=Siembol monitoring + +logging.level.org.apache.http=INFO +logging.level.org.apache.http.wire=INFO +logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO +logging.level.org.springframework.web=INFO +logging.level.org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor=INFO + +siembol-monitoring.heartbeat-properties.heartbeat-interval-seconds=60 + +siembol-monitoring.heartbeat-properties.heartbeat-consumer.enabled-services=parsingapp,enrichment,response +siembol-monitoring.heartbeat-properties.heartbeat-consumer.input-topic=siembol.response.heartbeat +siembol-monitoring.heartbeat-properties.heartbeat-consumer.kafka-properties.[bootstrap.servers]=kafka-0.kafka-headless.siembol.svc.cluster.local:9092 +siembol-monitoring.heartbeat-properties.heartbeat-consumer.kafka-properties.[application.id]=siembol.heartbeat.reader +siembol-monitoring.heartbeat-properties.heartbeat-consumer.kafka-properties.[auto.offset.reset]=earliest +siembol-monitoring.heartbeat-properties.heartbeat-consumer.kafka-properties.[security.protocol]=PLAINTEXT + +siembol-monitoring.heartbeat-properties.heartbeat-producers.producer1.output-topic=siembol.heartbeat +siembol-monitoring.heartbeat-properties.heartbeat-producers.producer1.kafka-properties.[bootstrap.servers]=kafka-0.kafka-headless.siembol.svc.cluster.local:9092 +siembol-monitoring.heartbeat-properties.heartbeat-producers.producer1.kafka-properties.[security.protocol]=PLAINTEXT \ No newline at end of file diff --git a/deployment/siembol-monitoring/pom.xml b/deployment/siembol-monitoring/pom.xml new file mode 100644 index 00000000..fd141484 --- /dev/null +++ b/deployment/siembol-monitoring/pom.xml @@ -0,0 +1,139 @@ + + + 4.0.0 + siembol-monitoring + siembol-monitoring + jar + + uk.co.gresearch.siembol + siembol + 2.5.12-SNAPSHOT + ../../pom.xml + + + + + org.springframework.boot + spring-boot-dependencies + ${spring_boot_version} + pom + import + + + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson_version} + + + org.springframework.boot + spring-boot-starter-logging + ${spring_boot_version} + + + uk.co.gresearch.siembol + siembol-common + 2.5.12-SNAPSHOT + + + org.slf4j + slf4j-log4j12 + + + + + io.projectreactor + reactor-core + ${reactor_core_version} + + + org.springframework.boot + spring-boot-starter-actuator + ${spring_boot_version} + + + org.springframework.boot + spring-boot-starter-web + ${spring_boot_version} + + + org.springframework.boot + spring-boot-starter-security + ${spring_boot_version} + + + net.bytebuddy + byte-buddy + ${byte_buddy_version} + test + + + io.micrometer + micrometer-registry-prometheus + ${io_micrometer_version} + + + org.mockito + mockito-core + ${mockito_version} + test + + + org.mockito + mockito-inline + ${mockito_version} + test + + + junit + junit + ${junit_version} + test + + + org.apache.kafka + kafka-clients + ${kafka_version} + + + org.apache.kafka + kafka-streams + ${kafka_version} + + + org.apache.kafka + kafka-streams-test-utils + ${kafka_version} + test + + + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring_boot_version} + + uk.co.gresearch.siembol.deployment.monitoring.application.Application + + + + build-info + + build-info + + + + + repackage + + + + + + + \ No newline at end of file diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/Application.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/Application.java new file mode 100644 index 00000000..5a0fc9e5 --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/Application.java @@ -0,0 +1,13 @@ +package uk.co.gresearch.siembol.deployment.monitoring.application; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication app = new SpringApplication(Application.class); + app.setRegisterShutdownHook(true); + app.run(args); + } +} diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/ServiceConfigurationProperties.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/ServiceConfigurationProperties.java new file mode 100644 index 00000000..38b2c832 --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/ServiceConfigurationProperties.java @@ -0,0 +1,19 @@ +package uk.co.gresearch.siembol.deployment.monitoring.application; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.NestedConfigurationProperty; +import uk.co.gresearch.siembol.deployment.monitoring.heartbeat.HeartbeatProperties; + +@ConfigurationProperties(prefix = "siembol-monitoring") +public class ServiceConfigurationProperties { + @NestedConfigurationProperty + private HeartbeatProperties heartbeatProperties; + + public HeartbeatProperties getHeartbeatProperties() { + return heartbeatProperties; + } + + public void setHeartbeatProperties(HeartbeatProperties heartbeatProperties) { + this.heartbeatProperties = heartbeatProperties; + } +} diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/SiembolMonitoringConfiguration.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/SiembolMonitoringConfiguration.java new file mode 100644 index 00000000..e17cb56a --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/SiembolMonitoringConfiguration.java @@ -0,0 +1,49 @@ +package uk.co.gresearch.siembol.deployment.monitoring.application; + +import io.micrometer.core.instrument.MeterRegistry; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; +import uk.co.gresearch.siembol.common.metrics.SiembolMetricsRegistrar; +import uk.co.gresearch.siembol.common.metrics.spring.SpringMetricsRegistrar; +import uk.co.gresearch.siembol.deployment.monitoring.heartbeat.HeartbeatConsumer; +import uk.co.gresearch.siembol.deployment.monitoring.heartbeat.HeartbeatProducerScheduler; + +@Configuration +@EnableConfigurationProperties(ServiceConfigurationProperties.class) +public class SiembolMonitoringConfiguration implements DisposableBean { + @Autowired + private ServiceConfigurationProperties properties; + @Autowired + private MeterRegistry springMeterRegistrar; + private HeartbeatConsumer heartbeatConsumer; + + @Bean("metricsRegistrar") + SiembolMetricsRegistrar metricsRegistrar() { + return new SpringMetricsRegistrar(springMeterRegistrar); + } + + @Bean("heartbeatProducerScheduler") + @DependsOn("metricsRegistrar") + HeartbeatProducerScheduler heartbeatProducerScheduler(@Autowired SiembolMetricsRegistrar metricsRegistrar) { + return new HeartbeatProducerScheduler(properties.getHeartbeatProperties(), metricsRegistrar); + } + + @Bean("heartbeatConsumer") + @DependsOn("metricsRegistrar") + HeartbeatConsumer heartbeatConsumer(@Autowired SiembolMetricsRegistrar metricsRegistrar) { + heartbeatConsumer = new HeartbeatConsumer(properties.getHeartbeatProperties().getHeartbeatConsumer(), metricsRegistrar); + return heartbeatConsumer; + } + + @Override + public void destroy() { + if (heartbeatConsumer == null) { + return; + } + heartbeatConsumer.close(); + } +} diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/SiembolMonitoringHealthIndicator.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/SiembolMonitoringHealthIndicator.java new file mode 100644 index 00000000..07e83cea --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/SiembolMonitoringHealthIndicator.java @@ -0,0 +1,35 @@ +package uk.co.gresearch.siembol.deployment.monitoring.application; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.ReactiveHealthIndicator; +import org.springframework.boot.actuate.health.SimpleStatusAggregator; +import org.springframework.boot.actuate.health.Status; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; +import uk.co.gresearch.siembol.deployment.monitoring.heartbeat.HeartbeatConsumer; +import uk.co.gresearch.siembol.deployment.monitoring.heartbeat.HeartbeatProducerScheduler; + + +@Component +public class SiembolMonitoringHealthIndicator implements ReactiveHealthIndicator { + private final SimpleStatusAggregator statusAggregator = new SimpleStatusAggregator(); + @Autowired + private HeartbeatProducerScheduler heartbeatProducerScheduler; + @Autowired + private HeartbeatConsumer heartbeatConsumer; + + @Override + public Mono health() { + return checkDownstreamServiceHealth().onErrorResume( + ex -> Mono.just(new Health.Builder().down(ex).build())); + } + + private Mono checkDownstreamServiceHealth() { + Status status = statusAggregator.getAggregateStatus( + heartbeatProducerScheduler.checkHealth().getStatus(), + heartbeatConsumer.checkHealth().getStatus()); + return Mono.just(new Health.Builder().status(status).build()); + } +} + diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/SiembolMonitoringUnauthenticatedSecurityAdapter.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/SiembolMonitoringUnauthenticatedSecurityAdapter.java new file mode 100644 index 00000000..7dd0c0d0 --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/application/SiembolMonitoringUnauthenticatedSecurityAdapter.java @@ -0,0 +1,10 @@ +package uk.co.gresearch.siembol.deployment.monitoring.application; + +import org.springframework.context.annotation.Configuration; +import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; +import uk.co.gresearch.siembol.common.authorisation.SiembolUnauthenticatedSecurityAdapter; + +@Configuration +@EnableWebSecurity +public class SiembolMonitoringUnauthenticatedSecurityAdapter extends SiembolUnauthenticatedSecurityAdapter { +} diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatConsumer.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatConsumer.java new file mode 100644 index 00000000..a8a2fc04 --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatConsumer.java @@ -0,0 +1,124 @@ +package uk.co.gresearch.siembol.deployment.monitoring.heartbeat; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.actuate.health.Health; +import uk.co.gresearch.siembol.common.constants.ServiceType; +import uk.co.gresearch.siembol.common.metrics.SiembolCounter; +import uk.co.gresearch.siembol.common.metrics.SiembolGauge; +import uk.co.gresearch.siembol.common.metrics.SiembolMetrics; +import uk.co.gresearch.siembol.common.metrics.SiembolMetricsRegistrar; +import uk.co.gresearch.siembol.common.utils.KafkaStreamsFactoryImpl; +import uk.co.gresearch.siembol.common.utils.KafkaStreamsFactory; + +import java.lang.invoke.MethodHandles; +import java.time.Instant; +import java.util.*; + + +public class HeartbeatConsumer { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String INIT_START = "Kafka stream service initialisation started"; + private static final String INIT_COMPLETED = "Kafka stream service initialisation completed"; + private static final ObjectReader MESSAGE_READER = new ObjectMapper() + .readerFor(HeartbeatProcessedMessage.class); + private final KafkaStreams streams; + private final SiembolGauge totalLatencyGauge; + private final SiembolCounter consumerErrorCount; + private final SiembolCounter consumerMessageRead; + private final List> servicesMetrics = new ArrayList<>(); + + public HeartbeatConsumer(HeartbeatConsumerProperties properties, SiembolMetricsRegistrar metricsRegistrar) { + this(properties, metricsRegistrar, new KafkaStreamsFactoryImpl()); + } + + HeartbeatConsumer(HeartbeatConsumerProperties properties, SiembolMetricsRegistrar metricsRegistrar, + KafkaStreamsFactory streamsFactory) { + consumerErrorCount = metricsRegistrar.registerCounter(SiembolMetrics.HEARTBEAT_CONSUMER_ERROR.name()); + consumerMessageRead = metricsRegistrar.registerCounter(SiembolMetrics.HEARTBEAT_MESSAGES_READ.name()); + + if (properties.getEnabledServices() == null) { + throw new IllegalArgumentException("Missing enabled services for heartbeat consumer."); + } + if (properties.getEnabledServices().contains(ServiceType.PARSING_APP)) { + servicesMetrics.add(Pair.of(ServiceType.PARSING_APP, + metricsRegistrar.registerGauge(SiembolMetrics.HEARTBEAT_LATENCY_PARSING_MS.name()))); + } + if (properties.getEnabledServices().contains(ServiceType.ENRICHMENT)) { + servicesMetrics.add(Pair.of(ServiceType.ENRICHMENT, + metricsRegistrar.registerGauge(SiembolMetrics.HEARTBEAT_LATENCY_ENRICHING_MS.name()))); + } + if (properties.getEnabledServices().contains(ServiceType.RESPONSE)) { + servicesMetrics.add(Pair.of(ServiceType.RESPONSE, + metricsRegistrar.registerGauge(SiembolMetrics.HEARTBEAT_LATENCY_RESPONDING_MS.name()))); + } + totalLatencyGauge = metricsRegistrar.registerGauge(SiembolMetrics.HEARTBEAT_LATENCY_TOTAL_MS.name()); + streams = createStreams(streamsFactory, properties); + streams.start(); + } + + private KafkaStreams createStreams(KafkaStreamsFactory streamsFactory, HeartbeatConsumerProperties properties) { + LOG.info(INIT_START); + StreamsBuilder builder = new StreamsBuilder(); + builder.stream(properties.getInputTopic()) + .foreach((key, value) -> this.processMessage(value)); + + Properties configuration = new Properties(); + configuration.putAll(properties.getKafkaProperties()); + configuration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + configuration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + Topology topology = builder.build(configuration); + + KafkaStreams ret = streamsFactory.createKafkaStreams(topology, configuration); + LOG.info(INIT_COMPLETED); + return ret; + } + + private void processMessage(String value) { + try { + var currentTimestamp = Instant.now().toEpochMilli(); + HeartbeatProcessedMessage message = MESSAGE_READER.readValue(value); + var lastTimestamp = message.getTimestamp().longValue(); + + for(var serviceMetric: servicesMetrics) { + switch (serviceMetric.getKey()) { + case PARSING_APP: + serviceMetric.getValue().setValue(message.getParsingTime().longValue() - lastTimestamp); + lastTimestamp = message.getParsingTime().longValue(); + break; + case ENRICHMENT: + serviceMetric.getValue().setValue(message.getEnrichingTime().longValue() - lastTimestamp); + lastTimestamp = message.getEnrichingTime().longValue(); + break; + case RESPONSE: + serviceMetric.getValue().setValue(message.getResponseTime().longValue() - lastTimestamp); + lastTimestamp = message.getResponseTime().longValue(); + } + + } + totalLatencyGauge.setValue(currentTimestamp - message.getTimestamp().longValue()); + consumerMessageRead.increment(); + } catch (Exception e) { + LOG.error("Error reading heartbeat message from kafka: {}", e.toString()); + consumerErrorCount.increment(); + } + } + + public Health checkHealth() { + return streams.state().isRunningOrRebalancing() || streams.state().equals(KafkaStreams.State.CREATED) + ? Health.up().build() : + Health.down().build(); + } + + public void close() { + streams.close(); + } +} diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatConsumerProperties.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatConsumerProperties.java new file mode 100644 index 00000000..43522bf9 --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatConsumerProperties.java @@ -0,0 +1,36 @@ +package uk.co.gresearch.siembol.deployment.monitoring.heartbeat; + +import java.util.List; +import java.util.Map; +import uk.co.gresearch.siembol.common.constants.ServiceType; + +public class HeartbeatConsumerProperties { + private List enabledServices; + private String inputTopic; + private Map kafkaProperties; + + public List getEnabledServices() { + return enabledServices; + } + + public void setEnabledServices(List enabledServices) { + this.enabledServices = enabledServices; + } + + public String getInputTopic() { + return inputTopic; + } + + public void setInputTopic(String inputTopic) { + this.inputTopic = inputTopic; + } + + public Map getKafkaProperties() { + return kafkaProperties; + } + + public void setKafkaProperties(Map kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } + +} diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatMessage.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatMessage.java new file mode 100644 index 00000000..dcee98ec --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatMessage.java @@ -0,0 +1,52 @@ +package uk.co.gresearch.siembol.deployment.monitoring.heartbeat; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.LinkedHashMap; +import java.util.Map; + +public class HeartbeatMessage { + @JsonProperty("event_time") + private String eventTime; + @JsonProperty("siembol_heartbeat") + private Boolean siembolHeartbeat = true; + @JsonProperty("producer_name") + private String producerName; + + private Map message = new LinkedHashMap<>(); + + @JsonAnySetter + void setMessage(String key, Object value) { + message.put(key, value); + } + @JsonAnyGetter + public Map getMessage() { + return message; + } + + public Boolean getSiembolHeartbeat() { + return siembolHeartbeat; + } + + public void setSiembolHeartbeat(Boolean siembolHeartbeat) { + this.siembolHeartbeat = siembolHeartbeat; + } + + public String getEventTime() { + return eventTime; + } + + public void setEventTime(String eventTime) { + this.eventTime = eventTime; + } + + public String getProducerName() { + return producerName; + } + + public void setProducerName(String producerName) { + this.producerName = producerName; + } +} diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProcessedMessage.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProcessedMessage.java new file mode 100644 index 00000000..9e750c92 --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProcessedMessage.java @@ -0,0 +1,50 @@ +package uk.co.gresearch.siembol.deployment.monitoring.heartbeat; + +import com.fasterxml.jackson.annotation.JsonProperty; +import uk.co.gresearch.siembol.common.constants.SiembolConstants; + +public class HeartbeatProcessedMessage extends HeartbeatMessage { + @JsonProperty(SiembolConstants.TIMESTAMP) + private Number timestamp; + + @JsonProperty(SiembolConstants.PARSING_TIME) + private Number parsingTime; + + @JsonProperty(SiembolConstants.ENRICHING_TIME) + private Number enrichingTime; + + @JsonProperty(SiembolConstants.RESPONSE_TIME) + private Number responseTime; + + public Number getTimestamp() { + return timestamp; + } + + public void setTimestamp(Number timestamp) { + this.timestamp = timestamp; + } + + public Number getParsingTime() { + return parsingTime; + } + + public void setParsingTime(Number parsingTime) { + this.parsingTime = parsingTime; + } + + public Number getEnrichingTime() { + return enrichingTime; + } + + public void setEnrichingTime(Number enrichingTime) { + this.enrichingTime = enrichingTime; + } + + public Number getResponseTime() { + return responseTime; + } + + public void setResponseTime(Number responseTime) { + this.responseTime = responseTime; + } +} diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducer.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducer.java new file mode 100644 index 00000000..50aab6ba --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducer.java @@ -0,0 +1,96 @@ +package uk.co.gresearch.siembol.deployment.monitoring.heartbeat; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.actuate.health.Health; +import uk.co.gresearch.siembol.common.metrics.SiembolCounter; +import uk.co.gresearch.siembol.common.metrics.SiembolMetrics; +import uk.co.gresearch.siembol.common.metrics.SiembolMetricsRegistrar; + +import java.io.Closeable; +import java.lang.invoke.MethodHandles; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +public class HeartbeatProducer implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String MISSING_KAFKA_WRITER_PROPS_MSG = "Missing heartbeat kafka producer properties for %s"; + private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writer(); + private final HeartbeatMessage message = new HeartbeatMessage(); + private final AtomicReference exception = new AtomicReference<>(); + private final String producerName; + private final Producer producer; + private final SiembolCounter updateCounter; + private final SiembolCounter errorCounter; + private final String topicName; + + public HeartbeatProducer(HeartbeatProducerProperties producerProperties, + String producerName, + Map heartbeatMessageProperties, + SiembolMetricsRegistrar metricsRegistrar) { + this(producerProperties, producerName, heartbeatMessageProperties, metricsRegistrar, + x -> new KafkaProducer<>(x.getKafkaProperties(), new StringSerializer(), + new StringSerializer()) ); + } + + HeartbeatProducer(HeartbeatProducerProperties producerProperties, + String producerName, + Map heartbeatMessageProperties, + SiembolMetricsRegistrar metricsRegistrar, + Function> factory) { + this.producerName = producerName; + this.initialiseMessage(heartbeatMessageProperties); + if (producerProperties == null) { + throw new IllegalArgumentException(String.format(MISSING_KAFKA_WRITER_PROPS_MSG, + producerName)); + } + producer = factory.apply(producerProperties); + topicName = producerProperties.getOutputTopic(); + updateCounter = + metricsRegistrar.registerCounter(SiembolMetrics.HEARTBEAT_MESSAGES_SENT.getMetricName(producerName)); + errorCounter = + metricsRegistrar.registerCounter(SiembolMetrics.HEARTBEAT_PRODUCER_ERROR.getMetricName(producerName)); + + } + + private void initialiseMessage(Map messageProperties) { + if (messageProperties != null) { + for (Map.Entry messageEntry : messageProperties.entrySet()) { + this.message.setMessage(messageEntry.getKey(), messageEntry.getValue()); + } + } + } + + public void sendHeartbeat() { + var now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + this.message.setEventTime(now.toString()); // ISO format + this.message.setProducerName(producerName); + try { + producer.send(new ProducerRecord<>(topicName, OBJECT_WRITER.writeValueAsString(this.message))).get(); + updateCounter.increment(); + LOG.info("Sent heartbeat with producer {} to topic {}", producerName, topicName); + exception.set(null); + } catch (Exception e) { + LOG.error("Error sending message to kafka with producer {}: {}", producerName, e.toString()); + errorCounter.increment(); + exception.set(e); + } + } + + public void close() { + producer.close(); + } + + public Health checkHealth() { + return exception.get() == null ? Health.down().withException(exception.get()).build(): Health.up().build(); + } +} diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerProperties.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerProperties.java new file mode 100644 index 00000000..e10183ca --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerProperties.java @@ -0,0 +1,27 @@ +package uk.co.gresearch.siembol.deployment.monitoring.heartbeat; + +import java.util.Map; +import java.util.Properties; + +public class HeartbeatProducerProperties { + private String outputTopic; + private Map kafkaProperties; + + public String getOutputTopic() { + return outputTopic; + } + + public void setOutputTopic(String outputTopic) { + this.outputTopic = outputTopic; + } + + public Properties getKafkaProperties() { + var props = new Properties(); + props.putAll(kafkaProperties); + return props; + } + + public void setKafkaProperties(Map kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } +} diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerScheduler.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerScheduler.java new file mode 100644 index 00000000..7c14de12 --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerScheduler.java @@ -0,0 +1,70 @@ +package uk.co.gresearch.siembol.deployment.monitoring.heartbeat; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; +import uk.co.gresearch.siembol.common.metrics.SiembolMetricsRegistrar; + +import java.io.Closeable; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; + +public class HeartbeatProducerScheduler implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final Map producerMap = new HashMap<>(); + private final int errorThreshold; + + public HeartbeatProducerScheduler(HeartbeatProperties properties, + SiembolMetricsRegistrar metricsRegistrar) { + this(properties, Executors.newSingleThreadScheduledExecutor(), + (HeartbeatProducerProperties x, String y) -> + new HeartbeatProducer(x, y, properties.getMessage(), metricsRegistrar)); + } + + HeartbeatProducerScheduler(HeartbeatProperties properties, + ScheduledExecutorService executorService, + BiFunction factory) { + this.errorThreshold = properties.getHeartbeatProducers().size(); + for (Map.Entry producerProperties : properties.getHeartbeatProducers().entrySet()) { + var producerName = producerProperties.getKey(); + var producer = createHeartbeatProducer(producerProperties.getValue(), producerName, factory); + producerMap.put(producerName, producer); + executorService.scheduleAtFixedRate( + producer::sendHeartbeat, + properties.getHeartbeatIntervalSeconds(), + properties.getHeartbeatIntervalSeconds(), + TimeUnit.SECONDS); + } + } + + public static HeartbeatProducer createHeartbeatProducer(HeartbeatProducerProperties properties, + String producerName, + BiFunction factory) { + LOG.info("Initialising producer {}", producerName); + var producer = factory.apply(properties, producerName); + LOG.info("Finished initialising heartbeat producer {}", producerName); + return producer; + } + + public Health checkHealth() { + var countErrors = 0; + for (var producer: this.producerMap.values()) { + if (producer.checkHealth().getStatus() == Status.DOWN) { + countErrors ++; + } + } + return countErrors >= errorThreshold? Health.down().build(): Health.up().build(); + } + + public void close() { + for (var producer: this.producerMap.values()) { + producer.close(); + } + } +} diff --git a/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProperties.java b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProperties.java new file mode 100644 index 00000000..53d81d61 --- /dev/null +++ b/deployment/siembol-monitoring/src/main/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProperties.java @@ -0,0 +1,48 @@ +package uk.co.gresearch.siembol.deployment.monitoring.heartbeat; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.util.Map; + +@ConfigurationProperties(prefix = "heartbeat-properties") +public class HeartbeatProperties { + private int heartbeatIntervalSeconds = 60; + + private Map heartbeatProducers; + + private HeartbeatConsumerProperties heartbeatConsumer; + + private Map message; + + public int getHeartbeatIntervalSeconds() { + return heartbeatIntervalSeconds; + } + + public void setHeartbeatIntervalSeconds(int heartbeatIntervalSeconds) { + this.heartbeatIntervalSeconds = heartbeatIntervalSeconds; + } + + public Map getHeartbeatProducers() { + return heartbeatProducers; + } + + public void setHeartbeatProducers(Map heartbeatProducers) { + this.heartbeatProducers = heartbeatProducers; + } + + public HeartbeatConsumerProperties getHeartbeatConsumer() { + return heartbeatConsumer; + } + + public void setHeartbeatConsumer(HeartbeatConsumerProperties heartbeatConsumer) { + this.heartbeatConsumer = heartbeatConsumer; + } + + public Map getMessage() { + return message; + } + + public void setMessage(Map message) { + this.message = message; + } +} diff --git a/deployment/siembol-monitoring/src/test/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatConsumerTest.java b/deployment/siembol-monitoring/src/test/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatConsumerTest.java new file mode 100644 index 00000000..b54d0270 --- /dev/null +++ b/deployment/siembol-monitoring/src/test/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatConsumerTest.java @@ -0,0 +1,123 @@ +package uk.co.gresearch.siembol.deployment.monitoring.heartbeat; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import uk.co.gresearch.siembol.common.constants.ServiceType; +import uk.co.gresearch.siembol.common.metrics.SiembolMetrics; +import uk.co.gresearch.siembol.common.metrics.test.SiembolMetricsTestRegistrar; +import uk.co.gresearch.siembol.common.testing.TestingDriverKafkaStreamsFactory; + +import java.time.Instant; +import java.util.Arrays; +import java.util.HashMap; + +public class HeartbeatConsumerTest { + private final String heartbeatMessageStr = """ + { + "timestamp": 1654095527000, + "siembol_parsing_ts": 1654095527001, + "siembol_enriching_ts": 1654095527020, + "siembol_response_ts": 1654095527031, + "siembol_heartbeat": true, + "source_type": "heartbeat", + "producer_name": "p1", + "event_time": "2022-06-01T14:58:47.000Z" + } + """; + private final String heartbeatMessageWithoutEnrichmentStr = """ + { + "timestamp": 1654095527000, + "siembol_parsing_ts": 1654095527001, + "siembol_response_ts": 1654095527031, + "siembol_heartbeat": true, + "source_type": "heartbeat", + "producer_name": "p1", + "event_time": "2022-06-01T14:58:47.000Z" + } + """; + private final String inputTopic = "input"; + private SiembolMetricsTestRegistrar metricsTestRegistrar; + private KafkaStreams kafkaStreams; + private TestingDriverKafkaStreamsFactory streamsFactory; + private TopologyTestDriver testDriver; + private TestInputTopic testInputTopic; + private MockedStatic mockInstant; + private HeartbeatConsumerProperties properties; + + @Before + public void setUp() { + metricsTestRegistrar = new SiembolMetricsTestRegistrar(); + kafkaStreams = Mockito.mock(KafkaStreams.class); + streamsFactory = new TestingDriverKafkaStreamsFactory(kafkaStreams); + + var instant = Instant.parse("2022-06-01T14:58:47.823Z"); + mockInstant = Mockito.mockStatic(Instant.class); + mockInstant.when(Instant::now).thenReturn(instant); + + properties = new HeartbeatConsumerProperties(); + properties.setInputTopic(inputTopic); + properties.setKafkaProperties(new HashMap<>()); + properties.setEnabledServices(Arrays.asList(ServiceType.PARSING_APP, ServiceType.ENRICHMENT, + ServiceType.RESPONSE)); + } + + @After + public void tearDown() { + streamsFactory.close(); + mockInstant.close(); + } + + @Test + public void processingMessageOk() { + new HeartbeatConsumer(properties, metricsTestRegistrar, streamsFactory); + testDriver = streamsFactory.getTestDriver(); + testInputTopic = testDriver.createInputTopic(inputTopic, Serdes.String().serializer(), + Serdes.String().serializer()); + testInputTopic.pipeInput(heartbeatMessageStr); + Assert.assertEquals(1, + metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_PARSING_MS.name()), 0); + Assert.assertEquals(19, + metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_ENRICHING_MS.name()), 0); + Assert.assertEquals(11, + metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_RESPONDING_MS.name()), 0); + Assert.assertEquals(823, + metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_TOTAL_MS.name()), 0); + Assert.assertEquals(1, metricsTestRegistrar.getCounterValue(SiembolMetrics.HEARTBEAT_MESSAGES_READ.name())); + } + + @Test + public void processingError() { + new HeartbeatConsumer(properties, metricsTestRegistrar, streamsFactory); + testDriver = streamsFactory.getTestDriver(); + testInputTopic = testDriver.createInputTopic(inputTopic, Serdes.String().serializer(), + Serdes.String().serializer()); + testInputTopic.pipeInput("test"); + Assert.assertEquals(1, metricsTestRegistrar.getCounterValue(SiembolMetrics.HEARTBEAT_CONSUMER_ERROR.name())); + } + + @Test + public void withoutEnrichmentService() { + properties.setEnabledServices(Arrays.asList(ServiceType.PARSING_APP, + ServiceType.RESPONSE)); + new HeartbeatConsumer(properties, metricsTestRegistrar, streamsFactory); + testDriver = streamsFactory.getTestDriver(); + testInputTopic = testDriver.createInputTopic(inputTopic, Serdes.String().serializer(), + Serdes.String().serializer()); + testInputTopic.pipeInput(heartbeatMessageWithoutEnrichmentStr); + + Assert.assertEquals(1, + metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_PARSING_MS.name()), 0); + Assert.assertEquals(30, + metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_RESPONDING_MS.name()), 0); + Assert.assertEquals(823, + metricsTestRegistrar.getGaugeValue(SiembolMetrics.HEARTBEAT_LATENCY_TOTAL_MS.name()), 0); + } +} diff --git a/deployment/siembol-monitoring/src/test/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerSchedulerTest.java b/deployment/siembol-monitoring/src/test/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerSchedulerTest.java new file mode 100644 index 00000000..519fa701 --- /dev/null +++ b/deployment/siembol-monitoring/src/test/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerSchedulerTest.java @@ -0,0 +1,89 @@ +package uk.co.gresearch.siembol.deployment.monitoring.heartbeat; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.springframework.boot.actuate.health.Health; + +import java.util.*; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.*; + +public class HeartbeatProducerSchedulerTest { + private ScheduledExecutorService mockScheduledService; + private final Map producerPropertiesMap= new HashMap<>(); + private final Map heartbeatMessageProperties = new HashMap<>(); + private final int heartbeatIntervalSeconds = 10; + private HeartbeatProducer heartbeatProducer; + private final HeartbeatProperties properties = new HeartbeatProperties(); + private BiFunction factory; + + @Before + public void setUp() { + heartbeatProducer = Mockito.mock(HeartbeatProducer.class); + factory = Mockito.mock(BiFunction.class); + doNothing().when(heartbeatProducer).sendHeartbeat(); + when(factory.apply(any(HeartbeatProducerProperties.class), anyString())).thenReturn(heartbeatProducer); + mockScheduledService = mock(ScheduledExecutorService.class); + given(mockScheduledService.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), + any(TimeUnit.class))).willReturn(mock(ScheduledFuture.class)); + + var heartbeatProducerProperties1 = new HeartbeatProducerProperties(); + var heartbeatProducerProperties2 = new HeartbeatProducerProperties(); + producerPropertiesMap.put("p1", heartbeatProducerProperties1); + producerPropertiesMap.put("p2", heartbeatProducerProperties2); + properties.setHeartbeatProducers(producerPropertiesMap); + properties.setMessage(heartbeatMessageProperties); + properties.setHeartbeatIntervalSeconds(heartbeatIntervalSeconds); + + } + + @Test + public void ok() { + new HeartbeatProducerScheduler(properties, mockScheduledService, factory); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor producerPropertiesCaptor = + ArgumentCaptor.forClass(HeartbeatProducerProperties.class); + ArgumentCaptor producerNameCaptor = ArgumentCaptor.forClass(String.class); + verify(mockScheduledService, times(2)).scheduleAtFixedRate( + argumentCaptor.capture(), + eq(Long.valueOf(heartbeatIntervalSeconds)), + eq(Long.valueOf(heartbeatIntervalSeconds)), + any(TimeUnit.class)); + argumentCaptor.getAllValues().get(0).run(); + argumentCaptor.getAllValues().get(1).run(); + verify(heartbeatProducer, times(2)).sendHeartbeat(); + verify(factory, times(2)).apply(producerPropertiesCaptor.capture(), producerNameCaptor.capture()); + List producerPropertiesList = producerPropertiesCaptor.getAllValues(); + List producerNameList = producerNameCaptor.getAllValues(); + + assertEquals(Arrays.asList("p1", "p2"), producerNameList); + assertEquals(new ArrayList(producerPropertiesMap.values()), producerPropertiesList); + } + + @Test + public void checkHealthUp() { + var heartbeatProducerScheduler = new HeartbeatProducerScheduler(properties, mockScheduledService, + (x, y) -> heartbeatProducer); + when(heartbeatProducer.checkHealth()).thenReturn(Health.up().build(), Health.down().build()); + assertEquals(heartbeatProducerScheduler.checkHealth(), Health.up().build()); + verify(heartbeatProducer, times(2)).checkHealth(); + } + + @Test + public void checkHealthDown() { + var heartbeatProducerScheduler = new HeartbeatProducerScheduler(properties, mockScheduledService, (x, y) -> heartbeatProducer); + when(heartbeatProducer.checkHealth()).thenReturn(Health.down().build()); + assertEquals(heartbeatProducerScheduler.checkHealth(), Health.down().build()); + verify(heartbeatProducer, times(2)).checkHealth(); + } +} diff --git a/deployment/siembol-monitoring/src/test/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerTest.java b/deployment/siembol-monitoring/src/test/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerTest.java new file mode 100644 index 00000000..224fa935 --- /dev/null +++ b/deployment/siembol-monitoring/src/test/java/uk/co/gresearch/siembol/deployment/monitoring/heartbeat/HeartbeatProducerTest.java @@ -0,0 +1,71 @@ +package uk.co.gresearch.siembol.deployment.monitoring.heartbeat; + +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import uk.co.gresearch.siembol.common.metrics.SiembolMetrics; +import uk.co.gresearch.siembol.common.metrics.test.SiembolMetricsTestRegistrar; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import org.mockito.MockedStatic; + +public class HeartbeatProducerTest { + private SiembolMetricsTestRegistrar metricsTestRegistrar; + private final Map heartbeatMessageProperties = new HashMap<>(); + private MockedStatic mockInstant; + + @Before + public void setUp() { + metricsTestRegistrar = new SiembolMetricsTestRegistrar(); + var instant = Instant.parse("2022-05-31T09:10:11.50Z"); + mockInstant = Mockito.mockStatic(Instant.class); + mockInstant.when(Instant::now).thenReturn(instant); + heartbeatMessageProperties.put("key", "value"); + } + + @After + public void tearDown() { + mockInstant.close(); + } + + @Test + public void sendHeartbeatOk() { + var producerProperties = new HeartbeatProducerProperties(); + producerProperties.setOutputTopic("heartbeat"); + var producer = new MockProducer<>(true, new StringSerializer(), new StringSerializer()); + var heartbeatProducer = new HeartbeatProducer(producerProperties, "p", heartbeatMessageProperties, + metricsTestRegistrar, x -> producer); + heartbeatProducer.sendHeartbeat(); + Assert.assertEquals(producer.history().size(), 1); + Assert.assertEquals(producer.history().get(0).topic(), "heartbeat"); + Assert.assertEquals(producer.history().get(0).value(), "{\"event_time\":\"2022-05-31T09:10:11.500Z\"," + + "\"siembol_heartbeat\":true," + + "\"producer_name\":\"p\",\"key\":\"value\"}"); + heartbeatProducer.sendHeartbeat(); + Assert.assertEquals(2, + metricsTestRegistrar.getCounterValue(SiembolMetrics.HEARTBEAT_MESSAGES_SENT.getMetricName("p"))); + } + + @Test + public void sendHeartbeatError() { + var producer = new MockProducer<>(true, new StringSerializer(), new StringSerializer()); + var producerProperties = new HeartbeatProducerProperties(); + var heartbeatProducer = new HeartbeatProducer( + producerProperties, + "p", + heartbeatMessageProperties, + metricsTestRegistrar, + x -> producer); + + heartbeatProducer.sendHeartbeat(); + Assert.assertEquals(1, + metricsTestRegistrar.getCounterValue(SiembolMetrics.HEARTBEAT_PRODUCER_ERROR.getMetricName("p"))); + } + +} diff --git a/deployment/storm-topology-manager/pom.xml b/deployment/storm-topology-manager/pom.xml index 7f147b29..55dec412 100644 --- a/deployment/storm-topology-manager/pom.xml +++ b/deployment/storm-topology-manager/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol siembol - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT ../../pom.xml @@ -43,7 +43,7 @@ uk.co.gresearch.siembol siembol-common - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.slf4j diff --git a/enriching/enriching-core/pom.xml b/enriching/enriching-core/pom.xml index a335f984..7ab6e9de 100644 --- a/enriching/enriching-core/pom.xml +++ b/enriching/enriching-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol enriching - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -35,12 +35,12 @@ uk.co.gresearch.siembol siembol-common - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT junit diff --git a/enriching/enriching-storm/pom.xml b/enriching/enriching-storm/pom.xml index 2ce1e5c3..cd7fc5ad 100644 --- a/enriching/enriching-storm/pom.xml +++ b/enriching/enriching-storm/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol enriching - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -75,7 +75,7 @@ uk.co.gresearch.siembol enriching-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.slf4j diff --git a/enriching/pom.xml b/enriching/pom.xml index 01191bfd..3472490b 100644 --- a/enriching/pom.xml +++ b/enriching/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT enriching-core diff --git a/parsing/parsing-app/pom.xml b/parsing/parsing-app/pom.xml index e4024119..f845e84a 100644 --- a/parsing/parsing-app/pom.xml +++ b/parsing/parsing-app/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol parsing - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -39,12 +39,12 @@ uk.co.gresearch.siembol siembol-common - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol parsing-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT junit diff --git a/parsing/parsing-core/pom.xml b/parsing/parsing-core/pom.xml index 9cac0b84..0e5f7a64 100644 --- a/parsing/parsing-core/pom.xml +++ b/parsing/parsing-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol parsing - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -50,7 +50,7 @@ uk.co.gresearch.siembol siembol-common - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT joda-time diff --git a/parsing/parsing-storm/pom.xml b/parsing/parsing-storm/pom.xml index 97015f93..d4520b99 100644 --- a/parsing/parsing-storm/pom.xml +++ b/parsing/parsing-storm/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol parsing - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -75,7 +75,7 @@ uk.co.gresearch.siembol parsing-app - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.slf4j diff --git a/parsing/pom.xml b/parsing/pom.xml index 9b1dece7..36808b3e 100644 --- a/parsing/pom.xml +++ b/parsing/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT parsing-core diff --git a/pom.xml b/pom.xml index cc2b6a0c..4a3f1c06 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ uk.co.gresearch.siembol siembol siembol - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT A scalable, advanced security analytics framework based on open-source big data technologies. 2019 https://siembol.io/ @@ -78,6 +78,7 @@ responding config-editor deployment/storm-topology-manager + deployment/siembol-monitoring pom diff --git a/responding/pom.xml b/responding/pom.xml index 14b70611..d8f526bf 100644 --- a/responding/pom.xml +++ b/responding/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT responding-core diff --git a/responding/responding-core/pom.xml b/responding/responding-core/pom.xml index c5875ac0..dac23bb3 100644 --- a/responding/responding-core/pom.xml +++ b/responding/responding-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol responding - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -35,12 +35,12 @@ uk.co.gresearch.siembol siembol-common - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT com.jayway.jsonpath diff --git a/responding/responding-stream/pom.xml b/responding/responding-stream/pom.xml index cf57babf..12e9c83a 100644 --- a/responding/responding-stream/pom.xml +++ b/responding/responding-stream/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol responding - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -51,7 +51,7 @@ uk.co.gresearch.siembol siembol-common - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.slf4j @@ -62,7 +62,7 @@ uk.co.gresearch.siembol responding-core - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT org.apache.kafka diff --git a/responding/responding-stream/src/main/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamRulesService.java b/responding/responding-stream/src/main/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamRulesService.java index b67a3a7b..73adb8cd 100644 --- a/responding/responding-stream/src/main/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamRulesService.java +++ b/responding/responding-stream/src/main/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamRulesService.java @@ -9,6 +9,8 @@ import reactor.core.publisher.Mono; import uk.co.gresearch.siembol.common.constants.SiembolMessageFields; import uk.co.gresearch.siembol.common.error.ErrorMessage; import uk.co.gresearch.siembol.common.error.ErrorType; +import uk.co.gresearch.siembol.common.utils.KafkaStreamsFactory; +import uk.co.gresearch.siembol.common.utils.KafkaStreamsFactoryImpl; import uk.co.gresearch.siembol.response.stream.rest.application.ResponseConfigurationProperties; import uk.co.gresearch.siembol.response.common.RespondingResult; import uk.co.gresearch.siembol.response.common.RespondingResultAttributes; diff --git a/responding/responding-stream/src/test/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamRuleServiceTest.java b/responding/responding-stream/src/test/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamRuleServiceTest.java index ca69847d..7fb28f35 100644 --- a/responding/responding-stream/src/test/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamRuleServiceTest.java +++ b/responding/responding-stream/src/test/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamRuleServiceTest.java @@ -15,6 +15,7 @@ import org.springframework.boot.actuate.health.Status; import reactor.core.publisher.Mono; import uk.co.gresearch.siembol.common.error.ErrorMessage; import uk.co.gresearch.siembol.common.error.ErrorType; +import uk.co.gresearch.siembol.common.testing.TestingDriverKafkaStreamsFactory; import uk.co.gresearch.siembol.response.stream.rest.application.ResponseConfigurationProperties; import uk.co.gresearch.siembol.response.common.RespondingResult; import uk.co.gresearch.siembol.response.common.RespondingResultAttributes; diff --git a/siembol-common/pom.xml b/siembol-common/pom.xml index 2b1c5e2e..e74b7615 100644 --- a/siembol-common/pom.xml +++ b/siembol-common/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol siembol - 2.5.11-SNAPSHOT + 2.5.12-SNAPSHOT @@ -190,6 +190,16 @@ ${io_micrometer_version} provided + + org.apache.kafka + kafka-streams + ${kafka_version} + + + org.apache.kafka + kafka-streams-test-utils + ${kafka_version} + diff --git a/config-editor/config-editor-core/src/main/java/uk/co/gresearch/siembol/configeditor/common/ServiceType.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/constants/ServiceType.java similarity index 94% rename from config-editor/config-editor-core/src/main/java/uk/co/gresearch/siembol/configeditor/common/ServiceType.java rename to siembol-common/src/main/java/uk/co/gresearch/siembol/common/constants/ServiceType.java index 5ac3beb7..a9f2fd08 100644 --- a/config-editor/config-editor-core/src/main/java/uk/co/gresearch/siembol/configeditor/common/ServiceType.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/constants/ServiceType.java @@ -1,4 +1,4 @@ -package uk.co.gresearch.siembol.configeditor.common; +package uk.co.gresearch.siembol.common.constants; public enum ServiceType { RESPONSE("response"), diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/constants/SiembolConstants.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/constants/SiembolConstants.java index d36378b2..8a5bafc0 100644 --- a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/constants/SiembolConstants.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/constants/SiembolConstants.java @@ -2,4 +2,16 @@ package uk.co.gresearch.siembol.common.constants; public class SiembolConstants { public static final int MAX_SIZE_CONFIG_UPDATE_LOG = 100; + public static final String TIMESTAMP = "timestamp"; + public static final String PARSING_TIME = "siembol_parsing_ts"; + public static final String ENRICHING_TIME = "siembol_enriching_ts"; + public static final String RESPONSE_TIME = "siembol_response_ts"; + public static final String SRC_ADDR = "ip_src_addr"; + public static final String SRC_PORT = "ip_src_port"; + public static final String DST_ADDR = "ip_dst_addr"; + public static final String DST_PORT = "ip_dst_port"; + public static final String PROTOCOL = "protocol"; + public static final String ORIGINAL = "original_string"; + public static final String GUID = "guid"; + public static final String SENSOR_TYPE = "source_type"; } diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/constants/SiembolMessageFields.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/constants/SiembolMessageFields.java index 78190c64..11e11809 100644 --- a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/constants/SiembolMessageFields.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/constants/SiembolMessageFields.java @@ -6,18 +6,18 @@ import java.util.Set; import java.util.stream.Collectors; public enum SiembolMessageFields { - SRC_ADDR("ip_src_addr"), - SRC_PORT("ip_src_port"), - DST_ADDR("ip_dst_addr"), - DST_PORT("ip_dst_port"), - PROTOCOL("protocol"), - TIMESTAMP("timestamp"), - ORIGINAL("original_string"), - GUID("guid"), - SENSOR_TYPE("source_type"), - PARSING_TIME("siembol_parsing_ts"), - ENRICHING_TIME("siembol_enriching_ts"), - RESPONSE_TIME("siembol_response_ts"); + SRC_ADDR(SiembolConstants.SRC_ADDR), + SRC_PORT(SiembolConstants.SRC_PORT), + DST_ADDR(SiembolConstants.DST_ADDR), + DST_PORT(SiembolConstants.DST_PORT), + PROTOCOL(SiembolConstants.PROTOCOL), + ORIGINAL(SiembolConstants.ORIGINAL), + GUID(SiembolConstants.GUID), + SENSOR_TYPE(SiembolConstants.SENSOR_TYPE), + TIMESTAMP(SiembolConstants.TIMESTAMP), + PARSING_TIME(SiembolConstants.PARSING_TIME), + ENRICHING_TIME(SiembolConstants.ENRICHING_TIME), + RESPONSE_TIME(SiembolConstants.RESPONSE_TIME); private final String name; SiembolMessageFields(String name) { diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/metrics/SiembolMetrics.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/metrics/SiembolMetrics.java index 8cd24b29..51f19a95 100644 --- a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/metrics/SiembolMetrics.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/metrics/SiembolMetrics.java @@ -54,7 +54,16 @@ public enum SiembolMetrics { SIEMBOL_SYNC_ADMIN_CONFIG("siembol_counter_sync_admin_config_%s"), SIEMBOL_SYNC_RULES_VERSION("siembol_counter_sync_config_version_%s"), CONFIG_EDITOR_REST_RELEASE_PR_SERVICE("siembol_counter_release_pr_%s"), - CONFIG_EDITOR_REST_ADMIN_CONFIG_PR_SERVICE("siembol_counter_admin_counfig_pr_%s"); + CONFIG_EDITOR_REST_ADMIN_CONFIG_PR_SERVICE("siembol_counter_admin_counfig_pr_%s"), + + HEARTBEAT_MESSAGES_SENT("siembol_counter_hearbeat_messages_sent_%s"), + HEARTBEAT_MESSAGES_READ("siembol_counter_hearbeat_messages_read"), + HEARTBEAT_LATENCY_PARSING_MS("siembol_gauge_hearbeat_latency_ms_parsing"), + HEARTBEAT_LATENCY_ENRICHING_MS("siembol_gauge_hearbeat_latency_ms_enriching"), + HEARTBEAT_LATENCY_RESPONDING_MS("siembol_gauge_hearbeat_latency_ms_responding"), + HEARTBEAT_LATENCY_TOTAL_MS("siembol_gauge_hearbeat_latency_ms_total"), + HEARTBEAT_PRODUCER_ERROR("siembol_counter_heartbeat_producer_error_%s"), + HEARTBEAT_CONSUMER_ERROR("siembol_counter_heartbeat_consumer_error"); private final String formatStringName; diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/metrics/test/SiembolMetricsTestRegistrar.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/metrics/test/SiembolMetricsTestRegistrar.java index bd94ca57..09cb9d60 100644 --- a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/metrics/test/SiembolMetricsTestRegistrar.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/metrics/test/SiembolMetricsTestRegistrar.java @@ -39,4 +39,11 @@ public class SiembolMetricsTestRegistrar implements SiembolMetricsRegistrar { } return countersMap.get(name).getValue(); } + + public double getGaugeValue(String name) { + if (!gaugesMap.containsKey(name)) { + throw new IllegalArgumentException(String.format(METRIC_DOES_NOT_EXIST_MSG, name)); + } + return gaugesMap.get(name).getValue(); + } } diff --git a/responding/responding-stream/src/test/java/uk/co/gresearch/siembol/response/stream/ruleservice/TestingDriverKafkaStreamsFactory.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/testing/TestingDriverKafkaStreamsFactory.java similarity index 90% rename from responding/responding-stream/src/test/java/uk/co/gresearch/siembol/response/stream/ruleservice/TestingDriverKafkaStreamsFactory.java rename to siembol-common/src/main/java/uk/co/gresearch/siembol/common/testing/TestingDriverKafkaStreamsFactory.java index b2ea6ac1..ef6a7ad3 100644 --- a/responding/responding-stream/src/test/java/uk/co/gresearch/siembol/response/stream/ruleservice/TestingDriverKafkaStreamsFactory.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/testing/TestingDriverKafkaStreamsFactory.java @@ -1,8 +1,9 @@ -package uk.co.gresearch.siembol.response.stream.ruleservice; +package uk.co.gresearch.siembol.common.testing; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; +import uk.co.gresearch.siembol.common.utils.KafkaStreamsFactory; import java.io.Closeable; import java.util.Properties; diff --git a/responding/responding-stream/src/main/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamsFactory.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/utils/KafkaStreamsFactory.java similarity index 79% rename from responding/responding-stream/src/main/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamsFactory.java rename to siembol-common/src/main/java/uk/co/gresearch/siembol/common/utils/KafkaStreamsFactory.java index a4d654f4..20f048ad 100644 --- a/responding/responding-stream/src/main/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamsFactory.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/utils/KafkaStreamsFactory.java @@ -1,4 +1,4 @@ -package uk.co.gresearch.siembol.response.stream.ruleservice; +package uk.co.gresearch.siembol.common.utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; @@ -8,3 +8,4 @@ import java.util.Properties; public interface KafkaStreamsFactory { KafkaStreams createKafkaStreams(Topology topology, Properties properties); } + diff --git a/responding/responding-stream/src/main/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamsFactoryImpl.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/utils/KafkaStreamsFactoryImpl.java similarity index 94% rename from responding/responding-stream/src/main/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamsFactoryImpl.java rename to siembol-common/src/main/java/uk/co/gresearch/siembol/common/utils/KafkaStreamsFactoryImpl.java index bfbf69b2..c2e48efd 100644 --- a/responding/responding-stream/src/main/java/uk/co/gresearch/siembol/response/stream/ruleservice/KafkaStreamsFactoryImpl.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/utils/KafkaStreamsFactoryImpl.java @@ -1,4 +1,4 @@ -package uk.co.gresearch.siembol.response.stream.ruleservice; +package uk.co.gresearch.siembol.common.utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; import java.util.Properties; + public class KafkaStreamsFactoryImpl implements KafkaStreamsFactory { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final String UNCAUGHT_EXCEPTION = "Uncaught exception in siembol response kafka streams";