Adding metrics into Siembol (#572)

* inti commit for siembol metrics

* adding missed files

* fixing parsing application bolt

* Reworking response metrics to use common library

* fixing metrics names

* using rule name in alertign metrics instead of full rule name

* improving parsing storm tests

* improving alerting tests

* improving enrichment tests

* improving response tests

* increasing app version

* change siembol version

* increasing siembol app version

* renaming metrics

* bug fixing
This commit is contained in:
Marian Novotny
2022-03-21 15:42:57 +00:00
committed by GitHub
parent dd5bde5eb9
commit d281c86c4a
90 changed files with 1503 additions and 627 deletions

View File

@@ -85,19 +85,25 @@ public abstract class ParsingApplicationParser implements Serializable {
? JSON_READER.readValue(metadata.trim())
: null;
long timestamp = timeProvider.getCurrentTimeInMs();
for (ParserResult parserResult : parseInternally(source, metadata, message)) {
var currentResult = new ParsingApplicationResult(parserResult.getSourceType());
if (parserResult.getException() != null) {
ret.add(new ParsingApplicationResult(
errorTopic,
getErrorMessage(parserResult.getException(), parserResult.getSourceType(), message)));
currentResult.setResultType(ParsingApplicationResult.ResultType.ERROR);
currentResult.setTopic(errorTopic);
currentResult.setMessage(getErrorMessage(
parserResult.getException(), parserResult.getSourceType(), message));
ret.add(currentResult);
continue;
}
List<Map<String, Object>> parsed = parserResult.getParsedMessages();
currentResult.setTopic(parserResult.getTopic());
var parsed = parserResult.getParsedMessages();
parsed.removeIf(Map::isEmpty);
if (parsed.isEmpty()) {
currentResult.setResultType(ParsingApplicationResult.ResultType.FILTERED);
ret.add(currentResult);
continue;
}
@@ -123,12 +129,19 @@ public abstract class ParsingApplicationParser implements Serializable {
}
})
.collect(Collectors.toCollection(ArrayList::new));
ret.add(new ParsingApplicationResult(parserResult.getTopic(), serialised));
currentResult.setMessages(serialised);
currentResult.setResultType(ParsingApplicationResult.ResultType.PARSED);
ret.add(currentResult);
}
return ret;
} catch (Exception e) {
LOG.debug(ERROR_MESSAGE, name, new String(message), metadata, ExceptionUtils.getMessage(e));
ret.add(new ParsingApplicationResult(errorTopic, getErrorMessage(e, sourceType, message)));
var errorResult = new ParsingApplicationResult(sourceType);
errorResult.setTopic(errorTopic);
errorResult.setMessage(getErrorMessage(e, sourceType, message));
errorResult.setResultType(ParsingApplicationResult.ResultType.ERROR);
ret.add(errorResult);
return ret;
}
}

View File

@@ -4,18 +4,19 @@ import java.io.Serializable;
import java.util.ArrayList;
public class ParsingApplicationResult implements Serializable {
private static final long serialVersionUID = 1L;
private final String topic;
private final ArrayList<String> messages;
public ParsingApplicationResult(String topic, ArrayList<String> messages) {
this.topic = topic;
this.messages = messages;
public enum ResultType implements Serializable {
PARSED,
ERROR,
FILTERED
}
public ParsingApplicationResult(String topic, String message) {
this.topic = topic;
messages = new ArrayList<>();
messages.add(message);
private static final long serialVersionUID = 1L;
private final String sourceType;
private String topic;
private ArrayList<String> messages;
private ResultType resultType = ResultType.PARSED;
public ParsingApplicationResult(String sourceType) {
this.sourceType = sourceType;
}
public String getTopic() {
@@ -26,4 +27,28 @@ public class ParsingApplicationResult implements Serializable {
return messages;
}
public String getSourceType() {
return sourceType;
}
public void setMessages(ArrayList<String> messages) {
this.messages = messages;
}
public void setMessage(String message) {
this.messages = new ArrayList<>();
this.messages.add(message);
}
public void setTopic(String topic) {
this.topic = topic;
}
public ResultType getResultType() {
return resultType;
}
public void setResultType(ResultType resultType) {
this.resultType = resultType;
}
}

View File

@@ -169,7 +169,8 @@ public class SingleApplicationParserTest {
parserResult.getParsedMessages().replaceAll(x-> new HashMap<>());
when(siembolParser.parseToResult(metadata, input)).thenReturn(parserResult);
List<ParsingApplicationResult> result = appParser.parse( metadata, input);
Assert.assertTrue(result.isEmpty());
Assert.assertEquals(1, result.size());
Assert.assertNull(result.get(0).getMessages());
}
@Test
@@ -232,7 +233,9 @@ public class SingleApplicationParserTest {
List<ParsingApplicationResult> result = appParser.parse( metadata, input);
verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(siembolParser, times(1)).parseToResult(metadata, input);
Assert.assertEquals(0, result.size());
Assert.assertEquals(1, result.size());
Assert.assertNull(result.get(0).getMessages());
}
@Test

View File

@@ -203,7 +203,9 @@ public class SourceRoutingApplicationParserTest {
verify(timeProvider, times(1)).getCurrentTimeInMs();
verify(routedParser1, times(1)).parseToResult(metadata, input);
Assert.assertTrue(result.isEmpty());
Assert.assertFalse(result.isEmpty());
Assert.assertEquals(1, result.size());
Assert.assertNull(result.get(0).getMessages());
}
@Test