moving num workers into parsing application (#689)

fixing header reoutingapp with null headers
This commit is contained in:
Marian Novotny
2022-06-21 12:33:56 +01:00
committed by GitHub
parent 71830b8c43
commit 6111335983
30 changed files with 113 additions and 69 deletions

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId> <artifactId>alerting</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -35,7 +35,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId> <artifactId>alerting</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -23,7 +23,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId> <artifactId>alerting</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -51,7 +51,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>
<module>alerting-core</module> <module>alerting-core</module>

View File

@@ -9,13 +9,13 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId> <artifactId>config-editor</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId> <artifactId>config-editor</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
@@ -56,7 +56,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -67,22 +67,22 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId> <artifactId>config-editor-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-services</artifactId> <artifactId>config-editor-services</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-sync</artifactId> <artifactId>config-editor-sync</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -93,7 +93,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId> <artifactId>parsing-app</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -104,7 +104,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId> <artifactId>enriching-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -115,7 +115,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId> <artifactId>responding-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>

View File

@@ -10,7 +10,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId> <artifactId>config-editor</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -41,32 +41,32 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId> <artifactId>config-editor-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId> <artifactId>parsing-app</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId> <artifactId>enriching-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId> <artifactId>responding-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId> <artifactId>config-editor</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -20,17 +20,17 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId> <artifactId>config-editor-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId> <artifactId>parsing-app</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>
<module>config-editor-core</module> <module>config-editor-core</module>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath> <relativePath>../../pom.xml</relativePath>
</parent> </parent>
<dependencyManagement> <dependencyManagement>
@@ -37,7 +37,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath> <relativePath>../../pom.xml</relativePath>
</parent> </parent>
<dependencyManagement> <dependencyManagement>
@@ -43,7 +43,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>

View File

@@ -120,9 +120,10 @@ Parsers are integrated in a stream application (storm topology) that combines on
- `parsing_app_type`- The type of the parsing application - `single_parser`, `router_parsing`, `topic_routing_parsing` or `header_routing_parsing` - `parsing_app_type`- The type of the parsing application - `single_parser`, `router_parsing`, `topic_routing_parsing` or `header_routing_parsing`
- `input_topics` - The kafka topics for reading messages for parsing - `input_topics` - The kafka topics for reading messages for parsing
- `error_topic`- The kafka topic for publishing error messages - `error_topic`- The kafka topic for publishing error messages
- `input_parallelism` - The number of parallel executors for reading messages from the input kafka topics - `num_workers` - The number of workers for the parsing application
- `parsing_parallelism` - The number of parallel executors for parsing messages - `input_parallelism` - The number of parallel executors per worker for reading messages from the input kafka topics
- `output_parallelism` - The number of parallel executors for publishing parsed messages to kafka - `parsing_parallelism` - The number of parallel executors per worker for parsing messages
- `output_parallelism` - The number of parallel executors per worker for publishing parsed messages to kafka
- `parse_metadata` - Parsing json metadata from input key records using `metadata_prefix` added to metadata field names, by default `metadata_` - `parse_metadata` - Parsing json metadata from input key records using `metadata_prefix` added to metadata field names, by default `metadata_`
- `max_num_fields` - Maximum number of fields after parsing the message - `max_num_fields` - Maximum number of fields after parsing the message
- `max_field_size` - Maximum field size after parsing the message in bytes - `max_field_size` - Maximum field size after parsing the message in bytes

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching</artifactId> <artifactId>enriching</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -35,12 +35,12 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching</artifactId> <artifactId>enriching</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -75,7 +75,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId> <artifactId>enriching-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>
<module>enriching-core</module> <module>enriching-core</module>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId> <artifactId>parsing</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -39,12 +39,12 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-core</artifactId> <artifactId>parsing-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@@ -9,6 +9,7 @@ public class ParsingApplicationFactoryAttributes {
private String jsonSchema; private String jsonSchema;
private String name; private String name;
private String applicationParserSpecification; private String applicationParserSpecification;
private Integer numWorkers;
private Integer inputParallelism; private Integer inputParallelism;
private Integer outputParallelism; private Integer outputParallelism;
private Integer parsingParallelism; private Integer parsingParallelism;
@@ -105,4 +106,12 @@ public class ParsingApplicationFactoryAttributes {
public void setApplicationType(ParsingApplicationTypeDto applicationType) { public void setApplicationType(ParsingApplicationTypeDto applicationType) {
this.applicationType = applicationType; this.applicationType = applicationType;
} }
public Integer getNumWorkers() {
return numWorkers;
}
public void setNumWorkers(Integer numWorkers) {
this.numWorkers = numWorkers;
}
} }

View File

@@ -330,6 +330,7 @@ public class ParsingApplicationFactoryImpl implements ParsingApplicationFactory
attributes.setSourceHeaderName( attributes.setSourceHeaderName(
application.getParsingSettingsDto().getHeaderRoutingParserDto().getHeaderName()); application.getParsingSettingsDto().getHeaderRoutingParserDto().getHeaderName());
} }
attributes.setNumWorkers(application.getParsingApplicationSettingsDto().getNumWorkers());
} }
private String getParserFromMap(String parserName, Map<String, String> parserMap) { private String getParserFromMap(String parserName, Map<String, String> parserMap) {

View File

@@ -19,18 +19,21 @@ public class ParsingApplicationSettingsDto {
@Attributes(description = "The kafka topic for publishing error messages", required = true) @Attributes(description = "The kafka topic for publishing error messages", required = true)
private String errorTopic; private String errorTopic;
@JsonProperty("num_workers")
@Attributes(description = "The number of workers for the parsing application", minimum = 1, required = true)
private Integer numWorkers = 1;
@JsonProperty("input_parallelism") @JsonProperty("input_parallelism")
@Attributes(description = "The number of parallel executors for reading messages from the input kafka topics", @Attributes(description = "The number of parallel executors per worker for reading messages from the input topics",
required = true, minimum = 1) required = true, minimum = 1)
private Integer inputParallelism; private Integer inputParallelism;
@JsonProperty("parsing_parallelism") @JsonProperty("parsing_parallelism")
@Attributes(description = "The number of parallel executors for parsing messages", @Attributes(description = "The number of parallel executors per worker for parsing messages",
required = true, minimum = 1) required = true, minimum = 1)
private Integer parsingParallelism; private Integer parsingParallelism;
@JsonProperty("output_parallelism") @JsonProperty("output_parallelism")
@Attributes(description = "The number of parallel executors for publishing to kafka", @Attributes(description = "The number of parallel executors per worker for publishing to kafka",
required = true, minimum = 1) required = true, minimum = 1)
private Integer outputParallelism; private Integer outputParallelism;
@@ -141,4 +144,12 @@ public class ParsingApplicationSettingsDto {
public void setOriginalStringTopic(String originalStringTopic) { public void setOriginalStringTopic(String originalStringTopic) {
this.originalStringTopic = originalStringTopic; this.originalStringTopic = originalStringTopic;
} }
public Integer getNumWorkers() {
return numWorkers;
}
public void setNumWorkers(Integer numWorkers) {
this.numWorkers = numWorkers;
}
} }

View File

@@ -24,6 +24,7 @@ public class ParsingApplicationFactoryImplTest {
"max_num_fields" : 100, "max_num_fields" : 100,
"max_field_size" : 40000, "max_field_size" : 40000,
"original_string_topic" : "truncated", "original_string_topic" : "truncated",
"num_workers" : 3,
"input_parallelism": 1, "input_parallelism": 1,
"parsing_parallelism": 2, "parsing_parallelism": 2,
"output_parallelism": 3, "output_parallelism": 3,
@@ -49,6 +50,7 @@ public class ParsingApplicationFactoryImplTest {
"secret" "secret"
], ],
"error_topic": "error", "error_topic": "error",
"num_workers" : 3,
"input_parallelism": 1, "input_parallelism": 1,
"parsing_parallelism": 2, "parsing_parallelism": 2,
"output_parallelism": 3, "output_parallelism": 3,
@@ -96,6 +98,7 @@ public class ParsingApplicationFactoryImplTest {
"public" "public"
], ],
"error_topic": "error", "error_topic": "error",
"num_workers" : 3,
"input_parallelism": 1, "input_parallelism": 1,
"parsing_parallelism": 2, "parsing_parallelism": 2,
"output_parallelism": 3, "output_parallelism": 3,
@@ -144,6 +147,7 @@ public class ParsingApplicationFactoryImplTest {
"public" "public"
], ],
"error_topic": "error", "error_topic": "error",
"num_workers" : 3,
"input_parallelism": 1, "input_parallelism": 1,
"parsing_parallelism": 2, "parsing_parallelism": 2,
"output_parallelism": 3, "output_parallelism": 3,
@@ -260,6 +264,7 @@ public class ParsingApplicationFactoryImplTest {
ParsingApplicationFactoryResult result = factory.create(simpleSingleApplicationParser, testParsersConfigs); ParsingApplicationFactoryResult result = factory.create(simpleSingleApplicationParser, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode()); Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode());
Assert.assertEquals("test", result.getAttributes().getName()); Assert.assertEquals("test", result.getAttributes().getName());
Assert.assertEquals(3, result.getAttributes().getNumWorkers().intValue());
Assert.assertEquals(1, result.getAttributes().getInputParallelism().intValue()); Assert.assertEquals(1, result.getAttributes().getInputParallelism().intValue());
Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue()); Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue());
Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue()); Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue());
@@ -312,6 +317,7 @@ public class ParsingApplicationFactoryImplTest {
ParsingApplicationFactoryResult result = factory.create(simpleRoutingApplicationParser, testParsersConfigs); ParsingApplicationFactoryResult result = factory.create(simpleRoutingApplicationParser, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode()); Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode());
Assert.assertEquals("test", result.getAttributes().getName()); Assert.assertEquals("test", result.getAttributes().getName());
Assert.assertEquals(3, result.getAttributes().getNumWorkers().intValue());
Assert.assertEquals(1, result.getAttributes().getInputParallelism().intValue()); Assert.assertEquals(1, result.getAttributes().getInputParallelism().intValue());
Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue()); Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue());
Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue()); Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue());
@@ -397,6 +403,7 @@ public class ParsingApplicationFactoryImplTest {
ParsingApplicationFactoryResult result = factory.create(headerRoutingApplicationParser, testParsersConfigs); ParsingApplicationFactoryResult result = factory.create(headerRoutingApplicationParser, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode()); Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode());
Assert.assertEquals("test", result.getAttributes().getName()); Assert.assertEquals("test", result.getAttributes().getName());
Assert.assertEquals(3, result.getAttributes().getNumWorkers().intValue());
Assert.assertEquals(1, result.getAttributes().getInputParallelism().intValue()); Assert.assertEquals(1, result.getAttributes().getInputParallelism().intValue());
Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue()); Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue());
Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue()); Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue());
@@ -452,6 +459,7 @@ public class ParsingApplicationFactoryImplTest {
ParsingApplicationFactoryResult result = factory.create(topicRoutingApplicationParser, testParsersConfigs); ParsingApplicationFactoryResult result = factory.create(topicRoutingApplicationParser, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode()); Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode());
Assert.assertEquals("test", result.getAttributes().getName()); Assert.assertEquals("test", result.getAttributes().getName());
Assert.assertEquals(3, result.getAttributes().getNumWorkers().intValue());
Assert.assertEquals(1, result.getAttributes().getInputParallelism().intValue()); Assert.assertEquals(1, result.getAttributes().getInputParallelism().intValue());
Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue()); Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue());
Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue()); Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue());

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId> <artifactId>parsing</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -50,7 +50,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>joda-time</groupId> <groupId>joda-time</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId> <artifactId>parsing</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -75,7 +75,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId> <artifactId>parsing-app</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>

View File

@@ -2,6 +2,7 @@ package uk.co.gresearch.siembol.parsers.storm;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config; import org.apache.storm.Config;
@@ -51,6 +52,8 @@ public class StormParsingApplication {
"with storm attributes: {}\nparsing application attributes: {}"; "with storm attributes: {}\nparsing application attributes: {}";
private static final String UNKNOWN_SOURCE = "unknown"; private static final String UNKNOWN_SOURCE = "unknown";
private static final String UNKNOWN_SOURCE_HEADER = "unknown_header";
private static KafkaSpoutConfig<String, byte[]> createKafkaSpoutConfig( private static KafkaSpoutConfig<String, byte[]> createKafkaSpoutConfig(
StormParsingApplicationAttributesDto parsingApplicationAttributes, StormParsingApplicationAttributesDto parsingApplicationAttributes,
ParsingApplicationFactoryAttributes parsingAttributes) { ParsingApplicationFactoryAttributes parsingAttributes) {
@@ -77,9 +80,16 @@ public class StormParsingApplication {
return r -> new Values(r.topic(), r.key(), r.value()); return r -> new Values(r.topic(), r.key(), r.value());
case HEADER_ROUTING_PARSING: case HEADER_ROUTING_PARSING:
final String headerName = parsingAttributes.getSourceHeaderName(); final String headerName = parsingAttributes.getSourceHeaderName();
return r -> new Values(new String(r.headers().lastHeader(headerName).value(), StandardCharsets.UTF_8), return r -> {
r.key(), Header header = r.headers() != null
r.value()); ? r.headers().lastHeader(headerName)
: null;
String headerValue = header != null && header.value() != null
? new String(header.value(), StandardCharsets.UTF_8)
: UNKNOWN_SOURCE_HEADER;
return new Values(headerValue, r.key(), r.value());
};
default: default:
throw new IllegalArgumentException(); throw new IllegalArgumentException();
} }
@@ -95,14 +105,15 @@ public class StormParsingApplication {
.put(CLIENT_ID_CONFIG, stormAppAttributes.getClientId(parsingAttributes.getName())); .put(CLIENT_ID_CONFIG, stormAppAttributes.getClientId(parsingAttributes.getName()));
stormAppAttributes.getStormAttributes().setKafkaTopics(parsingAttributes.getInputTopics()); stormAppAttributes.getStormAttributes().setKafkaTopics(parsingAttributes.getInputTopics());
var numWorkers = parsingAttributes.getNumWorkers();
TopologyBuilder builder = new TopologyBuilder(); TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(KAFKA_SPOUT, builder.setSpout(KAFKA_SPOUT,
new KafkaSpout<>(createKafkaSpoutConfig(stormAppAttributes, parsingAttributes)), new KafkaSpout<>(createKafkaSpoutConfig(stormAppAttributes, parsingAttributes)),
parsingAttributes.getInputParallelism()); parsingAttributes.getInputParallelism() * numWorkers);
builder.setBolt(parsingAttributes.getName(), builder.setBolt(parsingAttributes.getName(),
new ParsingApplicationBolt(stormAppAttributes, parsingAttributes, zooKeeperConnectorFactory, metricsFactory), new ParsingApplicationBolt(stormAppAttributes, parsingAttributes, zooKeeperConnectorFactory, metricsFactory),
parsingAttributes.getParsingParallelism()) parsingAttributes.getParsingParallelism() * numWorkers)
.localOrShuffleGrouping(KAFKA_SPOUT); .localOrShuffleGrouping(KAFKA_SPOUT);
builder.setBolt(KAFKA_WRITER, builder.setBolt(KAFKA_WRITER,
@@ -110,7 +121,7 @@ public class StormParsingApplication {
ParsingApplicationTuples.PARSING_MESSAGES.toString(), ParsingApplicationTuples.PARSING_MESSAGES.toString(),
ParsingApplicationTuples.COUNTERS.toString(), ParsingApplicationTuples.COUNTERS.toString(),
metricsFactory), metricsFactory),
parsingAttributes.getOutputParallelism()) parsingAttributes.getOutputParallelism() * numWorkers)
.localOrShuffleGrouping(parsingAttributes.getName()); .localOrShuffleGrouping(parsingAttributes.getName());
return builder.createTopology(); return builder.createTopology();
@@ -137,6 +148,8 @@ public class StormParsingApplication {
ParsingApplicationFactoryAttributes parsingAttributes = result.getAttributes(); ParsingApplicationFactoryAttributes parsingAttributes = result.getAttributes();
Config config = new Config(); Config config = new Config();
config.putAll(stormAttributes.getStormAttributes().getStormConfig().getRawMap()); config.putAll(stormAttributes.getStormAttributes().getStormConfig().getRawMap());
config.put(Config.TOPOLOGY_WORKERS, parsingAttributes.getNumWorkers());
StormTopology topology = createTopology(stormAttributes, StormTopology topology = createTopology(stormAttributes,
parsingAttributes, parsingAttributes,
new ZooKeeperConnectorFactoryImpl(), new ZooKeeperConnectorFactoryImpl(),

View File

@@ -42,6 +42,7 @@ public class StormParsingApplicationTest {
], ],
"parse_metadata" : false, "parse_metadata" : false,
"error_topic": "error", "error_topic": "error",
"num_workers" : 1,
"input_parallelism": 1, "input_parallelism": 1,
"parsing_parallelism": 1, "parsing_parallelism": 1,
"output_parallelism": 1, "output_parallelism": 1,

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>
<module>parsing-core</module> <module>parsing-core</module>

View File

@@ -6,7 +6,7 @@
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<name>siembol</name> <name>siembol</name>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<description>A scalable, advanced security analytics framework based on open-source big data technologies.</description> <description>A scalable, advanced security analytics framework based on open-source big data technologies.</description>
<inceptionYear>2019</inceptionYear> <inceptionYear>2019</inceptionYear>
<url>https://siembol.io/</url> <url>https://siembol.io/</url>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>
<module>responding-core</module> <module>responding-core</module>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding</artifactId> <artifactId>responding</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -35,12 +35,12 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.jayway.jsonpath</groupId> <groupId>com.jayway.jsonpath</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding</artifactId> <artifactId>responding</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
@@ -51,7 +51,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -62,7 +62,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId> <artifactId>responding-core</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.5.13-SNAPSHOT</version> <version>2.5.14-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>