Siembol parsing: adding source routing applications (#495)

* adding topig routing and header rouitng into parsing

* increase siemobl version to 2.2.9-SNAPSHOT

* updating parsing app layout config file
This commit is contained in:
Marian Novotny
2022-01-26 13:38:31 +00:00
committed by GitHub
parent ff962eccfa
commit 4be1159387
43 changed files with 1056 additions and 145 deletions

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId> <artifactId>alerting</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -35,7 +35,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId> <artifactId>alerting</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -23,7 +23,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting</artifactId> <artifactId>alerting</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -51,7 +51,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>
<module>alerting-core</module> <module>alerting-core</module>

View File

@@ -9,13 +9,13 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId> <artifactId>config-editor</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId> <artifactId>config-editor</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
@@ -56,7 +56,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -67,22 +67,22 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId> <artifactId>config-editor-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-services</artifactId> <artifactId>config-editor-services</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-sync</artifactId> <artifactId>config-editor-sync</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -93,7 +93,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId> <artifactId>parsing-app</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -104,7 +104,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId> <artifactId>enriching-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -115,7 +115,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId> <artifactId>responding-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>

View File

@@ -10,7 +10,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId> <artifactId>config-editor</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -41,32 +41,32 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId> <artifactId>config-editor-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId> <artifactId>parsing-app</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId> <artifactId>enriching-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId> <artifactId>responding-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor</artifactId> <artifactId>config-editor</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -20,17 +20,17 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>config-editor-core</artifactId> <artifactId>config-editor-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId> <artifactId>parsing-app</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>
<module>config-editor-core</module> <module>config-editor-core</module>

View File

@@ -21,7 +21,7 @@
} }
} }
}, },
"$..parsers": { "$..routing_parser..parsers": {
"widget": { "widget": {
"formlyConfig": { "formlyConfig": {
"wrappers": [ "wrappers": [
@@ -30,7 +30,7 @@
} }
} }
}, },
"$..parsers.items": { "$..routing_parser..parsers.items": {
"widget": { "widget": {
"formlyConfig": { "formlyConfig": {
"wrappers": [ "wrappers": [
@@ -39,7 +39,7 @@
} }
} }
}, },
"$..default_parser": { "$..routing_parser..default_parser": {
"wrappers": [ "wrappers": [
"panel" "panel"
] ]
@@ -70,6 +70,66 @@
} }
} }
} }
},
"$..topic_routing_parser": {
"widget": {
"formlyConfig": {
"hideExpression": "field.parent.parent.model.parsing_app_settings.parsing_app_type !== 'topic_routing_parsing'"
}
}
},
"$..topic_routing_parser..parsers": {
"widget": {
"formlyConfig": {
"wrappers": [
"expansion-panel"
]
}
}
},
"$..topic_routing_parser..parsers.items": {
"widget": {
"formlyConfig": {
"wrappers": [
"panel"
]
}
}
},
"$..topic_routing_parser..default_parser": {
"wrappers": [
"panel"
]
},
"$..header_routing_parser": {
"widget": {
"formlyConfig": {
"hideExpression": "field.parent.parent.model.parsing_app_settings.parsing_app_type !== 'header_routing_parsing'"
}
}
},
"$..header_routing_parser..parsers": {
"widget": {
"formlyConfig": {
"wrappers": [
"expansion-panel"
]
}
}
},
"$..header_routing_parser..parsers.items": {
"widget": {
"formlyConfig": {
"wrappers": [
"panel"
]
}
}
},
"$..header_routing_parser..default_parser": {
"wrappers": [
"panel"
]
} }
}, },
"admin_config_layout": { "admin_config_layout": {

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath> <relativePath>../../pom.xml</relativePath>
</parent> </parent>
<dependencyManagement> <dependencyManagement>
@@ -43,7 +43,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching</artifactId> <artifactId>enriching</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -35,12 +35,12 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching</artifactId> <artifactId>enriching</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -71,7 +71,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>enriching-core</artifactId> <artifactId>enriching-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>
<module>enriching-core</module> <module>enriching-core</module>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId> <artifactId>parsing</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -39,12 +39,12 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-core</artifactId> <artifactId>parsing-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@@ -1,5 +1,6 @@
package uk.co.gresearch.siembol.parsers.application.factory; package uk.co.gresearch.siembol.parsers.application.factory;
import uk.co.gresearch.siembol.parsers.application.model.ParsingApplicationTypeDto;
import uk.co.gresearch.siembol.parsers.application.parsing.ParsingApplicationParser; import uk.co.gresearch.siembol.parsers.application.parsing.ParsingApplicationParser;
import java.util.List; import java.util.List;
@@ -14,6 +15,8 @@ public class ParsingApplicationFactoryAttributes {
private List<String> inputTopics; private List<String> inputTopics;
private ParsingApplicationParser applicationParser; private ParsingApplicationParser applicationParser;
private String message; private String message;
private String sourceHeaderName;
private ParsingApplicationTypeDto applicationType;
public String getJsonSchema() { public String getJsonSchema() {
return jsonSchema; return jsonSchema;
@@ -86,4 +89,20 @@ public class ParsingApplicationFactoryAttributes {
public void setApplicationParserSpecification(String applicationParserSpecification) { public void setApplicationParserSpecification(String applicationParserSpecification) {
this.applicationParserSpecification = applicationParserSpecification; this.applicationParserSpecification = applicationParserSpecification;
} }
public String getSourceHeaderName() {
return sourceHeaderName;
}
public void setSourceHeaderName(String sourceHeaderName) {
this.sourceHeaderName = sourceHeaderName;
}
public ParsingApplicationTypeDto getApplicationType() {
return applicationType;
}
public void setApplicationType(ParsingApplicationTypeDto applicationType) {
this.applicationType = applicationType;
}
} }

View File

@@ -13,6 +13,7 @@ import uk.co.gresearch.siembol.parsers.application.model.*;
import uk.co.gresearch.siembol.parsers.application.parsing.ParsingApplicationParser; import uk.co.gresearch.siembol.parsers.application.parsing.ParsingApplicationParser;
import uk.co.gresearch.siembol.parsers.application.parsing.RoutingParsingApplicationParser; import uk.co.gresearch.siembol.parsers.application.parsing.RoutingParsingApplicationParser;
import uk.co.gresearch.siembol.parsers.application.parsing.SingleApplicationParser; import uk.co.gresearch.siembol.parsers.application.parsing.SingleApplicationParser;
import uk.co.gresearch.siembol.parsers.application.parsing.SourceRoutingApplicationParser;
import uk.co.gresearch.siembol.parsers.common.SerializableSiembolParser; import uk.co.gresearch.siembol.parsers.common.SerializableSiembolParser;
import uk.co.gresearch.siembol.parsers.factory.ParserFactory; import uk.co.gresearch.siembol.parsers.factory.ParserFactory;
import uk.co.gresearch.siembol.parsers.factory.ParserFactoryImpl; import uk.co.gresearch.siembol.parsers.factory.ParserFactoryImpl;
@@ -21,18 +22,21 @@ import uk.co.gresearch.siembol.parsers.model.ParserConfigDto;
import uk.co.gresearch.siembol.parsers.model.ParsersConfigDto; import uk.co.gresearch.siembol.parsers.model.ParsersConfigDto;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static uk.co.gresearch.siembol.parsers.application.factory.ParsingApplicationFactoryResult.StatusCode.ERROR; import static uk.co.gresearch.siembol.parsers.application.factory.ParsingApplicationFactoryResult.StatusCode.ERROR;
import static uk.co.gresearch.siembol.parsers.application.factory.ParsingApplicationFactoryResult.StatusCode.OK; import static uk.co.gresearch.siembol.parsers.application.factory.ParsingApplicationFactoryResult.StatusCode.OK;
import static uk.co.gresearch.siembol.parsers.application.model.ParsingApplicationTypeDto.SINGLE_PARSER;
public class ParsingApplicationFactoryImpl implements ParsingApplicationFactory { public class ParsingApplicationFactoryImpl implements ParsingApplicationFactory {
private static final String MISSING_PARSER_MSG = "Missing parser: %s in parser configurations"; private static final String MISSING_PARSER_MSG = "Missing parser: %s in parser configurations";
private static final String MISSING_SINGLE_PARSER = "Missing single_parser properties"; private static final String MISSING_SINGLE_PARSER = "Missing single_parser properties";
private static final String MISSING_ROUTING_PARSER = "Missing routing_parser properties"; private static final String MISSING_ROUTING_PARSER = "Missing routing_parser properties";
private static final String MISSING_HEADER_ROUTING_PARSER = "Missing header_routing_parsing properties";
private static final String MISSING_TOPIC_ROUTING_PARSER = "Missing topic_routing_parsing properties";
private static final String UNSUPPORTED_PARSER_APP_TYPE = "Unsupported parsing application type %s";
private static final ObjectReader JSON_PARSERS_CONFIG_READER = new ObjectMapper() private static final ObjectReader JSON_PARSERS_CONFIG_READER = new ObjectMapper()
.readerFor(ParsersConfigDto.class); .readerFor(ParsersConfigDto.class);
@@ -49,7 +53,7 @@ public class ParsingApplicationFactoryImpl implements ParsingApplicationFactory
private final ParserFactory parserFactory; private final ParserFactory parserFactory;
public ParsingApplicationFactoryImpl() throws Exception { public ParsingApplicationFactoryImpl() throws Exception {
jsonSchemaValidator = new SiembolJsonSchemaValidator(ParsingApplicationsDto.class); jsonSchemaValidator = new SiembolJsonSchemaValidator(ParsingApplicationsDto.class);
parserFactory = ParserFactoryImpl.createParserFactory(); parserFactory = ParserFactoryImpl.createParserFactory();
} }
@@ -111,62 +115,54 @@ public class ParsingApplicationFactoryImpl implements ParsingApplicationFactory
public ParsingApplicationFactoryResult validateConfigurations(String parserConfigurations) { public ParsingApplicationFactoryResult validateConfigurations(String parserConfigurations) {
ParsingApplicationFactoryAttributes attributes = new ParsingApplicationFactoryAttributes(); ParsingApplicationFactoryAttributes attributes = new ParsingApplicationFactoryAttributes();
try { try {
SiembolResult validationResult = jsonSchemaValidator.validate(parserConfigurations); SiembolResult validationResult = jsonSchemaValidator.validate(parserConfigurations);
if (validationResult.getStatusCode() != SiembolResult.StatusCode.OK) { if (validationResult.getStatusCode() != SiembolResult.StatusCode.OK) {
attributes.setMessage(validationResult.getAttributes().getMessage()); attributes.setMessage(validationResult.getAttributes().getMessage());
return new ParsingApplicationFactoryResult(ERROR, attributes); return new ParsingApplicationFactoryResult(ERROR, attributes);
} }
return new ParsingApplicationFactoryResult(OK, attributes); return new ParsingApplicationFactoryResult(OK, attributes);
} catch(Exception e){ } catch (Exception e) {
attributes.setMessage(ExceptionUtils.getStackTrace(e)); attributes.setMessage(ExceptionUtils.getStackTrace(e));
return new ParsingApplicationFactoryResult(ERROR, attributes); return new ParsingApplicationFactoryResult(ERROR, attributes);
} }
} }
private ParsingApplicationParser createParser(ParsingApplicationDto application, private ParsingApplicationParser createSingleParser(String applicationName,
String parserConfigs) throws Exception { Map<String, String> parsersMap,
ParsersConfigDto parsers = JSON_PARSERS_CONFIG_READER.readValue(parserConfigs); ParsingSettingsDto parsingSettings,
Map<String, String> parsersMap = parsers.getParserConfigurations().stream() ParsingApplicationSettingsDto appSettings) throws Exception {
.collect(Collectors.toMap(x -> x.getParserName(), x -> { if (parsingSettings.getSingleParser() == null) {
try { throw new IllegalArgumentException(MISSING_SINGLE_PARSER);
return JSON_PARSER_CONFIG_WRITER.writeValueAsString(x);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}));
ParsingApplicationSettingsDto appSettings = application.getParsingApplicationSettingsDto();
ParsingSettingsDto parsingSettingsDto = application.getParsingSettingsDto();
if (appSettings.getApplicationType() == SINGLE_PARSER) {
if (parsingSettingsDto.getSingleParser() == null) {
throw new IllegalArgumentException(MISSING_SINGLE_PARSER);
}
if (!parsersMap.containsKey(parsingSettingsDto.getSingleParser().getParserName())) {
String errorMsg = String.format(MISSING_PARSER_MSG,
parsingSettingsDto.getSingleParser().getParserName());
throw new IllegalArgumentException(errorMsg);
}
return SingleApplicationParser.builder()
.parser(parsingSettingsDto.getSingleParser().getOutputTopic(),
new SerializableSiembolParser(parsersMap
.get(parsingSettingsDto.getSingleParser().getParserName())))
.parseMetadata(appSettings.getParseMetadata())
.addGuidToMessages(true)
.errorTopic(appSettings.getErrorTopic())
.metadataPrefix(appSettings.getMetadataPrefix())
.name(application.getParsingApplicationName())
.build();
} }
if (parsingSettingsDto.getRoutingParser() == null) { if (!parsersMap.containsKey(parsingSettings.getSingleParser().getParserName())) {
String errorMsg = String.format(MISSING_PARSER_MSG,
parsingSettings.getSingleParser().getParserName());
throw new IllegalArgumentException(errorMsg);
}
return SingleApplicationParser.builder()
.parser(parsingSettings.getSingleParser().getOutputTopic(),
new SerializableSiembolParser(parsersMap
.get(parsingSettings.getSingleParser().getParserName())))
.parseMetadata(appSettings.getParseMetadata())
.addGuidToMessages(true)
.errorTopic(appSettings.getErrorTopic())
.metadataPrefix(appSettings.getMetadataPrefix())
.name(applicationName)
.build();
}
private ParsingApplicationParser createRouterParser(String applicationName,
Map<String, String> parsersMap,
ParsingSettingsDto parsingSettings,
ParsingApplicationSettingsDto appSettings) throws Exception {
if (parsingSettings.getRoutingParser() == null) {
throw new IllegalArgumentException(MISSING_ROUTING_PARSER); throw new IllegalArgumentException(MISSING_ROUTING_PARSER);
} }
RoutingParserDto routingParser = parsingSettingsDto.getRoutingParser(); RoutingParserDto routingParser = parsingSettings.getRoutingParser();
if (!parsersMap.containsKey(routingParser.getRouterParserName())) { if (!parsersMap.containsKey(routingParser.getRouterParserName())) {
String errorMsg = String.format(MISSING_PARSER_MSG, routingParser.getRouterParserName()); String errorMsg = String.format(MISSING_PARSER_MSG, routingParser.getRouterParserName());
throw new IllegalArgumentException(errorMsg); throw new IllegalArgumentException(errorMsg);
@@ -200,17 +196,121 @@ public class ParsingApplicationFactoryImpl implements ParsingApplicationFactory
.parseMetadata(appSettings.getParseMetadata()) .parseMetadata(appSettings.getParseMetadata())
.addGuidToMessages(true) .addGuidToMessages(true)
.metadataPrefix(appSettings.getMetadataPrefix()) .metadataPrefix(appSettings.getMetadataPrefix())
.name(application.getParsingApplicationName()); .name(applicationName);
return builder.build(); return builder.build();
} }
private ParsingApplicationParser createHeaderRouterParser(String applicationName,
Map<String, String> parsersMap,
ParsingSettingsDto parsingSettings,
ParsingApplicationSettingsDto appSettings) throws Exception {
var headerRoutingParser = parsingSettings.getHeaderRoutingParserDto();
if (headerRoutingParser == null) {
throw new IllegalArgumentException(MISSING_HEADER_ROUTING_PARSER);
}
if (!parsersMap.containsKey(headerRoutingParser.getDefaultParser().getParserName())) {
String errorMsg = String.format(MISSING_PARSER_MSG, headerRoutingParser.getDefaultParser().getParserName());
throw new IllegalArgumentException(errorMsg);
}
var builder = SourceRoutingApplicationParser.builder()
.defaultParser(headerRoutingParser.getDefaultParser().getOutputTopic(),
new SerializableSiembolParser(
parsersMap.get(headerRoutingParser.getDefaultParser().getParserName())));
for (var parser : headerRoutingParser.getParsers()) {
if (!parsersMap.containsKey(parser.getParserProperties().getParserName())) {
String errorMsg = String.format(MISSING_PARSER_MSG, parser.getParserProperties().getParserName());
throw new IllegalArgumentException(errorMsg);
}
builder.addParser(parser.getSourceHeaderValue(),
parser.getParserProperties().getOutputTopic(),
new SerializableSiembolParser(parsersMap.get(parser.getParserProperties().getParserName())));
}
builder.errorTopic(appSettings.getErrorTopic())
.parseMetadata(appSettings.getParseMetadata())
.addGuidToMessages(true)
.metadataPrefix(appSettings.getMetadataPrefix())
.name(applicationName);
return builder.build();
}
private ParsingApplicationParser createTopicRouterParser(String applicationName,
Map<String, String> parsersMap,
ParsingSettingsDto parsingSettings,
ParsingApplicationSettingsDto appSettings) throws Exception {
var topicRoutingParser = parsingSettings.getTopicRoutingParserDto();
if (topicRoutingParser == null) {
throw new IllegalArgumentException(MISSING_TOPIC_ROUTING_PARSER);
}
if (!parsersMap.containsKey(topicRoutingParser.getDefaultParser().getParserName())) {
String errorMsg = String.format(MISSING_PARSER_MSG, topicRoutingParser.getDefaultParser().getParserName());
throw new IllegalArgumentException(errorMsg);
}
var builder = SourceRoutingApplicationParser.builder()
.defaultParser(topicRoutingParser.getDefaultParser().getOutputTopic(),
new SerializableSiembolParser(
parsersMap.get(topicRoutingParser.getDefaultParser().getParserName())));
for (var parser : topicRoutingParser.getParsers()) {
if (!parsersMap.containsKey(parser.getParserProperties().getParserName())) {
String errorMsg = String.format(MISSING_PARSER_MSG, parser.getParserProperties().getParserName());
throw new IllegalArgumentException(errorMsg);
}
builder.addParser(parser.getTopicName(),
parser.getParserProperties().getOutputTopic(),
new SerializableSiembolParser(parsersMap.get(parser.getParserProperties().getParserName())));
}
builder.errorTopic(appSettings.getErrorTopic())
.parseMetadata(appSettings.getParseMetadata())
.addGuidToMessages(true)
.metadataPrefix(appSettings.getMetadataPrefix())
.name(applicationName);
return builder.build();
}
private ParsingApplicationParser createParser(ParsingApplicationDto application,
String parserConfigs) throws Exception {
ParsersConfigDto parsers = JSON_PARSERS_CONFIG_READER.readValue(parserConfigs);
Map<String, String> parsersMap = parsers.getParserConfigurations().stream()
.collect(Collectors.toMap(ParserConfigDto::getParserName, x -> {
try {
return JSON_PARSER_CONFIG_WRITER.writeValueAsString(x);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}));
var appSettings = application.getParsingApplicationSettingsDto();
var parsingSettings = application.getParsingSettingsDto();
switch (appSettings.getApplicationType()) {
case SINGLE_PARSER:
return createSingleParser(application.getParsingApplicationName(), parsersMap, parsingSettings, appSettings);
case ROUTER_PARSING:
return createRouterParser(application.getParsingApplicationName(), parsersMap, parsingSettings, appSettings);
case TOPIC_ROUTING_PARSING:
return createTopicRouterParser(application.getParsingApplicationName(), parsersMap, parsingSettings, appSettings);
case HEADER_ROUTING_PARSING:
return createHeaderRouterParser(application.getParsingApplicationName(), parsersMap, parsingSettings, appSettings);
default:
throw new IllegalArgumentException(String.format(UNSUPPORTED_PARSER_APP_TYPE, appSettings.getApplicationType()));
}
}
private String wrapParserApplicationToParserApplications(String configStr) throws IOException { private String wrapParserApplicationToParserApplications(String configStr) throws IOException {
ParsingApplicationDto application = JSON_PARSING_APP_READER.readValue(configStr); ParsingApplicationDto application = JSON_PARSING_APP_READER.readValue(configStr);
ParsingApplicationsDto applications = new ParsingApplicationsDto(); ParsingApplicationsDto applications = new ParsingApplicationsDto();
applications.setParsingApplicationsVersion(application.getParsingApplicationVersion()); applications.setParsingApplicationsVersion(application.getParsingApplicationVersion());
applications.setParsingApplications(Arrays.asList(application)); applications.setParsingApplications(List.of(application));
return JSON_PARSING_APPS_WRITER.writeValueAsString(applications); return JSON_PARSING_APPS_WRITER.writeValueAsString(applications);
} }
@@ -221,5 +321,11 @@ public class ParsingApplicationFactoryImpl implements ParsingApplicationFactory
attributes.setOutputParallelism(application.getParsingApplicationSettingsDto().getOutputParallelism()); attributes.setOutputParallelism(application.getParsingApplicationSettingsDto().getOutputParallelism());
attributes.setParsingParallelism(application.getParsingApplicationSettingsDto().getParsingParallelism()); attributes.setParsingParallelism(application.getParsingApplicationSettingsDto().getParsingParallelism());
attributes.setInputTopics(application.getParsingApplicationSettingsDto().getInputTopics()); attributes.setInputTopics(application.getParsingApplicationSettingsDto().getInputTopics());
attributes.setApplicationType(application.getParsingApplicationSettingsDto().getApplicationType());
if (ParsingApplicationTypeDto.HEADER_ROUTING_PARSING.equals(
application.getParsingApplicationSettingsDto().getApplicationType())) {
attributes.setSourceHeaderName(
application.getParsingSettingsDto().getHeaderRoutingParserDto().getHeaderName());
}
} }
} }

View File

@@ -0,0 +1,46 @@
package uk.co.gresearch.siembol.parsers.application.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.reinert.jjschema.Attributes;
import java.util.List;
@Attributes(title = "header routing parser", description = "The specification for the topic routing parser")
public class HeaderRoutingParserDto {
@JsonProperty("header_name")
@Attributes(description = "The name of the header used for routing", required = true)
private String headerName;
@JsonProperty("parsers")
@Attributes(description = "The list of parsers for the further parsing", required = true, minItems = 1)
private List<HeaderRoutingParserPropertiesDto> parsers;
@JsonProperty("default_parser")
@Attributes(title = "default parser",
description = "The parser that should be used if no other parsers will be selected", required = true)
private ParserPropertiesDto defaultParser;
public String getHeaderName() {
return headerName;
}
public void setHeaderName(String headerName) {
this.headerName = headerName;
}
public List<HeaderRoutingParserPropertiesDto> getParsers() {
return parsers;
}
public void setParsers(List<HeaderRoutingParserPropertiesDto> parsers) {
this.parsers = parsers;
}
public ParserPropertiesDto getDefaultParser() {
return defaultParser;
}
public void setDefaultParser(ParserPropertiesDto defaultParser) {
this.defaultParser = defaultParser;
}
}

View File

@@ -0,0 +1,31 @@
package uk.co.gresearch.siembol.parsers.application.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.reinert.jjschema.Attributes;
@Attributes(title = "header routing parser properties", description = "The properties of header routing parser")
public class HeaderRoutingParserPropertiesDto {
@JsonProperty("source_header_value")
@Attributes(description = "The value in the header for selecting the parser", required = true)
private String sourceHeaderValue;
@JsonProperty("parser_properties")
@Attributes(description = "The properties of the selected parser", required = true)
private ParserPropertiesDto parserProperties;
public String getSourceHeaderValue() {
return sourceHeaderValue;
}
public void setSourceHeaderValue(String sourceHeaderValue) {
this.sourceHeaderValue = sourceHeaderValue;
}
public ParserPropertiesDto getParserProperties() {
return parserProperties;
}
public void setParserProperties(ParserPropertiesDto parserProperties) {
this.parserProperties = parserProperties;
}
}

View File

@@ -6,7 +6,9 @@ import com.github.reinert.jjschema.Attributes;
@Attributes(title = "parsing application type", description = "The type of parsing application") @Attributes(title = "parsing application type", description = "The type of parsing application")
public enum ParsingApplicationTypeDto { public enum ParsingApplicationTypeDto {
@JsonProperty("router_parsing") ROUTER_PARSING("router_parsing"), @JsonProperty("router_parsing") ROUTER_PARSING("router_parsing"),
@JsonProperty("single_parser") SINGLE_PARSER("single_parser"); @JsonProperty("single_parser") SINGLE_PARSER("single_parser"),
@JsonProperty("topic_routing_parsing") TOPIC_ROUTING_PARSING("topic_routing_parsing"),
@JsonProperty("header_routing_parsing") HEADER_ROUTING_PARSING("header_routing_parsing");
private final String name; private final String name;
ParsingApplicationTypeDto(String name) { ParsingApplicationTypeDto(String name) {

View File

@@ -13,6 +13,15 @@ public class ParsingSettingsDto {
@Attributes(title = "single parser", description = "The settings of the single parser parsing", minItems = 1) @Attributes(title = "single parser", description = "The settings of the single parser parsing", minItems = 1)
private ParserPropertiesDto singleParser; private ParserPropertiesDto singleParser;
@JsonProperty("topic_routing_parser")
@Attributes(description = "The settings of the topic name routing parsing")
private TopicRoutingParserDto topicRoutingParserDto;
@JsonProperty("header_routing_parser")
@Attributes(description = "The settings of the header routing parsing")
private HeaderRoutingParserDto headerRoutingParserDto;
public RoutingParserDto getRoutingParser() { public RoutingParserDto getRoutingParser() {
return routingParser; return routingParser;
} }
@@ -28,4 +37,20 @@ public class ParsingSettingsDto {
public void setSingleParser(ParserPropertiesDto singleParser) { public void setSingleParser(ParserPropertiesDto singleParser) {
this.singleParser = singleParser; this.singleParser = singleParser;
} }
public TopicRoutingParserDto getTopicRoutingParserDto() {
return topicRoutingParserDto;
}
public void setTopicRoutingParserDto(TopicRoutingParserDto topicRoutingParserDto) {
this.topicRoutingParserDto = topicRoutingParserDto;
}
public HeaderRoutingParserDto getHeaderRoutingParserDto() {
return headerRoutingParserDto;
}
public void setHeaderRoutingParserDto(HeaderRoutingParserDto headerRoutingParserDto) {
this.headerRoutingParserDto = headerRoutingParserDto;
}
} }

View File

@@ -0,0 +1,34 @@
package uk.co.gresearch.siembol.parsers.application.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.reinert.jjschema.Attributes;
import java.util.List;
@Attributes(title = "topic routing parser", description = "The specification for the topic routing parser")
public class TopicRoutingParserDto {
@JsonProperty("parsers")
@Attributes(description = "The list of parsers for the further parsing", required = true, minItems = 1)
private List<TopicRoutingParserPropertiesDto> parsers;
@JsonProperty("default_parser")
@Attributes(title = "default parser",
description = "The parser that should be used if no other parsers will be selected", required = true)
private ParserPropertiesDto defaultParser;
public List<TopicRoutingParserPropertiesDto> getParsers() {
return parsers;
}
public void setParsers(List<TopicRoutingParserPropertiesDto> parsers) {
this.parsers = parsers;
}
public ParserPropertiesDto getDefaultParser() {
return defaultParser;
}
public void setDefaultParser(ParserPropertiesDto defaultParser) {
this.defaultParser = defaultParser;
}
}

View File

@@ -0,0 +1,31 @@
package uk.co.gresearch.siembol.parsers.application.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.reinert.jjschema.Attributes;
@Attributes(title = "topic routing parser properties", description = "The properties of topic routing parser")
public class TopicRoutingParserPropertiesDto {
@JsonProperty("topic_name")
@Attributes(description = "The name of the topic for selecting the parser", required = true)
private String topicName;
@JsonProperty("parser_properties")
@Attributes(description = "The properties of the selected parser", required = true)
private ParserPropertiesDto parserProperties;
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public ParserPropertiesDto getParserProperties() {
return parserProperties;
}
public void setParserProperties(ParserPropertiesDto parserProperties) {
this.parserProperties = parserProperties;
}
}

View File

@@ -36,6 +36,7 @@ public abstract class ParsingApplicationParser implements Serializable {
private static final String ERROR_MESSAGE = "Exception during parsing, parsing_app: {} message: {}, " + private static final String ERROR_MESSAGE = "Exception during parsing, parsing_app: {} message: {}, " +
"metadata: {}, exception: {}"; "metadata: {}, exception: {}";
private static final String MISSING_ARGUMENTS = "Missing arguments required for Parsing application parser"; private static final String MISSING_ARGUMENTS = "Missing arguments required for Parsing application parser";
private static final String UNKNOWN_SOURCE = "unknown";
private final EnumSet<Flags> flags; private final EnumSet<Flags> flags;
private final String name; private final String name;
@@ -71,9 +72,13 @@ public abstract class ParsingApplicationParser implements Serializable {
return msg.toString(); return msg.toString();
} }
protected abstract List<ParserResult> parseInternally(String metadata, byte[] message); protected abstract List<ParserResult> parseInternally(String source, String metadata, byte[] message);
public ArrayList<ParsingApplicationResult> parse(String metadata, byte[] message) { public ArrayList<ParsingApplicationResult> parse(String metadata, byte[] message) {
return parse(UNKNOWN_SOURCE, metadata, message);
}
public ArrayList<ParsingApplicationResult> parse(String source, String metadata, byte[] message) {
ArrayList<ParsingApplicationResult> ret = new ArrayList<>(); ArrayList<ParsingApplicationResult> ret = new ArrayList<>();
try { try {
Map<String, Object> metadataObject = flags.contains(Flags.PARSE_METADATA) Map<String, Object> metadataObject = flags.contains(Flags.PARSE_METADATA)
@@ -82,7 +87,7 @@ public abstract class ParsingApplicationParser implements Serializable {
long timestamp = timeProvider.getCurrentTimeInMs(); long timestamp = timeProvider.getCurrentTimeInMs();
for (ParserResult parserResult : parseInternally(metadata, message)) { for (ParserResult parserResult : parseInternally(source, metadata, message)) {
if (parserResult.getException() != null) { if (parserResult.getException() != null) {
ret.add(new ParsingApplicationResult( ret.add(new ParsingApplicationResult(
errorTopic, errorTopic,
@@ -91,7 +96,7 @@ public abstract class ParsingApplicationParser implements Serializable {
} }
List<Map<String, Object>> parsed = parserResult.getParsedMessages(); List<Map<String, Object>> parsed = parserResult.getParsedMessages();
parsed.removeIf(x -> x.isEmpty()); parsed.removeIf(Map::isEmpty);
if (parsed.isEmpty()) { if (parsed.isEmpty()) {
continue; continue;
} }

View File

@@ -33,7 +33,7 @@ public class RoutingParsingApplicationParser extends ParsingApplicationParser {
} }
@Override @Override
protected List<ParserResult> parseInternally(String metadata, byte[] message) { protected List<ParserResult> parseInternally(String source, String metadata, byte[] message) {
List<ParserResult> ret = new ArrayList<>(); List<ParserResult> ret = new ArrayList<>();
ParserResult routerResult = routerParser.parseToResult(metadata, message); ParserResult routerResult = routerParser.parseToResult(metadata, message);
if (routerResult.getException() != null) { if (routerResult.getException() != null) {
@@ -46,7 +46,7 @@ public class RoutingParsingApplicationParser extends ParsingApplicationParser {
if (!parsedMsg.containsKey(routingConditionField) if (!parsedMsg.containsKey(routingConditionField)
|| !parsedMsg.containsKey(routingMessageField)) { || !parsedMsg.containsKey(routingMessageField)) {
String errorMsg = String.format(MISSING_ROUTER_FIELDS, String errorMsg = String.format(MISSING_ROUTER_FIELDS,
routingConditionField, routingMessageField, parsedMsg.toString()); routingConditionField, routingMessageField, parsedMsg);
LOG.debug(errorMsg); LOG.debug(errorMsg);
routerResult.setException(new IllegalStateException(errorMsg)); routerResult.setException(new IllegalStateException(errorMsg));
ret.add(routerResult); ret.add(routerResult);
@@ -77,8 +77,9 @@ public class RoutingParsingApplicationParser extends ParsingApplicationParser {
} }
public static Builder<RoutingParsingApplicationParser> builder() { public static Builder<RoutingParsingApplicationParser> builder() {
return new Builder<RoutingParsingApplicationParser>() { return new Builder<>() {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@Override @Override
public RoutingParsingApplicationParser build() { public RoutingParsingApplicationParser build() {
if (routerParser == null if (routerParser == null
@@ -123,19 +124,18 @@ public class RoutingParsingApplicationParser extends ParsingApplicationParser {
return this; return this;
} }
public Builder<T> routerParser(SerializableSiembolParser siembolParser) throws Exception { public Builder<T> routerParser(SerializableSiembolParser siembolParser) {
final RouterCondition alwaysMatch = x -> true; final RouterCondition alwaysMatch = x -> true;
this.routerParser = new SiembolParserWrapper(alwaysMatch, siembolParser, null); this.routerParser = new SiembolParserWrapper(alwaysMatch, siembolParser, null);
return this; return this;
} }
public Builder<T> defaultParser(String topic, SerializableSiembolParser siembolParser) throws Exception { public Builder<T> defaultParser(String topic, SerializableSiembolParser siembolParser) {
final RouterCondition alwaysMatch = x -> true; defaultParser = new SiembolParserWrapper(siembolParser, topic);
defaultParser = new SiembolParserWrapper(alwaysMatch, siembolParser, topic);
return this; return this;
} }
public Builder<T> addParser(String topic, SerializableSiembolParser siembolParser, String pattern) throws Exception { public Builder<T> addParser(String topic, SerializableSiembolParser siembolParser, String pattern) {
final Pattern conditionPattern = Pattern.compile(pattern, Pattern.DOTALL); final Pattern conditionPattern = Pattern.compile(pattern, Pattern.DOTALL);
final RouterCondition condition = x -> conditionPattern.matcher(x).matches(); final RouterCondition condition = x -> conditionPattern.matcher(x).matches();
parsers.add(new SiembolParserWrapper(condition, siembolParser, topic)); parsers.add(new SiembolParserWrapper(condition, siembolParser, topic));

View File

@@ -20,6 +20,10 @@ public class SiembolParserWrapper implements SiembolParser, Serializable {
this.topic = topic; this.topic = topic;
} }
public SiembolParserWrapper(SerializableSiembolParser parser, String topic) {
this(x -> true, parser, topic);
}
boolean checkCondition(String message) { boolean checkCondition(String message) {
return condition.apply(message); return condition.apply(message);
} }

View File

@@ -22,15 +22,16 @@ public class SingleApplicationParser extends ParsingApplicationParser {
} }
@Override @Override
protected List<ParserResult> parseInternally(String metadata, byte[] message) { protected List<ParserResult> parseInternally(String source, String metadata, byte[] message) {
List<ParserResult> ret = new ArrayList<>(); List<ParserResult> ret = new ArrayList<>();
ret.add(parser.parseToResult(metadata, message)); ret.add(parser.parseToResult(metadata, message));
return ret; return ret;
} }
public static Builder<SingleApplicationParser> builder() { public static Builder<SingleApplicationParser> builder() {
return new Builder<SingleApplicationParser>() { return new Builder<>() {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@Override @Override
public SingleApplicationParser build() { public SingleApplicationParser build() {
if (parser == null) { if (parser == null) {
@@ -49,8 +50,7 @@ public class SingleApplicationParser extends ParsingApplicationParser {
protected SiembolParserWrapper parser; protected SiembolParserWrapper parser;
public Builder<T> parser(String topic, SerializableSiembolParser siembolParser) throws Exception { public Builder<T> parser(String topic, SerializableSiembolParser siembolParser) throws Exception {
final RouterCondition alwaysMatch = x -> true; parser = new SiembolParserWrapper(siembolParser, topic);
parser = new SiembolParserWrapper(alwaysMatch, siembolParser, topic);
return this; return this;
} }

View File

@@ -0,0 +1,76 @@
package uk.co.gresearch.siembol.parsers.application.parsing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.co.gresearch.siembol.parsers.common.ParserResult;
import uk.co.gresearch.siembol.parsers.common.SerializableSiembolParser;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class SourceRoutingApplicationParser extends ParsingApplicationParser {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
static final String MISSING_ARGUMENTS = "Missing arguments in source routing application parser";
private final HashMap<String, SiembolParserWrapper> sourceToParserMap;
private final SiembolParserWrapper defaultParser;
protected SourceRoutingApplicationParser(Builder<?> builder) {
super(builder);
this.sourceToParserMap = builder.sourceToParserMap;
this.defaultParser = builder.defaultParser;
}
@Override
protected List<ParserResult> parseInternally(String source, String metadata, byte[] message) {
List<ParserResult> ret = new ArrayList<>();
SiembolParserWrapper parser = sourceToParserMap.getOrDefault(source, defaultParser);
ret.add(parser.parseToResult(metadata, message));
return ret;
}
public static Builder<SourceRoutingApplicationParser> builder() {
return new Builder<>() {
private static final long serialVersionUID = 1L;
@Override
public SourceRoutingApplicationParser build() {
if (defaultParser == null || sourceToParserMap == null) {
LOG.error(MISSING_ARGUMENTS);
throw new IllegalArgumentException(MISSING_ARGUMENTS);
}
return new SourceRoutingApplicationParser(this);
}
};
}
public static abstract class Builder<T extends SourceRoutingApplicationParser> extends
ParsingApplicationParser.Builder<T> {
private static final long serialVersionUID = 1L;
static final String DUPLICATE_SOURCE_MSG = "Parser with source %s already exists";
protected HashMap<String, SiembolParserWrapper> sourceToParserMap = new HashMap<>();
protected SiembolParserWrapper defaultParser;
public Builder<T> defaultParser(String topic, SerializableSiembolParser siembolParser) {
defaultParser = new SiembolParserWrapper(siembolParser, topic);
return this;
}
public Builder<T> addParser(String source, String topic, SerializableSiembolParser siembolParser) {
if (sourceToParserMap.containsKey(source)) {
throw new IllegalArgumentException(String.format(DUPLICATE_SOURCE_MSG, source));
}
var parser = new SiembolParserWrapper(siembolParser, topic);
sourceToParserMap.put(source, parser);
return this;
}
public abstract T build();
}
}

View File

@@ -2,6 +2,7 @@ package uk.co.gresearch.siembol.parsers.application.factory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import uk.co.gresearch.siembol.parsers.application.model.ParsingApplicationTypeDto;
public class ParsingApplicationFactoryImplTest { public class ParsingApplicationFactoryImplTest {
private final ParsingApplicationFactory factory; private final ParsingApplicationFactory factory;
@@ -77,6 +78,96 @@ public class ParsingApplicationFactoryImplTest {
} }
"""; """;
private final String topicRoutingApplicationParser = """
{
"parsing_app_name": "test",
"parsing_app_version": 1,
"parsing_app_author": "dummy",
"parsing_app_description": "Description of parser application",
"parsing_app_settings": {
"input_topics": [
"secret",
"public"
],
"error_topic": "error",
"input_parallelism": 1,
"parsing_parallelism": 2,
"output_parallelism": 3,
"parsing_app_type": "topic_routing_parsing"
},
"parsing_settings": {
"topic_routing_parser" : {
"default_parser": {
"parser_name": "default",
"output_topic": "output_default"
},
"parsers": [
{
"topic_name": "secret",
"parser_properties": {
"parser_name": "single",
"output_topic": "out_secret"
}
},
{
"topic_name": "public",
"parser_properties": {
"parser_name": "single2",
"output_topic": "out_secret"
}
}
]
}
}
}
}
""";
private final String headerRoutingApplicationParser = """
{
"parsing_app_name": "test",
"parsing_app_version": 1,
"parsing_app_author": "dummy",
"parsing_app_description": "Description of parser application",
"parsing_app_settings": {
"input_topics": [
"secret",
"public"
],
"error_topic": "error",
"input_parallelism": 1,
"parsing_parallelism": 2,
"output_parallelism": 3,
"parsing_app_type": "header_routing_parsing"
},
"parsing_settings": {
"header_routing_parser" : {
"header_name" : "source_header",
"default_parser": {
"parser_name": "default",
"output_topic": "output_default"
},
"parsers": [
{
"source_header_value" : "secret",
"parser_properties": {
"parser_name": "single",
"output_topic": "out_secret"
}
},
{
"source_header_value": "public",
"parser_properties": {
"parser_name": "single2",
"output_topic": "out_secret"
}
}
]
}
}
}
}
""";
private final String testParsersConfigs = """ private final String testParsersConfigs = """
{ {
@@ -91,6 +182,15 @@ public class ParsingApplicationFactoryImplTest {
"parser_type": "generic" "parser_type": "generic"
} }
}, },
{
"parser_description": "for testing single app parser",
"parser_version": 2,
"parser_name": "single2",
"parser_author": "dummy",
"parser_attributes": {
"parser_type": "generic"
}
},
{ {
"parser_description": "for testing routing app parser", "parser_description": "for testing routing app parser",
"parser_version": 2, "parser_version": 2,
@@ -152,6 +252,8 @@ public class ParsingApplicationFactoryImplTest {
Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue()); Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue());
Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue()); Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue());
Assert.assertEquals("secret", result.getAttributes().getInputTopics().get(0)); Assert.assertEquals("secret", result.getAttributes().getInputTopics().get(0));
Assert.assertEquals(ParsingApplicationTypeDto.SINGLE_PARSER,
result.getAttributes().getApplicationType());
Assert.assertNotNull(result.getAttributes().getApplicationParser()); Assert.assertNotNull(result.getAttributes().getApplicationParser());
} }
@@ -203,6 +305,8 @@ public class ParsingApplicationFactoryImplTest {
Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue()); Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue());
Assert.assertEquals("secret", result.getAttributes().getInputTopics().get(0)); Assert.assertEquals("secret", result.getAttributes().getInputTopics().get(0));
Assert.assertNotNull(result.getAttributes().getApplicationParser()); Assert.assertNotNull(result.getAttributes().getApplicationParser());
Assert.assertEquals(ParsingApplicationTypeDto.ROUTER_PARSING,
result.getAttributes().getApplicationType());
} }
@Test @Test
@@ -244,4 +348,113 @@ public class ParsingApplicationFactoryImplTest {
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.ERROR, result.getStatusCode()); Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.ERROR, result.getStatusCode());
Assert.assertTrue(result.getAttributes().getMessage().contains("Missing parser: single")); Assert.assertTrue(result.getAttributes().getMessage().contains("Missing parser: single"));
} }
@Test
public void validateSourceHeaderRoutingGood() {
ParsingApplicationFactoryResult result = factory.validateConfiguration(headerRoutingApplicationParser);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode());
}
@Test
public void validateHeaderRoutingInvalid() {
var invalid = headerRoutingApplicationParser.replace("\"default_parser\"",
"\"typo\"");
ParsingApplicationFactoryResult result = factory.validateConfiguration(invalid);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.ERROR, result.getStatusCode());
}
@Test
public void createHeaderRoutingGood() {
ParsingApplicationFactoryResult result = factory.create(headerRoutingApplicationParser, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode());
Assert.assertEquals("test", result.getAttributes().getName());
Assert.assertEquals(1, result.getAttributes().getInputParallelism().intValue());
Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue());
Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue());
Assert.assertEquals(2, result.getAttributes().getInputTopics().size());
Assert.assertEquals("secret", result.getAttributes().getInputTopics().get(0));
Assert.assertEquals("public", result.getAttributes().getInputTopics().get(1));
Assert.assertEquals("source_header", result.getAttributes().getSourceHeaderName());
Assert.assertEquals(ParsingApplicationTypeDto.HEADER_ROUTING_PARSING,
result.getAttributes().getApplicationType());
Assert.assertNotNull(result.getAttributes().getApplicationParser());
}
@Test
public void createHeaderRoutingMissingDefault() {
var missingDefault = headerRoutingApplicationParser.replace("\"default\"",
"\"typo\"");
ParsingApplicationFactoryResult result = factory.create(missingDefault, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.ERROR, result.getStatusCode());
}
@Test
public void createHeaderRoutingMissingRouted() {
var missingRouted = headerRoutingApplicationParser.replace("\"single\"",
"\"typo\"");
ParsingApplicationFactoryResult result = factory.create(missingRouted, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.ERROR, result.getStatusCode());
}
@Test
public void createHeaderRoutingMissingRouted2() {
var missingRouted = headerRoutingApplicationParser.replace("\"single2\"",
"\"typo\"");
ParsingApplicationFactoryResult result = factory.create(missingRouted, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.ERROR, result.getStatusCode());
}
@Test
public void validateTopicRoutingGood() {
ParsingApplicationFactoryResult result = factory.validateConfiguration(topicRoutingApplicationParser);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode());
}
@Test
public void validateTopicRoutingInvalid() {
var invalid = topicRoutingApplicationParser.replace("\"default_parser\"",
"\"typo\"");
ParsingApplicationFactoryResult result = factory.validateConfiguration(invalid);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.ERROR, result.getStatusCode());
}
@Test
public void createTopicRoutingGood() {
ParsingApplicationFactoryResult result = factory.create(topicRoutingApplicationParser, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.OK, result.getStatusCode());
Assert.assertEquals("test", result.getAttributes().getName());
Assert.assertEquals(1, result.getAttributes().getInputParallelism().intValue());
Assert.assertEquals(2, result.getAttributes().getParsingParallelism().intValue());
Assert.assertEquals(3, result.getAttributes().getOutputParallelism().intValue());
Assert.assertEquals(2, result.getAttributes().getInputTopics().size());
Assert.assertEquals("secret", result.getAttributes().getInputTopics().get(0));
Assert.assertEquals("public", result.getAttributes().getInputTopics().get(1));
Assert.assertEquals(ParsingApplicationTypeDto.TOPIC_ROUTING_PARSING,
result.getAttributes().getApplicationType());
Assert.assertNotNull(result.getAttributes().getApplicationParser());
}
@Test
public void createTopicRoutingMissingDefault() {
var missingDefault = topicRoutingApplicationParser.replace("\"default\"",
"\"typo\"");
ParsingApplicationFactoryResult result = factory.create(missingDefault, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.ERROR, result.getStatusCode());
}
@Test
public void createTopicRoutingMissingRouted() {
var missingRouted = topicRoutingApplicationParser.replace("\"single\"",
"\"typo\"");
ParsingApplicationFactoryResult result = factory.create(missingRouted, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.ERROR, result.getStatusCode());
}
@Test
public void createTopicRoutingMissingRouted2() {
var missingRouted = topicRoutingApplicationParser.replace("\"single2\"",
"\"typo\"");
ParsingApplicationFactoryResult result = factory.create(missingRouted, testParsersConfigs);
Assert.assertSame(ParsingApplicationFactoryResult.StatusCode.ERROR, result.getStatusCode());
}
} }

View File

@@ -126,7 +126,7 @@ public class RoutingParsingApplicationParserTest {
when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult); when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult);
when(defaultParser.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1); when(defaultParser.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routerParser, times(1)).parseToResult(metadata, input); verify(routerParser, times(1)).parseToResult(metadata, input);
verify(defaultParser, times(1)).parseToResult(metadata, "dummy".getBytes()); verify(defaultParser, times(1)).parseToResult(metadata, "dummy".getBytes());
@@ -158,7 +158,7 @@ public class RoutingParsingApplicationParserTest {
when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult); when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routerParser, times(1)).parseToResult(metadata, input); verify(routerParser, times(1)).parseToResult(metadata, input);
@@ -182,7 +182,7 @@ public class RoutingParsingApplicationParserTest {
when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult); when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult);
when(defaultParser.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1); when(defaultParser.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routerParser, times(1)).parseToResult(metadata, input); verify(routerParser, times(1)).parseToResult(metadata, input);
verify(defaultParser, times(1)).parseToResult(metadata, "dummy".getBytes()); verify(defaultParser, times(1)).parseToResult(metadata, "dummy".getBytes());
@@ -215,7 +215,7 @@ public class RoutingParsingApplicationParserTest {
routerParserResult.setException(new IllegalStateException("test_exception")); routerParserResult.setException(new IllegalStateException("test_exception"));
when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult); when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routerParser, times(1)).parseToResult(metadata, input); verify(routerParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.size());
@@ -242,7 +242,7 @@ public class RoutingParsingApplicationParserTest {
routerParserResult.getParsedMessages().get(1).remove(routingMessageField); routerParserResult.getParsedMessages().get(1).remove(routingMessageField);
when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult); when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(routerParser, times(1)).parseToResult(metadata, input); verify(routerParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(errorTopic, result.get(0).getTopic()); Assert.assertEquals(errorTopic, result.get(0).getTopic());
@@ -273,7 +273,7 @@ public class RoutingParsingApplicationParserTest {
when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult); when(routerParser.parseToResult(metadata, input)).thenReturn(routerParserResult);
when(defaultParser.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1); when(defaultParser.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routerParser, times(1)).parseToResult(metadata, input); verify(routerParser, times(1)).parseToResult(metadata, input);
verify(defaultParser, times(1)).parseToResult(metadata, "dummy".getBytes()); verify(defaultParser, times(1)).parseToResult(metadata, "dummy".getBytes());
@@ -307,7 +307,7 @@ public class RoutingParsingApplicationParserTest {
when(routedParser1.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1); when(routedParser1.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1);
when(routedParser2.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult2); when(routedParser2.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult2);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routedParser1, times(1)).parseToResult(metadata, "dummy".getBytes()); verify(routedParser1, times(1)).parseToResult(metadata, "dummy".getBytes());
verify(routedParser2, times(1)).parseToResult(metadata, "dummy".getBytes()); verify(routedParser2, times(1)).parseToResult(metadata, "dummy".getBytes());
@@ -340,7 +340,7 @@ public class RoutingParsingApplicationParserTest {
when(routedParser1.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1); when(routedParser1.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1);
when(routedParser2.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult2); when(routedParser2.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult2);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routedParser1, times(1)).parseToResult(metadata, "dummy".getBytes()); verify(routedParser1, times(1)).parseToResult(metadata, "dummy".getBytes());
verify(routedParser2, times(1)).parseToResult(metadata, "dummy".getBytes()); verify(routedParser2, times(1)).parseToResult(metadata, "dummy".getBytes());
@@ -376,7 +376,7 @@ public class RoutingParsingApplicationParserTest {
when(routedParser1.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1); when(routedParser1.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1);
when(routedParser2.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult2); when(routedParser2.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult2);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routedParser1, times(1)).parseToResult(metadata, "dummy".getBytes()); verify(routedParser1, times(1)).parseToResult(metadata, "dummy".getBytes());
verify(routedParser2, times(1)).parseToResult(metadata, "dummy".getBytes()); verify(routedParser2, times(1)).parseToResult(metadata, "dummy".getBytes());
@@ -414,7 +414,7 @@ public class RoutingParsingApplicationParserTest {
when(routedParser1.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1); when(routedParser1.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1);
when(routedParser2.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult2); when(routedParser2.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult2);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routedParser1, times(1)).parseToResult(metadata, "dummy".getBytes()); verify(routedParser1, times(1)).parseToResult(metadata, "dummy".getBytes());
verify(routedParser2, times(1)).parseToResult(metadata, "dummy".getBytes()); verify(routedParser2, times(1)).parseToResult(metadata, "dummy".getBytes());

View File

@@ -88,7 +88,7 @@ public class SingleApplicationParserTest {
.build(); .build();
when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult); when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(siembolParser, times(1)).parseToResult(metadata, input); verify(siembolParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.size());
@@ -117,7 +117,7 @@ public class SingleApplicationParserTest {
parserResult.getParsedMessages().remove(1); parserResult.getParsedMessages().remove(1);
when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult); when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(siembolParser, times(1)).parseToResult(metadata, input); verify(siembolParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.size());
@@ -143,7 +143,7 @@ public class SingleApplicationParserTest {
parserResult.getParsedMessages().remove(1); parserResult.getParsedMessages().remove(1);
when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult); when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(siembolParser, times(1)).parseToResult(metadata, input); verify(siembolParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.size());
@@ -168,7 +168,7 @@ public class SingleApplicationParserTest {
parserResult.getParsedMessages().replaceAll(x-> new HashMap<>()); parserResult.getParsedMessages().replaceAll(x-> new HashMap<>());
when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult); when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
Assert.assertTrue(result.isEmpty()); Assert.assertTrue(result.isEmpty());
} }
@@ -184,7 +184,7 @@ public class SingleApplicationParserTest {
parserResult.setParsedMessages(null); parserResult.setParsedMessages(null);
parserResult.setException(new IllegalStateException("test_exception")); parserResult.setException(new IllegalStateException("test_exception"));
when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult); when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(siembolParser, times(1)).parseToResult(metadata, input); verify(siembolParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.size());
@@ -206,7 +206,7 @@ public class SingleApplicationParserTest {
.build(); .build();
when(siembolParser.parseToResult(metadata, input)).thenThrow(new RuntimeException("runtime_exception")); when(siembolParser.parseToResult(metadata, input)).thenThrow(new RuntimeException("runtime_exception"));
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(siembolParser, times(1)).parseToResult(metadata, input); verify(siembolParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.size());
@@ -229,7 +229,7 @@ public class SingleApplicationParserTest {
parserResult.setParsedMessages(new ArrayList<>()); parserResult.setParsedMessages(new ArrayList<>());
when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult); when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(siembolParser, times(1)).parseToResult(metadata, input); verify(siembolParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(0, result.size()); Assert.assertEquals(0, result.size());
@@ -247,7 +247,7 @@ public class SingleApplicationParserTest {
.build(); .build();
when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult); when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(siembolParser, times(1)).parseToResult(metadata, input); verify(siembolParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.size());
@@ -284,7 +284,7 @@ public class SingleApplicationParserTest {
.build(); .build();
when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult); when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(siembolParser, times(1)).parseToResult(metadata, input); verify(siembolParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.size());
@@ -321,7 +321,7 @@ public class SingleApplicationParserTest {
.build(); .build();
when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult); when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult);
List<ParsingApplicationResult> result = appParser.parse(metadata, input); List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs(); verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(siembolParser, times(1)).parseToResult(metadata, input); verify(siembolParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.size());

View File

@@ -0,0 +1,231 @@
package uk.co.gresearch.siembol.parsers.application.parsing;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import uk.co.gresearch.siembol.common.constants.SiembolMessageFields;
import uk.co.gresearch.siembol.common.utils.TimeProvider;
import uk.co.gresearch.siembol.parsers.common.ParserResult;
import uk.co.gresearch.siembol.parsers.common.SerializableSiembolParser;
import java.util.*;
import static org.mockito.Mockito.*;
public class SourceRoutingApplicationParserTest {
private final String metadata = """
{
"a": "string",
"b": 1,
"c": true
}
""";
private SerializableSiembolParser defaultParser;
private SerializableSiembolParser routedParser1;
private SerializableSiembolParser routedParser2;
private SourceRoutingApplicationParser appParser;
private final String errorTopic = "error";
private final String outputTopic = "output";
private final byte[] input = "test".getBytes();
private ParserResult defaultParserResult;
private ParserResult routedParserResult1;
private ParserResult routedParserResult2;
TimeProvider timeProvider;
long currentTime = 1L;
@Before
public void setUp() {
timeProvider = Mockito.mock(TimeProvider.class);
when(timeProvider.getCurrentTimeInMs()).thenReturn(currentTime);
defaultParser = Mockito.mock(SerializableSiembolParser.class);
when(defaultParser.getSourceType()).thenReturn("default-parser");
routedParser1 = Mockito.mock(SerializableSiembolParser.class);
when(routedParser1.getSourceType()).thenReturn("routed-parser1");
routedParser2 = Mockito.mock(SerializableSiembolParser.class);
when(routedParser2.getSourceType()).thenReturn("routed-parser2");
Map<String, Object> messageRoutedParser = new HashMap<>();
messageRoutedParser.put("output_field", "routed");
messageRoutedParser.put("original_string", "test");
messageRoutedParser.put("timestamp", 3);
routedParserResult1 = new ParserResult();
routedParserResult1.setParsedMessages(Arrays.asList(messageRoutedParser));
routedParserResult2 = new ParserResult();
routedParserResult2.setParsedMessages(Arrays.asList(new HashMap<>(messageRoutedParser)));
Map<String, Object> messageDefaultParser = new HashMap<>();
messageDefaultParser.put("output_field", "default");
messageDefaultParser.put("original_string", "test");
messageDefaultParser.put("timestamp", 3);
defaultParserResult = new ParserResult();
defaultParserResult.setParsedMessages(Arrays.asList(new HashMap<>(messageDefaultParser)));
}
@Test(expected = IllegalArgumentException.class)
public void testMissingArguments() {
SourceRoutingApplicationParser.builder()
.errorTopic(errorTopic)
.build();
}
@Test(expected = IllegalArgumentException.class)
public void testMissingArguments2() {
SourceRoutingApplicationParser.builder()
.defaultParser(outputTopic, defaultParser)
.name("test")
.errorTopic(errorTopic)
.timeProvider(null)
.build();
}
@Test(expected = IllegalArgumentException.class)
public void testDuplicatesSources() {
SourceRoutingApplicationParser.builder()
.defaultParser(outputTopic, defaultParser)
.addParser("a", "out", routedParser1)
.addParser("a", "out", routedParser2)
.name("test")
.errorTopic(errorTopic)
.timeProvider(timeProvider)
.build();
}
@Test
public void testParseOneMessageDefault() {
appParser = SourceRoutingApplicationParser.builder()
.defaultParser(outputTopic, defaultParser)
.addParser("a", "out", routedParser1)
.addParser("b", "out", routedParser2)
.name("test")
.errorTopic(errorTopic)
.timeProvider(timeProvider)
.build();
when(defaultParser.parseToResult(metadata, "dummy".getBytes())).thenReturn(defaultParserResult);
List<ParsingApplicationResult> result = appParser.parse("c", metadata, "dummy".getBytes());
verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(defaultParser, times(1)).parseToResult(metadata, "dummy".getBytes());
Assert.assertEquals(outputTopic, result.get(0).getTopic());
Assert.assertEquals(1, result.size());
Assert.assertEquals(1, result.get(0).getMessages().size());
Assert.assertTrue(result.get(0).getMessages().get(0).contains(SiembolMessageFields.PARSING_TIME + "\":1"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains("output_field" + "\":\"default"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains(SiembolMessageFields.PARSING_TIME + "\":1"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains("original_string" + "\":\"test"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains("timestamp" + "\":3"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains(
SiembolMessageFields.SENSOR_TYPE + "\":\"default-parser\""));
}
@Test
public void testParseOneMessageRouted() {
appParser = SourceRoutingApplicationParser.builder()
.defaultParser(outputTopic, defaultParser)
.addParser("a", "out", routedParser1)
.addParser("b", "out", routedParser2)
.name("test")
.errorTopic(errorTopic)
.timeProvider(timeProvider)
.build();
when(routedParser1.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1);
List<ParsingApplicationResult> result = appParser.parse("a", metadata, "dummy".getBytes());
verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routedParser1, times(1)).parseToResult(metadata, "dummy".getBytes());
Assert.assertEquals("out", result.get(0).getTopic());
Assert.assertEquals(1, result.size());
Assert.assertEquals(1, result.get(0).getMessages().size());
Assert.assertTrue(result.get(0).getMessages().get(0).contains(SiembolMessageFields.PARSING_TIME + "\":1"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains("output_field" + "\":\"routed"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains(SiembolMessageFields.PARSING_TIME + "\":1"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains("original_string" + "\":\"test"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains("timestamp" + "\":3"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains(
SiembolMessageFields.SENSOR_TYPE + "\":\"routed-parser1\""));
}
@Test
public void testParseOneMessageRouted2() {
appParser = SourceRoutingApplicationParser.builder()
.defaultParser(outputTopic, defaultParser)
.addParser("a", "out", routedParser1)
.addParser("b", "out", routedParser2)
.name("test")
.errorTopic(errorTopic)
.timeProvider(timeProvider)
.build();
when(routedParser2.parseToResult(metadata, "dummy".getBytes())).thenReturn(routedParserResult1);
List<ParsingApplicationResult> result = appParser.parse("b", metadata, "dummy".getBytes());
verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routedParser2, times(1)).parseToResult(metadata, "dummy".getBytes());
Assert.assertEquals("out", result.get(0).getTopic());
Assert.assertEquals(1, result.size());
Assert.assertEquals(1, result.get(0).getMessages().size());
Assert.assertTrue(result.get(0).getMessages().get(0).contains(SiembolMessageFields.PARSING_TIME + "\":1"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains("output_field" + "\":\"routed"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains(SiembolMessageFields.PARSING_TIME + "\":1"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains("original_string" + "\":\"test"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains("timestamp" + "\":3"));
Assert.assertTrue(result.get(0).getMessages().get(0).contains(
SiembolMessageFields.SENSOR_TYPE + "\":\"routed-parser2\""));
}
@Test
public void testParseOneMessageFiltered() {
appParser = SourceRoutingApplicationParser.builder()
.defaultParser(outputTopic, defaultParser)
.addParser("a", "out", routedParser1)
.addParser("b", "out", routedParser2)
.name("test")
.errorTopic(errorTopic)
.timeProvider(timeProvider)
.build();
routedParserResult1.setParsedMessages(new ArrayList<>());
when(routedParser1.parseToResult(metadata, input)).thenReturn(routedParserResult1);
List<ParsingApplicationResult> result = appParser.parse("a", metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routedParser1, times(1)).parseToResult(metadata, input);
Assert.assertTrue(result.isEmpty());
}
@Test
public void testParseOneMessageException() {
appParser = SourceRoutingApplicationParser.builder()
.defaultParser(outputTopic, defaultParser)
.addParser("a", "out", routedParser1)
.addParser("b", "out", routedParser2)
.name("test")
.errorTopic(errorTopic)
.timeProvider(timeProvider)
.build();
routedParserResult1.setException(new IllegalStateException("test_exception"));
when(routedParser1.parseToResult(metadata, input)).thenReturn(routedParserResult1);
List<ParsingApplicationResult> result = appParser.parse("a", metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routedParser1, times(1)).parseToResult(metadata, input);
Assert.assertEquals(1, result.size());
Assert.assertEquals("error", result.get(0).getTopic());
Assert.assertTrue(result.get(0).getMessages().get(0).contains("\"failed_sensor_type\":\"routed-parser1\""));
}
}

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId> <artifactId>parsing</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -45,7 +45,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>joda-time</groupId> <groupId>joda-time</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing</artifactId> <artifactId>parsing</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -71,7 +71,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>parsing-app</artifactId> <artifactId>parsing-app</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>

View File

@@ -89,7 +89,7 @@ public class ParsingApplicationBolt extends BaseRichBolt {
private void updateParsers() { private void updateParsers() {
try { try {
ParsingApplicationFactory factory = new ParsingApplicationFactoryImpl(); ParsingApplicationFactory factory = new ParsingApplicationFactoryImpl();
LOG.info(PARSERS_UPDATE_START); LOG.info(PARSERS_UPDATE_START);
String parserConfigs = zooKeeperConnector.getData(); String parserConfigs = zooKeeperConnector.getData();
@@ -117,6 +117,7 @@ public class ParsingApplicationBolt extends BaseRichBolt {
public void execute(Tuple tuple) { public void execute(Tuple tuple) {
ParsingApplicationParser currentParser = parsingApplicationParser.get(); ParsingApplicationParser currentParser = parsingApplicationParser.get();
String source = tuple.getStringByField(ParsingApplicationTuples.SOURCE.toString());
String metadata = tuple.getStringByField(ParsingApplicationTuples.METADATA.toString()); String metadata = tuple.getStringByField(ParsingApplicationTuples.METADATA.toString());
Object logObj = tuple.getValueByField(ParsingApplicationTuples.LOG.toString()); Object logObj = tuple.getValueByField(ParsingApplicationTuples.LOG.toString());
if (!(logObj instanceof byte[])) { if (!(logObj instanceof byte[])) {
@@ -124,7 +125,7 @@ public class ParsingApplicationBolt extends BaseRichBolt {
} }
byte[] log = (byte[])logObj; byte[] log = (byte[])logObj;
ArrayList<ParsingApplicationResult> results = currentParser.parse(metadata, log); ArrayList<ParsingApplicationResult> results = currentParser.parse(source, metadata, log);
if (!results.isEmpty()) { if (!results.isEmpty()) {
KafkaBatchWriterMessages kafkaBatchWriterMessages = new KafkaBatchWriterMessages(); KafkaBatchWriterMessages kafkaBatchWriterMessages = new KafkaBatchWriterMessages();
results.forEach(x -> x.getMessages().forEach(y -> results.forEach(x -> x.getMessages().forEach(y ->

View File

@@ -1,6 +1,7 @@
package uk.co.gresearch.siembol.parsers.storm; package uk.co.gresearch.siembol.parsers.storm;
public enum ParsingApplicationTuples { public enum ParsingApplicationTuples {
SOURCE("source"),
METADATA("metadata"), METADATA("metadata"),
LOG("log"), LOG("log"),
PARSING_MESSAGES("messages"); PARSING_MESSAGES("messages");

View File

@@ -1,11 +1,13 @@
package uk.co.gresearch.siembol.parsers.storm; package uk.co.gresearch.siembol.parsers.storm;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config; import org.apache.storm.Config;
import org.apache.storm.StormSubmitter; import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology; import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.TopologyBuilder;
@@ -24,7 +26,9 @@ import uk.co.gresearch.siembol.parsers.application.factory.ParsingApplicationFac
import uk.co.gresearch.siembol.parsers.application.factory.ParsingApplicationFactoryResult; import uk.co.gresearch.siembol.parsers.application.factory.ParsingApplicationFactoryResult;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Base64; import java.util.Base64;
import java.util.List;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
@@ -43,9 +47,11 @@ public class StormParsingApplication {
"Base64 encoded storm attributes and parsing app attributes"; "Base64 encoded storm attributes and parsing app attributes";
private static final String SUBMIT_INFO_LOG = "Submitted parsing application storm topology: {} " + private static final String SUBMIT_INFO_LOG = "Submitted parsing application storm topology: {} " +
"with storm attributes: {}\nparsing application attributes: {}"; "with storm attributes: {}\nparsing application attributes: {}";
private static final String UNKNOWN_SOURCE = "unknown";
private static KafkaSpoutConfig<String, byte[]> createKafkaSpoutConfig( private static KafkaSpoutConfig<String, byte[]> createKafkaSpoutConfig(
StormParsingApplicationAttributesDto parsingApplicationAttributes) { StormParsingApplicationAttributesDto parsingApplicationAttributes,
ParsingApplicationFactoryAttributes parsingAttributes) {
StormAttributesDto stormAttributes = parsingApplicationAttributes.getStormAttributes(); StormAttributesDto stormAttributes = parsingApplicationAttributes.getStormAttributes();
stormAttributes.getKafkaSpoutProperties().getRawMap() stormAttributes.getKafkaSpoutProperties().getRawMap()
.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); .put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
@@ -53,8 +59,28 @@ public class StormParsingApplication {
.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); .put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return StormHelper.createKafkaSpoutConfig(stormAttributes, return StormHelper.createKafkaSpoutConfig(stormAttributes,
r -> new Values(r.key(), r.value()), createConsumerRecordFunction(parsingAttributes),
new Fields(ParsingApplicationTuples.METADATA.toString(), ParsingApplicationTuples.LOG.toString())); new Fields(ParsingApplicationTuples.SOURCE.toString(),
ParsingApplicationTuples.METADATA.toString(),
ParsingApplicationTuples.LOG.toString()));
}
private static <K, V> Func<ConsumerRecord<K,V>, List<Object>> createConsumerRecordFunction(
ParsingApplicationFactoryAttributes parsingAttributes) {
switch (parsingAttributes.getApplicationType()) {
case SINGLE_PARSER:
case ROUTER_PARSING:
return r -> new Values(UNKNOWN_SOURCE, r.key(), r.value());
case TOPIC_ROUTING_PARSING:
return r -> new Values(r.topic(), r.key(), r.value());
case HEADER_ROUTING_PARSING:
final String headerName = parsingAttributes.getSourceHeaderName();
return r -> new Values(new String(r.headers().lastHeader(headerName).value(), StandardCharsets.UTF_8),
r.key(),
r.value());
default:
throw new IllegalArgumentException();
}
} }
public static StormTopology createTopology(StormParsingApplicationAttributesDto stormAppAttributes, public static StormTopology createTopology(StormParsingApplicationAttributesDto stormAppAttributes,
@@ -68,7 +94,7 @@ public class StormParsingApplication {
TopologyBuilder builder = new TopologyBuilder(); TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(KAFKA_SPOUT, builder.setSpout(KAFKA_SPOUT,
new KafkaSpout<>(createKafkaSpoutConfig(stormAppAttributes)), new KafkaSpout<>(createKafkaSpoutConfig(stormAppAttributes, parsingAttributes)),
parsingAttributes.getInputParallelism()); parsingAttributes.getInputParallelism());
builder.setBolt(parsingAttributes.getName(), builder.setBolt(parsingAttributes.getName(),

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>
<module>parsing-core</module> <module>parsing-core</module>

View File

@@ -6,7 +6,7 @@
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<name>siembol</name> <name>siembol</name>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<description>A scalable, advanced security analytics framework based on open-source big data technologies.</description> <description>A scalable, advanced security analytics framework based on open-source big data technologies.</description>
<inceptionYear>2019</inceptionYear> <inceptionYear>2019</inceptionYear>
<url>https://siembol.io/</url> <url>https://siembol.io/</url>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>
<module>responding-core</module> <module>responding-core</module>

View File

@@ -11,7 +11,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding</artifactId> <artifactId>responding</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
@@ -35,12 +35,12 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>alerting-core</artifactId> <artifactId>alerting-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.jayway.jsonpath</groupId> <groupId>com.jayway.jsonpath</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding</artifactId> <artifactId>responding</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
@@ -51,7 +51,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol-common</artifactId> <artifactId>siembol-common</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@@ -62,7 +62,7 @@
<dependency> <dependency>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>responding-core</artifactId> <artifactId>responding-core</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>

View File

@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>uk.co.gresearch.siembol</groupId> <groupId>uk.co.gresearch.siembol</groupId>
<artifactId>siembol</artifactId> <artifactId>siembol</artifactId>
<version>2.2.8-SNAPSHOT</version> <version>2.2.9-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>