Java Example to Use Kafka Streams for Real-time Data
Overview
This guide explains how to implement a Java Kafka consumer to receive onchain data streams from Bitquery in real-time using the Apache Kafka library. The consumer is secured with SSL and SASL, subscribing to a Kafka topic and logging messages to the console.
Quick Start
- Create Your Java Project: Set up a new Java project with the Apache Kafka client library.
- Prepare SSL Certificates: Convert your PEM certificates to JKS format for Java compatibility.
- Configure Kafka Properties: Update your Kafka properties file with SSL, SASL, and broker details.
- Write the Consumer Code: Use the provided Kafka consumer code to consume and process messages.
Prerequisites
Ensure that you have the following components in place before running the code:
- Kafka Cluster: Accessible Kafka brokers from Bitquery.
- Username and Password: For authentication with the Kafka brokers.
- Topic name(s): The topic you wish to subscribe to.
- Java: JDK 8 or higher installed.
- Apache Kafka Client: The Kafka client library for Java must be included in your project dependencies.
- Certificates: Client and server certificates to secure communication between your consumer and the Kafka cluster.
Step 1: Create Your Java Project
To begin, create a new Java project in your preferred Integrated Development Environment (IDE) such as IntelliJ IDEA or Eclipse.
Add Kafka Client Library Dependency
Include the Kafka client library in your project. If you are using Maven for dependency management, add the following to your pom.xml file:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version> <!-- Use the latest version -->
</dependency>
Save the file, and Maven will download the required dependencies.
Step 2: Prepare SSL Certificates for Kafka
To enable secure communication between your Kafka consumer and the Kafka cluster, you need to convert your PEM certificates to Java-compatible JKS format.
Convert PEM to PKCS12 Format
Use the openssl tool to convert your PEM certificates (client.cer.pem and client.key.pem) into a PKCS12 file.
Run the following command in your terminal:
openssl pkcs12 -export -in client.cer.pem -inkey client.key.pem -out client.p12 -name mykey -CAfile ca.cer.pem -caname root -password pass:123456
This command creates a file named client.p12 that contains the client certificates in PKCS12 format. The -password flag sets the password for the PKCS12 file.
Convert PKCS12 to JKS Format
Now, use the keytool utility to convert the PKCS12 file into a JKS keystore:
keytool -importkeystore -deststorepass 123456 -destkeypass 123456 -destkeystore keystore.jks -srckeystore client.p12 -srcstoretype PKCS12 -srcstorepass 123456 -alias mykey
This creates a keystore.jks file, which is compatible with Java.
Create a Truststore Containing the Broker's CA Certificate
- Retrieve the Broker's Certificate: Use openssl to retrieve the broker's certificate:
echo | openssl s_client -connect rpk0.bitquery.io:9093 -servername rpk0.bitquery.io -showcerts > broker_certs.pem
This saves the broker's certificates into a file named broker_certs.pem.
- Import Certificates into Truststore: Use keytool to import the broker's certificates into a truststore:
keytool -import -alias brokerCert -file broker_certs.pem -keystore clienttruststore.jks -storepass truststorepassword
This step creates a clienttruststore.jks file, which contains the broker's certificate.
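Before wiring these files into the Kafka client, you can sanity-check that each keystore opens with the expected password. This is a minimal sketch using only the standard java.security.KeyStore API; the KeystoreCheck class name is illustrative, and the file names and passwords follow the commands above:

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;

public class KeystoreCheck {
    // Loads a JKS keystore to confirm the file exists and the password is
    // correct before handing the path to the Kafka client. Throws if either
    // the file is unreadable or the password does not match.
    static KeyStore open(Path file, char[] password) throws Exception {
        KeyStore ks = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(file)) {
            ks.load(in, password);
        }
        return ks;
    }
}
```

For example, `KeystoreCheck.open(Path.of("keystore.jks"), "123456".toCharArray())` should succeed silently if the conversion steps above worked; a bad password fails fast here rather than at connection time.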
Step 3: Configure Kafka Properties
In your Java application, configure the Kafka client properties to use the generated keystore.jks and clienttruststore.jks files for SSL authentication.
Here’s an example configuration:
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"rpk0.bitquery.io:9093,rpk1.bitquery.io:9093,rpk2.bitquery.io:9093");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put("ssl.endpoint.identification.algorithm", ""); // disables hostname verification
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"your-username\" password=\"your-password\";");
props.put("ssl.keystore.location", "src/main/resources/keystore.jks");
props.put("ssl.keystore.password", "123456");
props.put("ssl.key.password", "123456");
props.put("ssl.truststore.location", "src/main/resources/clienttruststore.jks");
props.put("ssl.truststore.password", "truststorepassword");
Ensure the file paths and passwords match your setup.
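Rather than hardcoding credentials in source, you can keep them in an external properties file and load it at startup. The sketch below uses only the standard library; the ConfigLoader class name and the properties file name are illustrative, and the file would contain the same keys shown above (ssl.keystore.password, sasl.jaas.config, and so on):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class ConfigLoader {
    // Reads Kafka client properties (brokers, credentials, keystore paths)
    // from an external file so secrets stay out of the source tree.
    static Properties load(Path file) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        return props;
    }
}
```

Usage: `Properties props = ConfigLoader.load(Path.of("consumer.properties"));` — the resulting object can be passed straight to the KafkaConsumer constructor in the next step.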
Step 4: Write the Kafka Consumer Code
Write the Kafka consumer code to connect to the Kafka topic and process messages.
Initialize the Kafka Consumer
Create and configure a KafkaConsumer object:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("tron.broadcasted.transactions"));
Poll and Process Messages
Use the poll method to retrieve messages from the Kafka topic and process them:
while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<byte[], byte[]> record : records) {
        String value = new String(record.value());
        System.out.println(value);
    }
}
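The loop above converts record.value() with the JVM's default charset, which varies between platforms. Assuming the payloads are UTF-8 text, as the String conversion above implies, decoding explicitly is more portable. A small sketch; the MessageDecoder class name is illustrative:

```java
import java.nio.charset.StandardCharsets;

public class MessageDecoder {
    // Kafka delivers values as raw bytes; decode them explicitly as UTF-8
    // instead of relying on the platform default charset. Returns null for
    // records with a null value (e.g. tombstones).
    static String decode(byte[] value) {
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }
}
```

Inside the loop, `String value = MessageDecoder.decode(record.value());` replaces the plain String constructor.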
Graceful Shutdown
Add a shutdown hook to close the consumer cleanly when the application is terminated:
import org.apache.kafka.common.errors.WakeupException;

Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

try {
    // Main consumer loop
} catch (WakeupException e) {
    // Ignore for shutdown
} finally {
    consumer.close();
}
Execution Workflow
- Kafka Client Initialization: The Kafka client is configured with SSL and SASL properties.
- Subscribe to Kafka Topic: The consumer connects to the specified topic.
- Message Processing: Messages are retrieved, processed, and logged to the console.
- Graceful Shutdown: The consumer shuts down cleanly when terminated.