diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java index c819475a1..eec8ce16d 100644 --- a/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java +++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java @@ -3,6 +3,7 @@ import org.apache.flink.api.java.utils.ParameterTool; import java.io.Serializable; +import java.util.Arrays; public class Configuration implements Serializable { private final ParameterTool param; @@ -23,6 +24,15 @@ public String getString(String configKey, String defaultValue) { return param.get(configKey, defaultValue); } + public String[] getStringArray(String configKey, String[] defaultValue) { + String value = param.get(configKey); + if (value == null || value.trim().isEmpty()) { + return defaultValue; + } + + return Arrays.stream(value.split(",")).map(String::trim).toArray(String[]::new); + } + public Integer getInteger(String configKey, Integer defaultValue) { return param.getInt(configKey, defaultValue); } diff --git a/dagger-common/src/test/java/com/gotocompany/dagger/common/configuration/ConfigurationTest.java b/dagger-common/src/test/java/com/gotocompany/dagger/common/configuration/ConfigurationTest.java index 2119f25cf..3546436dd 100644 --- a/dagger-common/src/test/java/com/gotocompany/dagger/common/configuration/ConfigurationTest.java +++ b/dagger-common/src/test/java/com/gotocompany/dagger/common/configuration/ConfigurationTest.java @@ -6,6 +6,7 @@ import org.junit.Test; import org.mockito.Mock; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.mockito.Mockito.when; @@ -36,6 +37,28 @@ public void shouldGetNullIfParamIsNotSet() { assertNull(configuration.getString("config_not_exist")); } + @Test + public void shouldGetStringArrayFromParamTool() { + when(parameterTool.get("config_array_key")).thenReturn("test_value, test_value_2"); + + assertArrayEquals(new String[]{"test_value", "test_value_2"}, configuration.getStringArray("config_array_key", new String[]{"default_not_used"})); + } + + @Test + public void shouldGetNullStringArrayIfParamIsNotSet() { + String[] defaultValue = new String[]{"default"}; + + assertArrayEquals(defaultValue, configuration.getStringArray("config_not_exist", defaultValue)); + } + + @Test + public void shouldGetEmptyStringArrayForBlankValue() { + String[] defaultValue = new String[]{"default"}; + when(parameterTool.get("config_array_key")).thenReturn(" "); + + assertArrayEquals(defaultValue, configuration.getStringArray("config_array_key", defaultValue)); + } + @Test public void shouldGetIntegerFromParamTool() { when(parameterTool.getInt("test_config", 1)).thenReturn(2); diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java index 7114cf829..991ff4ac1 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java @@ -232,5 +232,4 @@ private void addSink(StreamInfo streamInfo) { sinkOrchestrator.addSubscriber(telemetryExporter); streamInfo.getDataStream().sinkTo(sinkOrchestrator.getSink(configuration, streamInfo.getColumnNames(), stencilClientOrchestrator, daggerStatsDReporter)); } - } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java index 05997e445..49ba96b19 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java @@ -52,7 +52,8 @@ public SinkOrchestrator(MetricsTelemetryExporter telemetryExporter) { * @columnNames columnNames the column names * @StencilClientOrchestrator stencilClientOrchestrator the stencil client orchestrator */ - public Sink getSink(Configuration configuration, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator, DaggerStatsDReporter daggerStatsDReporter) { + public Sink getSink(Configuration configuration, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator, + DaggerStatsDReporter daggerStatsDReporter, String influxMeasurementOverrideName) { String sinkType = configuration.getString("SINK_TYPE", "influx"); addMetric(TelemetryTypes.SINK_TYPE.getValue(), sinkType); Sink sink; @@ -85,12 +86,18 @@ public Sink getSink(Configuration configuration, String[] columnNames, StencilCl .build(); break; default: - sink = new InfluxDBSink(new InfluxDBFactoryWrapper(), configuration, columnNames, new ErrorHandler()); + sink = new InfluxDBSink(new InfluxDBFactoryWrapper(), configuration, columnNames, new ErrorHandler(), influxMeasurementOverrideName); } notifySubscriber(); return sink; } + public Sink getSink(Configuration configuration, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator, + DaggerStatsDReporter daggerStatsDReporter) { + String influxMeasurementOverrideName = null; + return getSink(configuration, columnNames, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName); + } + private void reportTelemetry(KafkaSerializerBuilder kafkaSchemaBuilder) { TelemetryPublisher pub = (TelemetryPublisher) kafkaSchemaBuilder; pub.addSubscriber(telemetryExporter); diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSink.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSink.java index 9d961fa9d..d5307b005 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSink.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSink.java @@ -25,12 +25,15 @@ public class InfluxDBSink implements Sink { private String[] columnNames; private ErrorHandler errorHandler; private ErrorReporter errorReporter; + private final String influxMeasurementOverrideName; - public InfluxDBSink(InfluxDBFactoryWrapper influxDBFactory, Configuration configuration, String[] columnNames, ErrorHandler errorHandler) { + public InfluxDBSink(InfluxDBFactoryWrapper influxDBFactory, Configuration configuration, String[] columnNames, + ErrorHandler errorHandler, String influxMeasurementOverrideName) { this.influxDBFactory = influxDBFactory; this.configuration = configuration; this.columnNames = columnNames; this.errorHandler = errorHandler; + this.influxMeasurementOverrideName = influxMeasurementOverrideName; } @Override @@ -46,7 +49,7 @@ public SinkWriter createWriter(InitContext context, List errorReporter = ErrorReporterFactory.getErrorReporter(context.metricGroup(), configuration); } - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDB, columnNames, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDB, columnNames, errorHandler, errorReporter, influxMeasurementOverrideName); return influxDBWriter; } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriter.java index 371ce3b86..578e06ea4 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriter.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriter.java @@ -36,10 +36,15 @@ public class InfluxDBWriter implements SinkWriter { private ErrorReporter errorReporter; private boolean useRowFieldNames; - public InfluxDBWriter(Configuration configuration, InfluxDB influxDB, String[] columnNames, ErrorHandler errorHandler, ErrorReporter errorReporter) { + public InfluxDBWriter(Configuration configuration, InfluxDB influxDB, String[] columnNames, ErrorHandler errorHandler, + ErrorReporter errorReporter, String influxMeasurementOverrideName) { databaseName = configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT); retentionPolicy = configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT); - measurementName = configuration.getString(Constants.SINK_INFLUX_MEASUREMENT_NAME_KEY, Constants.SINK_INFLUX_MEASUREMENT_NAME_DEFAULT); + if (Strings.isNullOrEmpty(influxMeasurementOverrideName)) { + measurementName = configuration.getString(Constants.SINK_INFLUX_MEASUREMENT_NAME_KEY, Constants.SINK_INFLUX_MEASUREMENT_NAME_DEFAULT); + } else { + measurementName = influxMeasurementOverrideName; + } useRowFieldNames = configuration.getBoolean(Constants.SINK_INFLUX_USING_ROW_FIELD_NAMES_KEY, Constants.SINK_INFLUX_USING_ROW_FIELD_NAMES_DEFAULT); this.influxDB = influxDB; this.columnNames = columnNames; diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java index 94ed1a80f..c92a66b6e 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java @@ -156,6 +156,13 @@ public class Constants { public static final String SINK_INFLUX_RETENTION_POLICY_DEFAULT = ""; public static final String SINK_INFLUX_MEASUREMENT_NAME_KEY = "SINK_INFLUX_MEASUREMENT_NAME"; public static final String SINK_INFLUX_MEASUREMENT_NAME_DEFAULT = ""; + + // A custom job can use this configuration to get all Influx measurement names as a list + // and configure them in the job builder pipeline accordingly. + // The initial design assumed custom job authors would know the sink targets and hardcode them. + // If measurement names need to change, they can now be updated through configuration without changing the code. + public static final String SINK_INFLUX_MEASUREMENTS_LIST_KEY = "SINK_INFLUX_MEASUREMENTS_LIST"; + public static final String SINK_INFLUX_URL_KEY = "SINK_INFLUX_URL"; public static final String SINK_INFLUX_URL_DEFAULT = ""; public static final String SINK_INFLUX_USERNAME_KEY = "SINK_INFLUX_USERNAME"; diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java index 1a660c543..277aac960 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java @@ -25,6 +25,7 @@ import static org.mockito.MockitoAnnotations.initMocks; public class SinkOrchestratorTest { + private final String influxMeasurementOverrideName = ""; private static final String SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS = "SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS"; private static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE = "com.gotocompany.dagger.core.utils.SinkKafkaConfigUtil"; @@ -51,7 +52,7 @@ public void setup() { @Test public void shouldGiveInfluxSinkWhenConfiguredToUseInflux() throws Exception { when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("influx"); - Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter); + Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName); assertThat(sinkFunction, instanceOf(InfluxDBSink.class)); } @@ -59,7 +60,7 @@ public void shouldGiveInfluxSinkWhenConfiguredToUseInflux() throws Exception { @Test public void shouldGiveLogSinkWhenConfiguredToUseLog() throws Exception { when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("log"); - Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter); + Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName); assertThat(sinkFunction, instanceOf(LogSink.class)); } @@ -67,7 +68,7 @@ public void shouldGiveLogSinkWhenConfiguredToUseLog() throws Exception { @Test public void shouldGiveInfluxWhenConfiguredToUseNothing() throws Exception { when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn(""); - Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter); + Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName); assertThat(sinkFunction, instanceOf(InfluxDBSink.class)); } @@ -107,7 +108,7 @@ public void shouldReturnSinkMetrics() { when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("influx"); - sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter); + sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName); assertEquals(expectedMetrics, sinkOrchestrator.getTelemetry()); } @@ -116,7 +117,7 @@ public void shouldReturnBigQuerySink() { when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("bigquery"); when(configuration.getString("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "")).thenReturn("some.class"); when(configuration.getParam()).thenReturn(ParameterTool.fromMap(Collections.emptyMap())); - Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter); + Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName); assertThat(sinkFunction, instanceOf(BigQuerySink.class)); } } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSinkTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSinkTest.java index 257422878..84d90ed7e 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSinkTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSinkTest.java @@ -29,7 +29,7 @@ import static org.mockito.MockitoAnnotations.initMocks; public class InfluxDBSinkTest { - + private final String influxMeasurementOverrideName = ""; private static final int SINK_INFLUX_BATCH_SIZE = 100; private static final int INFLUX_FLUSH_DURATION = 1000; @@ -74,7 +74,7 @@ public void setUp() throws Exception { @Test public void shouldCallInfluxDbFactoryWhileCreatingWriter() throws Exception { - InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler); + InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler, influxMeasurementOverrideName); List state = new ArrayList<>(); influxDBSink.createWriter(context, state); @@ -83,7 +83,7 @@ public void shouldCallInfluxDbFactoryWhileCreatingWriter() throws Exception { @Test public void shouldCreateInfluxWriter() throws IOException { - InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler); + InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler, influxMeasurementOverrideName); List state = new ArrayList<>(); SinkWriter writer = influxDBSink.createWriter(context, state); @@ -92,7 +92,7 @@ public void shouldCreateInfluxWriter() throws IOException { @Test public void shouldCallBatchModeOnInfluxWhenBatchSettingsExist() throws Exception { - InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler); + InfluxDBSink influxDBSink = new InfluxDBSink(influxDBFactory, configuration, new String[]{}, errorHandler, influxMeasurementOverrideName); List state = new ArrayList<>(); influxDBSink.createWriter(context, state); verify(influxDb).enableBatch(eq(SINK_INFLUX_BATCH_SIZE), eq(INFLUX_FLUSH_DURATION), eq(TimeUnit.MILLISECONDS), any(ThreadFactory.class), any(BiConsumer.class)); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriterTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriterTest.java index b86056337..6f5a99791 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriterTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriterTest.java @@ -38,6 +38,8 @@ public class InfluxDBWriterTest { + private final String influxMeasurementOverrideName = ""; + @Mock private Configuration configuration; @@ -110,14 +112,14 @@ private Point getPoint() { public void shouldWriteToConfiguredInfluxDatabase() throws Exception { Row row = new Row(1); row.setField(0, "some field"); - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, new String[]{"some_field_name"}, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, new String[]{"some_field_name"}, errorHandler, errorReporter, influxMeasurementOverrideName); influxDBWriter.write(row, context); verify(influxDb).write(eq("dagger_test"), eq("two_day_policy"), any()); } @Test - public void shouldWriteRowToInfluxAsfields() throws Exception { + public void shouldWriteRowToInfluxAsFields() throws Exception { final int numberOfRows = 3; final String expectedFieldZeroValue = "abc"; final int expectedFieldOneValue = 100; @@ -132,7 +134,7 @@ public void shouldWriteRowToInfluxAsfields() throws Exception { .addField(rowColumns[1], expectedFieldOneValue) .time(now.toEpochMilli(), TimeUnit.MILLISECONDS).build(); - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, influxMeasurementOverrideName); influxDBWriter.write(simpleFieldsRow, context); ArgumentCaptor pointArg = ArgumentCaptor.forClass(Point.class); verify(influxDb).write(any(), any(), pointArg.capture()); @@ -140,6 +142,29 @@ public void shouldWriteRowToInfluxAsfields() throws Exception { assertEquals(expectedPoint.lineProtocol(), pointArg.getValue().lineProtocol()); } + @Test + public void shouldWriteRowToInfluxAsFieldsWithOverrideName() throws Exception { + final int numberOfRows = 3; + final String expectedFieldZeroValue = "abc"; + final int expectedFieldOneValue = 100; + Instant now = Instant.now(); + Row simpleFieldsRow = new Row(numberOfRows); + simpleFieldsRow.setField(0, expectedFieldZeroValue); + simpleFieldsRow.setField(1, expectedFieldOneValue); + simpleFieldsRow.setField(2, LocalDateTime.ofInstant(now, ZoneOffset.UTC)); + String[] rowColumns = {"field1", "field2", "window_timestamp"}; + Point expectedPoint = Point.measurement("override_measurement") + .addField(rowColumns[0], expectedFieldZeroValue) + .addField(rowColumns[1], expectedFieldOneValue) + .time(now.toEpochMilli(), TimeUnit.MILLISECONDS).build(); + + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, "override_measurement"); + influxDBWriter.write(simpleFieldsRow, context); + ArgumentCaptor pointArg = ArgumentCaptor.forClass(Point.class); + verify(influxDb).write(any(), any(), pointArg.capture()); + + assertEquals(expectedPoint.lineProtocol(), pointArg.getValue().lineProtocol()); + } @Test public void shouldNotWriteNullColumnsInRowToInfluxAsfields() throws Exception { @@ -156,7 +181,7 @@ public void shouldNotWriteNullColumnsInRowToInfluxAsfields() throws Exception { .addField(rowColumns[0], integerValue) .time(Timestamp.from(now).getTime(), TimeUnit.MILLISECONDS).build(); - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, influxMeasurementOverrideName); influxDBWriter.write(simpleFieldsRow, context); ArgumentCaptor pointArg = ArgumentCaptor.forClass(Point.class); verify(influxDb).write(any(), any(), pointArg.capture()); @@ -180,7 +205,7 @@ public void shouldWriteRowWithTagColumns() throws Exception { .addField(rowColumns[1], expectedFieldOneValue) .time(Timestamp.from(now).getTime(), TimeUnit.MILLISECONDS).build(); - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, influxMeasurementOverrideName); influxDBWriter.write(simpleFieldsRow, context); ArgumentCaptor pointArg = ArgumentCaptor.forClass(Point.class); verify(influxDb).write(any(), any(), pointArg.capture()); @@ -204,7 +229,7 @@ public void shouldWriteRowWithTagColumnsOfTypeInteger() throws Exception { .addField(rowColumns[1], expectedFieldOneValue) .time(Timestamp.from(now).getTime(), TimeUnit.MILLISECONDS).build(); - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, influxMeasurementOverrideName); influxDBWriter.write(simpleFieldsRow, context); ArgumentCaptor pointArg = ArgumentCaptor.forClass(Point.class); verify(influxDb).write(any(), any(), pointArg.capture()); @@ -228,7 +253,7 @@ public void shouldWriteRowWithLabelColumns() throws Exception { .addField(rowColumns[1], expectedFieldOneValue) .time(Timestamp.from(now).getTime(), TimeUnit.MILLISECONDS).build(); - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, influxMeasurementOverrideName); influxDBWriter.write(simpleFieldsRow, context); ArgumentCaptor pointArg = ArgumentCaptor.forClass(Point.class); verify(influxDb).write(any(), any(), pointArg.capture()); @@ -244,7 +269,7 @@ public void shouldThrowIfExceptionInWrite() throws Exception { doThrow(new RuntimeException()).when(influxDb).write(any(), any(), any()); - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, columns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, columns, errorHandler, errorReporter, influxMeasurementOverrideName); influxDBWriter.write(row, context); } @@ -258,7 +283,7 @@ public void shouldReportIncaseOfFatalError() throws Exception { errorHandler.init(initContext); errorHandler.getExceptionHandler().accept(points, new RuntimeException("exception from handler")); - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, influxMeasurementOverrideName); Exception exception = assertThrows(Exception.class, () -> influxDBWriter.write(getRow(), context)); assertEquals("java.lang.RuntimeException: exception from handler", exception.getMessage()); @@ -277,7 +302,7 @@ public void shouldReportInCaseOfMaxSeriesExceeded() throws Exception { errorHandler.getExceptionHandler().accept(points, new InfluxDBException("{\"error\":\"partial write:" + " max-values-per-tag limit exceeded (100453/100000)")); - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, influxMeasurementOverrideName); Exception exception = assertThrows(Exception.class, () -> influxDBWriter.write(getRow(), context)); assertEquals("org.influxdb.InfluxDBException: {\"error\":\"partial write: max-values-per-tag limit exceeded (100453/100000)", exception.getMessage()); @@ -296,7 +321,7 @@ public void shouldNotReportInCaseOfFailedRecordFatalError() throws Exception { errorHandler.getExceptionHandler().accept(points, new InfluxDBException("{\"error\":\"partial write: points beyond retention policy dropped=11\"}")); - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, influxMeasurementOverrideName); influxDBWriter.write(getRow(), context); verify(errorReporter, times(0)).reportFatalException(any(InfluxWriteException.class)); } @@ -312,7 +337,7 @@ public void invokeShouldThrowErrorSetByHandler() throws Exception { errorHandler.init(initContext); errorHandler.getExceptionHandler().accept(points, new RuntimeException("exception from handler")); - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, influxMeasurementOverrideName); InfluxWriteException exception = assertThrows(InfluxWriteException.class, () -> influxDBWriter.write(getRow(), context)); @@ -322,7 +347,7 @@ public void invokeShouldThrowErrorSetByHandler() throws Exception { @Test public void failSnapshotStateOnInfluxError() throws Exception { String[] rowColumns = {"tag_field1", "field2", "window_timestamp"}; - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, influxMeasurementOverrideName); errorHandler.init(initContext); errorHandler.getExceptionHandler().accept(new ArrayList(), new RuntimeException("exception from handler")); @@ -335,7 +360,7 @@ public void failSnapshotStateOnInfluxError() throws Exception { @Test public void failSnapshotStateOnFlushFailure() throws Exception { String[] rowColumns = {"tag_field1", "field2", "window_timestamp"}; - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDb, rowColumns, errorHandler, errorReporter, influxMeasurementOverrideName); Mockito.doThrow(new RuntimeException("exception from flush")).when(influxDb).flush(); Exception exception = assertThrows(Exception.class, diff --git a/version.txt b/version.txt index 26acbf080..aa22d3ce3 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.12.2 +0.12.3