SchemaRegistry

SchemaRegistry provides a central repository for message metadata. A schema specifies the structure and type of a message. The registry stores these schemas efficiently, provides pluggable serializer/deserializer interfaces, and supplies serializer/deserializer implementations at run time based on incoming messages. It also enables reuse, discovery, authoring, and collaboration around schemas.

Each schema is described mainly by the following metadata:

name - Name of the schema which is unique across the schema registry.

type - Represents the type of schema, for example Avro, ProtoBuf, or JSON.

schemaGroup - Group of schemas to which this schema belongs, for example Kafka, Hive, Spark, or system log.

compatibility - Compatibility between different versions of the schema.

description - Description about the different versions of a schema.

Each schema can evolve through multiple versions. Each version of a schema has:

schemaText - Textual representation of schema

description - Description about this version
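
The relationship between a schema's metadata and its versions can be pictured with a small data model. The sketch below is only an illustration under assumptions: the class names (SchemaSketch, Version), the method names, and the version numbering starting at 1 are hypothetical and are not the registry's own types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the metadata fields listed above.
// These are illustration-only classes, not the registry's own types.
final class SchemaSketch {
    // One immutable version of a schema.
    static final class Version {
        final int number;          // assumed to start at 1 and grow by 1
        final String schemaText;   // textual representation of the schema
        final String description;  // description of this version

        Version(int number, String schemaText, String description) {
            this.number = number;
            this.schemaText = schemaText;
            this.description = description;
        }
    }

    final String name;           // unique across the schema registry
    final String type;           // for example "Avro"
    final String schemaGroup;    // for example "Kafka"
    final String compatibility;  // for example "Backward"
    final String description;
    private final List<Version> versions = new ArrayList<>();

    SchemaSketch(String name, String type, String schemaGroup,
                 String compatibility, String description) {
        this.name = name;
        this.type = type;
        this.schemaGroup = schemaGroup;
        this.compatibility = compatibility;
        this.description = description;
    }

    // Appends a new version and returns it.
    Version addVersion(String schemaText, String description) {
        Version version = new Version(versions.size() + 1, schemaText, description);
        versions.add(version);
        return version;
    }

    // Returns the most recent version, or null if none has been added yet.
    Version latestVersion() {
        return versions.isEmpty() ? null : versions.get(versions.size() - 1);
    }
}
```

The schema-level fields stay fixed while the version list grows, which mirrors the split between schema metadata and per-version data described above.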

Compatibility

Compatibility between different versions of a schema can be set to one of the following values:

Backward - A new version of a schema is compatible with earlier versions of that schema. Data written with an earlier version of the schema can be deserialized with the new version.

Forward - An existing schema is compatible with subsequent versions of that schema. Data written with a new version of the schema can still be read with an old version.

Full - A new version of the schema is both backward and forward compatible.

None - There is no compatibility between different versions of a schema.
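
To make the four modes concrete, here is a deliberately simplified sketch. It assumes a schema can be reduced to a map from field name to "has a default value", and that a reader can decode a writer's data when every reader field either exists in the writer or has a default. Real Avro schema resolution has more rules (type promotion, aliases, unions), and the names CompatibilitySketch, canRead, and isAllowed are hypothetical.

```java
import java.util.Map;

// Simplified illustration of the compatibility modes described above.
// A schema is modeled as: field name -> whether the field declares a default.
final class CompatibilitySketch {
    enum Mode { BACKWARD, FORWARD, FULL, NONE }

    // True when data written with `writer` can be decoded using `reader`:
    // every reader field must exist in the writer or carry a default value.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            boolean missingInWriter = !writer.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            if (missingInWriter && !hasDefault) {
                return false;
            }
        }
        return true;
    }

    // Decides whether `newSchema` may be registered after `oldSchema` under `mode`.
    static boolean isAllowed(Mode mode,
                             Map<String, Boolean> oldSchema,
                             Map<String, Boolean> newSchema) {
        switch (mode) {
            case BACKWARD:
                // New schema must read data written with the old schema.
                return canRead(newSchema, oldSchema);
            case FORWARD:
                // Old schema must read data written with the new schema.
                return canRead(oldSchema, newSchema);
            case FULL:
                return canRead(newSchema, oldSchema) && canRead(oldSchema, newSchema);
            default:
                // NONE: no compatibility check is applied.
                return true;
        }
    }
}
```

Under this model, adding a field without a default breaks backward compatibility but keeps forward compatibility, while adding a field with a default satisfies both.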

Quickstart

Installation

  1. Download the latest release from here.
  2. The registry server can be started with an in-memory store or a persistent store such as MySQL. To set up with MySQL, follow the instructions here.
  3. To start with the in-memory store:
cp $REGISTRY_HOME/conf/registry-inmemory-example.yaml $REGISTRY_HOME/conf/registry.yaml
# Start the server in the foreground
$REGISTRY_HOME/bin/registry-server-start conf/registry.yaml
# To start in daemon mode
sudo ./bin/registry start
  4. Access the UI at http://host.name:9090
SchemaRegistry UI

Running Kafka Example

SchemaRegistry integrates easily with Kafka, Storm, NiFi, and other systems. Example code showing how to integrate with Kafka is available here.

To run this example, follow the steps below.

Download and Start Apache Kafka

  1. Download Kafka 0.10.0.1 or higher from here.
  2. $KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties
  3. $KAFKA_HOME/bin/kafka-server-start.sh config/server.properties
  4. $KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --topic truck_events_stream --partitions 1 --replication-factor 1 --create

Run producer to register schema and send data

  1. cd $REGISTRY_HOME/examples/schema-registry/avro
  2. To send messages to topic “truck_events_stream”
java -jar avro-examples-0.1.0-SNAPSHOT.jar -d data/truck_events.csv -p data/kafka-producer.props -sm -s data/truck_events.avsc

Kafka Producer Integration with SchemaRegistry

  1. Any client integration code must declare a dependency on schema-registry-serdes:
<dependency>
  <groupId>com.hortonworks.registries</groupId>
  <artifactId>schema-registry-serdes</artifactId>
</dependency>
  2. For KafkaProducer, the user needs to add the following config:
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL));
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
config.put(SchemaRegistryClient.Configuration.AUTH_USERNAME, "user1");
config.put(SchemaRegistryClient.Configuration.AUTH_PASSWORD, "password");

Important settings from the above are

schema.registry.url:
This should be set to where the registry server is running, for example http://localhost:9090/api/v1
schema.registry.auth.username:
If the schema registry service is behind a proxy that supports Basic Authentication, the user name part of the credentials can be provided here.
schema.registry.auth.password:
If the schema registry service is behind a proxy that supports Basic Authentication, the password part of the credentials can be provided here.
key.serializer:
StringSerializer is used in the above example.
value.serializer:
com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer is used in the above example. This serializer integrates with the schema registry. It takes the producer config, retrieves schema.registry.url, and uses the topic name to look up the schema. If no schema is defined, it publishes a first version of that schema.
  3. For KafkaProducer, to save the schema version information in the record header, the user needs to include the following config:
config.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, "true");
config.put(KafkaAvroSerde.KEY_SCHEMA_VERSION_ID_HEADER_NAME, "key.schema.version.id"); // optional
config.put(KafkaAvroSerde.VALUE_SCHEMA_VERSION_ID_HEADER_NAME, "value.schema.version.id"); // optional

store.schema.version.id.in.header:
By default, this is set to ‘false’ to maintain backward compatibility. Set it to ‘true’ to save the schema version information in the header.

key_schema_version_id_header_name:
Configurable header name under which the record key schema version information is saved. This configuration applies only when key.serializer is set to ‘KafkaAvroSerializer’.

value_schema_version_id_header_name:
Configurable header name under which the record value schema version information is saved. This configuration applies only when value.serializer is set to ‘KafkaAvroSerializer’.
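
The producer settings described above can also be written with their plain string keys, for instance when they are loaded from a properties file. The sketch below is a minimal illustration that assumes the key names listed in this section; the class and method names (ProducerConfigSketch, producerConfig) are hypothetical, and the fully qualified StringSerializer name is the standard Kafka one.

```java
import java.util.Properties;

// Builds the producer settings discussed above using plain string keys.
// Hypothetical helper for illustration; it does not create a KafkaProducer.
final class ProducerConfigSketch {
    static Properties producerConfig(String bootstrapServers,
                                     String registryUrl,
                                     boolean storeVersionInHeader) {
        Properties config = new Properties();
        config.put("bootstrap.servers", bootstrapServers);
        // Where the registry server is running, e.g. http://localhost:9090/api/v1
        config.put("schema.registry.url", registryUrl);
        config.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer");
        if (storeVersionInHeader) {
            // Off by default; only set when the caller opts in.
            config.put("store.schema.version.id.in.header", "true");
        }
        return config;
    }
}
```

Leaving the header flag unset keeps the default behavior, so existing consumers that read the version from the payload are unaffected.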

Run consumer to retrieve schema and deserialize the messages

  1. cd $REGISTRY_HOME/examples/schema-registry/avro
  2. To consume messages from topic “truck_events_stream”
java -jar avro-examples-0.5.0-SNAPSHOT.jar -cm -c data/kafka-consumer.props
Press Ctrl+C to stop.

Kafka Consumer Integration with SchemaRegistry

  1. Any client integration code must declare a dependency on schema-registry-serdes:
<dependency>
  <groupId>com.hortonworks.registries</groupId>
  <artifactId>schema-registry-serdes</artifactId>
</dependency>
  2. For KafkaConsumer, the user needs to add the following to the config:
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), props.get(SCHEMA_REGISTRY_URL));
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
config.put(SchemaRegistryClient.Configuration.AUTH_USERNAME, "user1");
config.put(SchemaRegistryClient.Configuration.AUTH_PASSWORD, "password");

Important settings from the above are

schema.registry.url:
This should be set to where the registry server is running, for example http://localhost:9090/api/v1
schema.registry.auth.username:
If the schema registry service is behind a proxy that supports Basic Authentication, the user name part of the credentials can be provided here.
schema.registry.auth.password:
If the schema registry service is behind a proxy that supports Basic Authentication, the password part of the credentials can be provided here.
key.deserializer:
StringDeserializer is used in the above example.
value.deserializer:
com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer is used in the above example.

This deserializer tries to find schema.id in the message payload.

  3. For KafkaConsumer, to retrieve the schema version information from the record header, the user may need to include the configs below if they were supplied in the KafkaProducer:

config.put(KafkaAvroSerde.KEY_SCHEMA_VERSION_ID_HEADER_NAME, "key.schema.version.id"); // optional
config.put(KafkaAvroSerde.VALUE_SCHEMA_VERSION_ID_HEADER_NAME, "value.schema.version.id"); // optional

KafkaAvroDeserializer first tries to find schema.id in the message header. If it is not available there, it tries to find schema.id in the message payload. A topic can therefore contain messages that hold version information (schema.id) in either the header or the payload.

If the deserializer finds schema.id, it calls the schema registry to fetch the Avro schema. If it does not find schema.id, it falls back to using the topic name to fetch a schema.
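
The lookup order described above (header first, then payload, then topic name) can be sketched as a small decision function. This only illustrates the precedence; it is not the deserializer's actual code, and the names SchemaLookupSketch, Source, and chooseSource are hypothetical. A null argument stands for "no schema.id found in that location".

```java
// Illustrates the precedence used to locate the schema for a message.
// Hypothetical helper; the real KafkaAvroDeserializer is not shown here.
final class SchemaLookupSketch {
    enum Source { HEADER, PAYLOAD, TOPIC_NAME }

    // headerSchemaId / payloadSchemaId are null when no schema.id is present.
    static Source chooseSource(Long headerSchemaId, Long payloadSchemaId) {
        if (headerSchemaId != null) {
            return Source.HEADER;      // header wins when both are present
        }
        if (payloadSchemaId != null) {
            return Source.PAYLOAD;     // otherwise use the id in the payload
        }
        return Source.TOPIC_NAME;      // last resort: look up by topic name
    }
}
```

Because the decision is made per message, a single topic can mix messages that carry the id in the header with messages that carry it in the payload.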

API examples

Default serializer and deserializer APIs.

Default serializer and deserializer for a given schema provider can be retrieved with the below APIs.

// for avro,
AvroSnapshotSerializer serializer = schemaRegistryClient.getDefaultSerializer(AvroSchemaProvider.TYPE);
AvroSnapshotDeserializer deserializer = schemaRegistryClient.getDefaultDeserializer(AvroSchemaProvider.TYPE);