Examples¶
SchemaRegistry ships with examples of Kafka integration; the example code is installed with Schema Registry under the paths shown below.
Running Kafka Producer with AvroSerializer¶
- Log in to one of the Kafka broker hosts and create the topic:
bin/kafka-topics.sh --create --bootstrap-server <kafka host>:9092 --replication-factor 1 --partitions 2 --topic truck_events_stream
- On the registry host:
cd /opt/cloudera/parcels/CDH/lib/schemaregistry/examples/schema-registry/avro/
- Edit data/kafka-producer.props
topic=truck_events_stream
bootstrap.servers=<kafka_host1>:9092,<kafka_host2>:9092
schema.registry.url=http://<registry_host>:7788/api/v1
security.protocol=PLAINTEXT
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer
ignoreInvalidMessages=true
- The following command registers the truck_events schema defined in data/truck_events.avsc into the registry and ingests 200 messages into the topic "truck_events_stream":
java -jar avro-examples-0.*.jar -d data/truck_events_json -p data/kafka-producer.props -sm -s data/truck_events.avsc
(java is installed at /usr/java/default/bin/java)
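Before launching the jar, it can help to confirm that the properties file defines every key listed above. The following is an illustrative sketch only (it writes the file to the working directory with the same placeholder hosts as above; adjust paths and hosts for your environment):

```shell
# Illustrative pre-flight check: write a minimal kafka-producer.props
# (placeholder hosts, matching the snippet above) and confirm each key
# the producer example reads is present before launching the jar.
cat > kafka-producer.props <<'EOF'
topic=truck_events_stream
bootstrap.servers=<kafka_host1>:9092,<kafka_host2>:9092
schema.registry.url=http://<registry_host>:7788/api/v1
security.protocol=PLAINTEXT
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer
ignoreInvalidMessages=true
EOF

for key in topic bootstrap.servers schema.registry.url security.protocol \
           key.serializer value.serializer; do
  grep -q "^${key}=" kafka-producer.props || { echo "missing: ${key}"; exit 1; }
done
echo "producer props OK"
```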
To run the producer in a secure cluster:¶
- Issue ACLs on the Kafka topic you are trying to ingest into. Make sure you replace principal_name with the username you are ingesting as:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<zookeeper_host>:2181 --add --allow-principal User:principal_name --allow-host "*" --operation All --topic truck_events_stream
- On the registry host:
cd /opt/cloudera/parcels/CDH/lib/schemaregistry/examples/schema-registry/avro/
Edit data/kafka-producer.props and add security.protocol=SASL_PLAINTEXT:
topic=truck_events_stream
bootstrap.servers=<kafka_host1>:9092,<kafka_host2>:9092
schema.registry.url=http://<registry_host>:7788/api/v1
security.protocol=SASL_PLAINTEXT
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer
- Create the following /etc/kafka/conf/kafka_client_jaas.conf to pass to the Kafka producer's JVM:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};
In the above config we expect the Kafka brokers to be running with the principal "kafka".
kinit -kt your.keytab principal@EXAMPLE.COM
Make sure you granted ACLs to the principal; refer to [2].
- java -Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf -jar avro-examples-0.*.jar -d data/truck_events_json -p data/kafka-producer.props -sm -s data/truck_events.avsc
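The JAAS file above can also be generated and sanity-checked from the shell. This is an illustrative sketch (it writes to the working directory rather than /etc/kafka/conf, so no root access is needed):

```shell
# Illustrative: generate the KafkaClient JAAS section shown above and
# verify the login module and service name it declares.
cat > kafka_client_jaas.conf <<'EOF'
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useTicketCache=true
   renewTicket=true
   serviceName="kafka";
};
EOF

grep -q 'Krb5LoginModule required' kafka_client_jaas.conf || exit 1
grep -q 'serviceName="kafka"' kafka_client_jaas.conf || exit 1
echo "JAAS config OK"
```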
To run the producer in a secure cluster using dynamic JAAS configuration:¶
- Issue ACLs on the Kafka topic you are trying to ingest into. Make sure you replace principal_name with the username you are ingesting as:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<zookeeper_host>:2181 --add --allow-principal User:principal_name --allow-host "*" --operation All --topic truck_events_stream
- On the registry host:
cd /opt/cloudera/parcels/CDH/lib/schemaregistry/examples/schema-registry/avro/
Edit data/kafka-producer.props and add the security.protocol=SASL_PLAINTEXT and sasl.jaas.config parameters:
topic=truck_events_stream
bootstrap.servers=<kafka_host1>:9092,<kafka_host2>:9092
schema.registry.url=http://<registry_host>:7788/api/v1
security.protocol=SASL_PLAINTEXT
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true renewTicket=true serviceName="kafka";
kinit -kt your.keytab principal@EXAMPLE.COM
Make sure you granted ACLs to the principal; refer to [2].
java -jar avro-examples-0.*.jar -d data/truck_events_json -p data/kafka-producer.props -sm -s data/truck_events.avsc
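The sasl.jaas.config value is simply the body of the KafkaClient section from the static kafka_client_jaas.conf collapsed onto one line. The following sketch derives one from the other (file names and paths are illustrative, writing to the working directory):

```shell
# Illustrative: collapse the KafkaClient { ... }; section of a static JAAS
# file into the one-line form used by the sasl.jaas.config property.
cat > kafka_client_jaas.conf <<'EOF'
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useTicketCache=true
   renewTicket=true
   serviceName="kafka";
};
EOF

# Extract the section, drop the header/footer lines, and join the rest
# with single spaces.
inline=$(sed -n '/^KafkaClient {/,/^};/p' kafka_client_jaas.conf \
         | sed '1d;$d' | tr -s ' \n' ' ' | sed 's/^ //; s/ $//')
echo "sasl.jaas.config=${inline}"
```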
Running Kafka Consumer with AvroDeserializer¶
- On the registry host:
cd /opt/cloudera/parcels/CDH/lib/schemaregistry/examples/schema-registry/avro/
Edit data/kafka-consumer.props
topic=truck_events_stream
bootstrap.servers=<kafka_host1>:9092,<kafka_host2>:9092
schema.registry.url=http://<registry_host>:7788/api/v1
security.protocol=PLAINTEXT
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer
group.id=truck_group
auto.offset.reset=earliest
java -jar avro-examples-0.*.jar -c data/kafka-consumer.props -cm -s data/truck_events.avsc
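The consumer properties mirror the producer's, with deserializers in place of serializers plus the consumer-only group.id and auto.offset.reset keys. A quick check in the same illustrative spirit (file written to the working directory, placeholder hosts as above):

```shell
# Illustrative: write the consumer props shown above and confirm the
# consumer-specific keys and deserializer classes are present.
cat > kafka-consumer.props <<'EOF'
topic=truck_events_stream
bootstrap.servers=<kafka_host1>:9092,<kafka_host2>:9092
schema.registry.url=http://<registry_host>:7788/api/v1
security.protocol=PLAINTEXT
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer
group.id=truck_group
auto.offset.reset=earliest
EOF

for key in group.id auto.offset.reset key.deserializer value.deserializer; do
  grep -q "^${key}=" kafka-consumer.props || { echo "missing: ${key}"; exit 1; }
done
echo "consumer props OK"
```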
To run the consumer in a secure cluster:¶
- On the registry host:
cd /opt/cloudera/parcels/CDH/lib/schemaregistry/examples/schema-registry/avro/
Edit data/kafka-consumer.props
topic=truck_events_stream
bootstrap.servers=<kafka_host1>:9092,<kafka_host2>:9092
schema.registry.url=http://<registry_host>:7788/api/v1
security.protocol=SASL_PLAINTEXT
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer
group.id=truck_group
auto.offset.reset=earliest
- Create the following /etc/kafka/conf/kafka_client_jaas.conf to pass to the Kafka consumer's JVM:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};
In the above config we expect the Kafka brokers to be running with the principal "kafka".
kinit -kt your.keytab principal@EXAMPLE.COM
Make sure you granted ACLs to the principal; refer to [2].
java -Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf -jar avro-examples-0.*.jar -c data/kafka-consumer.props -cm
To run the consumer in a secure cluster using dynamic JAAS configuration:¶
- On the registry host:
cd /opt/cloudera/parcels/CDH/lib/schemaregistry/examples/schema-registry/avro/
Edit data/kafka-consumer.props
topic=truck_events_stream
bootstrap.servers=<kafka_host1>:9092,<kafka_host2>:9092
schema.registry.url=http://<registry_host>:7788/api/v1
security.protocol=SASL_PLAINTEXT
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer
group.id=truck_group
auto.offset.reset=earliest
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true renewTicket=true serviceName="kafka";
kinit -kt your.keytab principal@EXAMPLE.COM
Make sure you granted ACLs to the principal; refer to [2].
java -jar avro-examples-0.*.jar -c data/kafka-consumer.props -cm