This article provides steps for one method to test avro ingestion locally using the Imply distribution. The example sends nested avro using parser type: avro_stream and avroBytesDecoder type: schema_registry.
1. Add extension "druid-avro-extensions" to the loadList in conf-quickstart/druid/_common/common.runtime.properties if running in quickstart mode, and conf/druid/_common/common.runtime.properties if running in cluster mode.
2. Download Confluent quickstart, install: https://docs.confluent.io/current/quickstart/cos-quickstart.html . We will be using Confluent's supplied Kafka and schema-registry, so make sure no other Kafka process is currently running (port :9092).
3. Start Confluent from the Confluent directory. Your terminal should look something like this:
➜ bin/confluent start
Using CONFLUENT_CURRENT: /var/folders/yy/dk4fdc7n2zdf2gn1n5sblq980000gn/T/confluent.ZLnYVjH0
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
If one of the services doesn't start, you may have port conflicts. After fixing conflicts, you may need to cycle the affected service, e.g.:
➜ bin/kafka-rest-stop
➜ bin/kafka-rest-start
➜ bin/zookeeper-server-stop
➜ bin/zookeeper-server-start
4. If running Confluent and Imply on different machines, skip this step.
Before starting the Imply quickstart, change the ports for the coordinator, broker, overlord, and historical in conf-quickstart/druid/broker/runtime.properties, conf-quickstart/druid/coordinator/runtime.properties, etc, to avoid conflicts with Confluent ports. Change the zookeeper port in conf-quickstart/zk/zoo.cfg.
If you plan to use the Imply UI and you have never launched localhost:9095, it will be sufficient to modify the coordinator and broker ports in:conf-quickstart/pivot/config.yaml, where "host" = the broker. If you have previously used this Imply UI distribution, modify the port values in the UI under Settings -> My Druid.
If you don't want to change Imply ports, you will need to change ports in the Confluent distribution.
5. Create a json record file that represents the Avro you will be ingesting. For our example this record (location.json) is:
{"hilltop":{"timestamp":"2018-06-21T18:00:00Z","view":"ocean"}}
6. Create a schema that will describe your data. The schema for location.json is:
{
"type": "record",
"name": "location",
"fields": [
{
"name": "hilltop",
"type": {
"type": "record",
"name": "anotherNameDescribingType",
"fields": [
{
"name": "timestamp",
"type": "string",
"doc": "Local time",
"default": ""
},
{
"name": "view",
"type": "string",
"doc": "doYouSeeWhatISee",
"default": ""
}
]
}
}
]
}
7. Create a Kafka topic ("avro-test") and send the schema registry and sample file to Confluent Kafka:
➜ bin/kafka-avro-console-producer --broker-list localhost:9092 --topic avro-test --property value.schema='{"type":"record","name":"location","fields":[{"name":"hilltop","type":{"type":"record","name":"anotherNameDescribingType","fields":[{"name":"timestamp","type":"string","doc":"Local time","default":""},{"name":"view","type":"string","doc":"doYouSeeWhatISee","default":""}]}}]}' < location.json
8. In a different terminal window, check that the topic has been created:
➜ bin/kafka-topics --list --zookeeper localhost:2181
__confluent.support.metrics
__consumer_offsets
_confluent-ksql-default__command_topic
_schemas
avro-test
9. Ingest the record into Druid by submitting the supervisor spec from the Imply directory. If running Confluent and Imply on separate machines, edit the "url" and "bootstrap.servers" in the spec.
➜ curl -XPOST -H'Content-Type: application/json' -d @quickstart/avroSupervisor.json http://localhost:8090/druid/indexer/v1/supervisor
{"id":"location"}% <- the output id will be the name of the dataSource.
This supervisor spec looks like this:
{
"type": "kafka",
"dataSchema": {
"dataSource": "location",
"parser": {
"type": "avro_stream",
"avroBytesDecoder": {
"type": "schema_registry",
"url": "http://localhost:8081",
"capacity": 100
},
"parseSpec": {
"format": "avro",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
{ "name": "view", "type": "string" }
],
"dimensionExclusions": []
},
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "timestamp",
"expr": "$.hilltop.timestamp"
},
{
"type": "path",
"name": "view",
"expr": "$.hilltop.view"
}
]
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "hour",
"queryGranularity": "NONE",
"rollup": false
}
},
"ioConfig": {
"topic": "avro-test",
"useEarliestOffset": true,
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
}
},
"tuningConfig": {
"type": "kafka",
"reportParseExceptions": true
}
}
Note that the parseSpec format is "avro". The Confluent process is taking the json file and translating it into avro, and Druid is consuming the avro data.
10. Check that Druid has received the data, either from the terminal:
➜ bin/dsql -H localhost:8084 # broker port
Welcome to dsql, the command-line client for Druid SQL.
Type "\h" for help.
dsql> \d
┌──────────────┬─────────────────────────┐
│ TABLE_SCHEMA │ TABLE_NAME │
├──────────────┼─────────────────────────┤
│ druid │ location │
│ druid │ wikipedia │
└──────────────┴─────────────────────────┘
Retrieved 4 rows in 0.03s.
dsql> select * from "location";
┌──────────────────────────┬───────┐
│ __time │ view │
├──────────────────────────┼───────┤
│ 2018-06-21T18:00:00.000Z │ ocean │
└──────────────────────────┴───────┘
Retrieved 1 row in 0.14s.
or directly in the Imply UI (localhost:9095).
Comments
0 comments
Please sign in to leave a comment.