
Conversation

Contributor

@sigram sigram commented Jan 19, 2026

This PR replaces Dropwizard JSON metrics with Prometheus metrics in the CrossDC Consumer, using the Prometheus client_java API directly. It also removes the Dropwizard dependency.

Contributor

@mlbiscoc mlbiscoc left a comment

Can you post a sample of all these metrics? Either dump it here or in a txt file. It would be easier to review the names and labels on the metrics.

Counter.builder()
    .name("consumer_input_total")
    .help("Total number of input messages")
    .labelNames("type", "subtype")
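As background for the label discussion below, here is a minimal plain-Java sketch (not the PR's actual code) of what a labeled counter amounts to: each distinct (type, subtype) pair becomes its own exported series, which is why label cardinality matters. The class and method names are illustrative only.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: a counter with labelNames("type", "subtype") keeps
// one independent count per distinct (type, subtype) label-value pair.
public class LabeledCounterSketch {
    private final Map<List<String>, LongAdder> series = new ConcurrentHashMap<>();

    public void inc(String type, String subtype) {
        series.computeIfAbsent(List.of(type, subtype), k -> new LongAdder()).increment();
    }

    public long get(String type, String subtype) {
        LongAdder a = series.get(List.of(type, subtype));
        return a == null ? 0L : a.sum();
    }

    public int seriesCount() {
        return series.size(); // number of distinct label combinations exported
    }

    public static void main(String[] args) {
        LabeledCounterSketch c = new LabeledCounterSketch();
        c.inc("UPDATE", "add");
        c.inc("UPDATE", "add");
        c.inc("ADMIN", "/admin/collections");
        System.out.println(c.get("UPDATE", "add")); // 2
        System.out.println(c.seriesCount());        // 2
    }
}
```

Every new label-value combination adds a series, so the total series count is the product of each label's distinct values.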
Contributor

I question whether most of these metrics really need a type label. What is its cardinality, and what are the possible combinations? I see in the test that UPDATE is one. Is there also QUERY or something along those lines?

Contributor Author

Yes, there's ADMIN and CONFIGSET.

Contributor

Hmmm ok. I am not a fan of this label being called type. I think it should carry some context about what it means, as type and subtype can be very generic. Is it an operation or message_type maybe? Then what can subtype be? In core, I made it category, but it is debatable whether we should just remove that label/attribute altogether from metrics. If you move type to something more specific, then maybe you can move subtype to type. Again, seeing a sample text output of these metrics would help if you can.

Contributor Author

It's a request type - there are currently three types: UPDATE, ADMIN and CONFIGSET. Sub-type is primarily for UPDATE (add, dbi = delete-by-id, dbq = delete-by-query) and ADMIN (path).

Here's a sample output:

# HELP crossdc_consumer_input_total Total number of input messages
# TYPE crossdc_consumer_input_total counter
crossdc_consumer_input_total{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE"} 1.0
# HELP crossdc_consumer_output_total Total number of output requests
# TYPE crossdc_consumer_output_total counter
crossdc_consumer_output_total{otel_scope_name="org.apache.solr",result="handled",type="UPDATE"} 1.0
# HELP crossdc_consumer_output_batch_size Histogram of output batch sizes
# TYPE crossdc_consumer_output_batch_size histogram
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="0.0"} 0
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="5.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="10.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="25.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="50.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="75.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="100.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="250.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="500.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="750.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="1000.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="2500.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="5000.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="7500.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="10000.0"} 1
crossdc_consumer_output_batch_size_bucket{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE",le="+Inf"} 1
crossdc_consumer_output_batch_size_count{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE"} 1
crossdc_consumer_output_batch_size_sum{otel_scope_name="org.apache.solr",subtype="add",type="UPDATE"} 1.0
# HELP crossdc_consumer_output_first_attempt_time_nanoseconds Histogram of first attempt request times
# TYPE crossdc_consumer_output_first_attempt_time_nanoseconds histogram
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="0.0"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="5000.0"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="10000.0"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="25000.0"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="50000.0"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="100000.0"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="250000.0"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="500000.0"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="1000000.0"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="2500000.0"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="5000000.0"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="2.5E7"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="1.0E8"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="1.0E9"} 0
crossdc_consumer_output_first_attempt_time_nanoseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="+Inf"} 1
crossdc_consumer_output_first_attempt_time_nanoseconds_count{otel_scope_name="org.apache.solr",type="UPDATE"} 1
crossdc_consumer_output_first_attempt_time_nanoseconds_sum{otel_scope_name="org.apache.solr",type="UPDATE"} 1.7667254470782164E18
# HELP crossdc_consumer_output_time_milliseconds Histogram of output request times
# TYPE crossdc_consumer_output_time_milliseconds histogram
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="0.0"} 0
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="5.0"} 0
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="10.0"} 0
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="25.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="50.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="75.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="100.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="250.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="500.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="750.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="1000.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="2500.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="5000.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="7500.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="10000.0"} 1
crossdc_consumer_output_time_milliseconds_bucket{otel_scope_name="org.apache.solr",type="UPDATE",le="+Inf"} 1
crossdc_consumer_output_time_milliseconds_count{otel_scope_name="org.apache.solr",type="UPDATE"} 1
crossdc_consumer_output_time_milliseconds_sum{otel_scope_name="org.apache.solr",type="UPDATE"} 13.0
# TYPE target_info gauge
target_info{service_name="unknown_service:java",telemetry_sdk_language="java",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="1.56.0"} 1

}
if (dbqCount > 0) {
  outputBatchSizeHistogram.record(
      dbiCount, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, "delete_by_query"));
Contributor

This should be dbqCount.

public void recordOutputBatchSize(MirroredSolrRequest.Type type, SolrRequest<?> solrRequest) {
  if (type != MirroredSolrRequest.Type.UPDATE) {
    outputBatchSizeHistogram.record(
        1, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, solrRequest.getPath()));
Contributor

solrRequest.getPath() -> what is the cardinality of this? Probably not that concerning, and I assume it is low, but if there are many different paths this can get big.

Contributor Author

It should be very small, basically only /admin/collections or /admin/configs. OTOH I can make sure it stays small by using a fixed value for subtype.

Contributor Author

OTOH a more useful label here would be the action performed so that you can report what kind of ADMIN / CONFIGSET requests were seen.
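One way to keep that label bounded, as discussed above, is to map raw paths to a small fixed set of subtype values rather than exporting the path verbatim. A minimal sketch, with illustrative names only (not the PR's actual code):

```java
// Hypothetical sketch: collapse request paths into a bounded set of
// subtype label values so an unexpected path cannot blow up cardinality.
public class SubtypeMapper {
    public static String subtypeFor(String path) {
        if (path == null) return "unknown";
        switch (path) {
            case "/admin/collections": return "collections";
            case "/admin/configs":     return "configs";
            default:                   return "other"; // bounded fallback
        }
    }

    public static void main(String[] args) {
        System.out.println(subtypeFor("/admin/collections")); // collections
        System.out.println(subtypeFor("/some/unexpected/path")); // other
    }
}
```

The default branch guarantees the label stays low-cardinality even if new handlers appear; the same pattern would work for mapping ADMIN/CONFIGSET actions instead of paths.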

Comment on lines +80 to +90
outputTimeHistogram =
    metricsContext.longHistogram(
        NAME_PREFIX + "output_time",
        "Histogram of output request times",
        OtelUnit.MILLISECONDS);

outputFirstAttemptHistogram =
    metricsContext.longHistogram(
        NAME_PREFIX + "output_first_attempt_time",
        "Histogram of first attempt request times",
        OtelUnit.NANOSECONDS);
Contributor

A nitpick, but I think we should keep request times in one unit or the other as much as possible unless needed. Building dashboards or aggregations across metrics in nanoseconds and milliseconds can get annoying. This is not a hard blocker, though, if you think we need both nano and milli.

Contributor Author

I changed the first attempt time to use millis; this should be enough.
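A minimal sketch of what that unit consolidation looks like in practice: even when the measurement source is nanosecond-resolution (e.g. System.nanoTime()), the recorded value can be converted to milliseconds before it reaches the histogram. The names here are illustrative, not the PR's code.

```java
import java.util.concurrent.TimeUnit;

// Sketch: record all request times in a single unit (milliseconds),
// converting from the nanosecond measurement source at record time.
public class UnitConsistency {
    public static long elapsedMillis(long startNanos, long endNanos) {
        return TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
    }

    public static void main(String[] args) {
        long start = 0L;
        long end = 13_000_000L; // 13 ms expressed in nanoseconds
        System.out.println(elapsedMillis(start, end)); // 13
    }
}
```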


outputBackoffHistogram =
    metricsContext.longHistogram(
        NAME_PREFIX + "output_backoff_size", "Histogram of output backoff sleep times");
Contributor

This is a time but you called it size.

Comment on lines +62 to +63
input =
    metricsContext.longCounter(NAME_PREFIX + "input_total", "Total number of input messages");
Contributor

I am not liking this metric very much. It is a bit confusing. For any non-update I see above you are just incrementing by 1, but at the same time this metric is incremented by the number of docs and/or deleted docs? The help message doesn't reflect that.

I think it should be consistent; otherwise the way these increment is confusing. Make 2 metric names if one is the number of requests and the other is the number of docs/deleted docs. Maybe call one inputItems and the other requests/messages.
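The suggested two-metric split could look roughly like this; a minimal sketch with illustrative names (inputRequests / inputItems are not from the PR):

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of splitting the confusing single counter into two:
// one for input requests/messages (always +1 per message) and one for
// input items (docs and/or deleted docs carried by update messages).
public class InputMetricsSketch {
    final LongAdder inputRequests = new LongAdder(); // e.g. ..._input_requests_total
    final LongAdder inputItems = new LongAdder();    // e.g. ..._input_items_total

    void recordUpdate(int docCount) {
        inputRequests.increment(); // one message received
        inputItems.add(docCount);  // N docs/deletes inside that message
    }

    void recordNonUpdate() {
        inputRequests.increment(); // non-update requests count only as one message
    }

    public static void main(String[] args) {
        InputMetricsSketch m = new InputMetricsSketch();
        m.recordUpdate(5);
        m.recordNonUpdate();
        System.out.println(m.inputRequests.sum()); // 2
        System.out.println(m.inputItems.sum());    // 5
    }
}
```

Each counter then has one consistent increment rule, and its help text can describe it accurately.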

Comment on lines +536 to +539
  metrics.incrementOutputCounter(type.name(), "failed-dlq");
} else {
  kafkaMirroringSink.submit(item);
  metrics.counter(MetricRegistry.name(type.name(), "failed-resubmit")).inc();
  metrics.incrementOutputCounter(type.name(), "failed-resubmit");
Contributor

Use underscores instead of hyphens. I think that is the standard for OTel and Prometheus.
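If the values come from existing hyphenated constants, one option is to normalize them once at the metrics boundary; a minimal sketch (the helper name is hypothetical):

```java
// Sketch: normalize label values like "failed-dlq" toward Prometheus/OTel
// naming conventions by replacing hyphens with underscores.
public class LabelNames {
    public static String normalize(String value) {
        return value.replace('-', '_');
    }

    public static void main(String[] args) {
        System.out.println(normalize("failed-dlq"));      // failed_dlq
        System.out.println(normalize("failed-resubmit")); // failed_resubmit
    }
}
```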

Contributor

Also, this looks to be more of a result than a type. I think the name for this key should be result.

Contributor

Another thing you could potentially do is change this metric: have 2 metrics instead, success/failure. For success, use successful_outputs or some name like that; for failures or errors, call it failed_outputs with a type label differentiating between the failures: failed submit, failed retry, etc.

public static final String ATTR_SUBTYPE = "subtype";
public static final String ATTR_RESULT = "result";

protected final Map<String, Attributes> attributesCache = new ConcurrentHashMap<>();
Contributor

What is this for?

Comment on lines +171 to +183
public ConsumerTimer startOutputTimeTimer(final String requestType) {
  final RTimer timer =
      new RTimer(TimeUnit.MILLISECONDS) {
        @Override
        public double stop() {
          double elapsedTime = super.stop();
          outputTimeHistogram.record(
              Double.valueOf(elapsedTime).longValue(), attr(ATTR_TYPE, requestType));
          return elapsedTime;
        }
      };
  return () -> timer.stop();
}
Contributor

I had created these wrappers for doing this kind of thing. See AttributedLongTimer. Could you not use this?
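For illustration, the general shape of such a timer wrapper can be sketched in plain Java as an AutoCloseable that records elapsed time into a sink on close. This is a hypothetical sketch in the spirit of the suggestion; AttributedLongTimer's actual API is not shown in this thread, and the MillisTimer name is invented here.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical sketch: construction captures the start time; close()
// converts the elapsed nanoseconds to milliseconds and hands the value
// to a recorder (e.g. a histogram bound to fixed attributes).
public class MillisTimer implements AutoCloseable {
    private final long startNanos = System.nanoTime();
    private final LongConsumer recorder;

    public MillisTimer(LongConsumer recorder) {
        this.recorder = recorder;
    }

    @Override
    public void close() {
        recorder.accept(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
    }

    public static void main(String[] args) {
        long[] recorded = {-1L};
        try (MillisTimer t = new MillisTimer(v -> recorded[0] = v)) {
            // timed work would go here
        }
        System.out.println(recorded[0] >= 0);
    }
}
```

Used with try-with-resources, the recording happens automatically even if the timed block throws.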

Comment on lines +58 to +73
// List<MetricSnapshot> snapshotList =
//     metricManager.getPrometheusMetricReaders().values().stream()
//         .flatMap(r -> r.collect().stream())
//         .toList();
// MetricSnapshots snapshots = MetricSnapshots.of(snapshotList.toArray(new MetricSnapshot[0]));
// String output;
// try {
//   ByteArrayOutputStream baos = new ByteArrayOutputStream();
//   new PrometheusTextFormatWriter(false).write(baos, snapshots);
//   output = baos.toString();
// } catch (Exception e) {
//   log.error("Error while writing final metrics", e);
//   output = snapshots.toString();
// }
// log.info("#### Consumer Metrics: ####\n{}", output);
Contributor

Delete this?
