Kafka Topics
Maestro can be configured to listen to Kafka topics for various operations. This page explains how to set up Kafka integration and use different message types.
Configuring the Kafka Integration
To enable Kafka integration, add the following configuration to your Maestro application properties or YAML file:
###############################################################################
# Spring Configuration (Kafka)
# Including Kafka integration with song
###############################################################################
spring:
config:
useLegacyProcessing: true
application:
name: maestro
output.ansi.enabled: ALWAYS
cloud:
stream:
kafka: # remove this key to disable kafka
binder:
brokers: localhost:29092
bindings:
songInput:
consumer:
enableDlq: true
dlqName: maestro_song_analysis_dlq
autoCommitOnError: true
autoCommitOffset: true
input:
consumer:
enableDlq: true
dlqName: maestro_index_requests_dlq
autoCommitOnError: true
autoCommitOffset: true
bindings:
songInput:
destination: song-analysis
group: songConsumerGrp
consumer:
maxAttempts: 1
input:
# We don't specify content type because @StreamListener will handle that
destination: maestro_index_requests
group: requestsConsumerGrp
consumer:
maxAttempts: 1
For more details about the configuration, click here
Spring Configuration
spring.config.useLegacyProcessing
: Enables legacy configuration processing modespring.application.name
: Sets application identifier as "maestro"spring.output.ansi.enabled
: Controls ANSI color output in logs- Possible values:
ALWAYS
,NEVER
,DETECT
- Possible values:
Kafka Configuration
spring.cloud.stream.kafka.binder.brokers
: Kafka broker connection URLspring.cloud.stream.kafka.bindings.songInput.consumer
: Song analysis consumer settingsenableDlq
: Enables dead letter queue (see relevant confluent developer documentation here)dlqName
: DLQ name for failed song analysis messagesautoCommitOnError
: Auto-commits offsets on errorautoCommitOffset
: Auto-commits processed message offsets- For more information on these configuration see Kafka Consumers docs(https://docs.confluent.io/platform/current/clients/consumer.html)
- For more details on offset commit behavior, see:
Stream Bindings
-
spring.cloud.stream.bindings.songInput
: -
destination
: Target Kafka topic for song analysis -
group
: Consumer group name -
maxAttempts
: Maximum retry attempts for message processing- Possible values: Integer > 0
-
spring.cloud.stream.bindings.input
:destination
: Topic for maestro index requestsgroup
: Consumer group for index requestsmaxAttempts
: Maximum processing retry attempts- Possible values: Integer > 0
Kafka Topics
Maestro listens to two main Kafka topics:
maestro_index_requests
: For on-demand indexing requestssong-analysis
: For Song analysis updates
maestro_index_requests Topic
This topic is used for sending on-demand indexing requests to Maestro. Messages should be in JSON format and can be of three types:
-
Analysis Indexing
{
"value": {
"repositoryCode": "collab",
"studyId": "PEK-AB",
"analysisId": "EGAZ000",
"remove": true
}
}remove
: Set totrue
for deletion,false
or omit for indexing/updating
-
Study Indexing
{
"value": {
"repositoryCode": "collab",
"studyId": "PEK-AB"
}
} -
Full Song Repository Indexing
{
"value": {
"repositoryCode": "aws"
}
}
song-analysis Topic
This topic receives messages about Song analysis updates. The message schema is defined by Song, but typically looks like this:
{
"value": {
"analysisId": "12314124",
"studyId": "PEK-AB",
"songServerId": "collab",
"state": "PUBLISHED"
}
}