In this article, we’re going to take a look at the Apache Avro data format, a data serialization system that converts an object in memory into a stream of bytes.
Visit the SQream documentation to learn how to insert data from Avro into SQream.
Let’s start with an example. Say we have a software application dealing with orders. Each order has data like the ID, date, customer ID, type, etc. This data is loaded into memory as an object or struct, depending on the programming language. Below are some examples of when we might want to serialize this object:
And what will we do with the serialized data? At some later point, we’ll need to deserialize it or read it back from the stream of bytes to an object. This implies that the serialization needs to have a particular format so that we are able to deserialize it. There are already some formats in use for this (each with their own drawback):
Why use Avro when there are already other formats present? There are quite a few reasons. Avro:
The Avro format consists of two parts: the schema and the serialized data. Both need to be present when serializing/writing or deserializing/reading data.
The schema specifies the structure of the serialized format. It defines the fields that the serialized format contains, their data types (whether optional or not), default values, etc. The schema itself is in JSON format. A schema can be any of the below:
The built-in data types are as follows: the primitive types are null, boolean, int, long, float, double, bytes, and string; the complex types are record, enum, array, map, union, and fixed. In addition, logical types such as date, decimal, and timestamp annotate one of these underlying types with extra meaning.
The complex types each have different attributes (e.g., record has fields, array has items). There are some attributes common to them though.
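As a quick illustration (these field declarations are hypothetical and not part of the order example that follows), an array type takes an items attribute and a map type takes a values attribute:

{"name": "tags", "type": {"type": "array", "items": "string"}}
{"name": "quantities", "type": {"type": "map", "values": "long"}}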
The above is a simplified explanation of the schema. The full Avro format specification can be found here.
As an example, the schema for the order data that we saw above would be as follows:
{ "namespace" : "org.example", "type": "record", "name": "order", "fields" : [ {"name": "orderId", "type": "long"}, {"name": "orderDate", "type": "int", "logicalType" : "date"}, {"name": "customerId", "type": "string"}, {"name": "orderType", "type":{ "type" : "enum", "name" : "orderType", "symbols" : ["NORMAL", "INTERNAL", "CUSTOM"] }}, {"name": "notes", "type": [ "null", "string"], "default" : null } ] }
Here are some things to note about the above example: the record lives in the org.example namespace; the orderType field is defined with a nested enum type that lists its allowed symbols; orderDate is stored as an int annotated with the date logical type (days since the Unix epoch); and notes is optional, declared as a union of null and string with a default value of null.
The serialized data itself can be created in three formats/encodings: the compact binary encoding, a JSON encoding (mostly useful for debugging), and the single-object encoding. See this reference for details.
Avro also provides a format for storing Avro records as a file. The records are stored in the binary format. The schema used to write the records is also included in the file. The records are stored in blocks and can be compressed. Also, the blocks make the file easier to split, which is useful for distributed processing like Map-Reduce. This format is supported by many tools/frameworks like Hadoop, Spark, Pig, and Hive.
Many popular languages have APIs for working with Avro. Here, we’re going to see an example of writing to Avro in Python. Note how the objects we’re writing are just generic dictionaries and not classes with strongly typed methods like getOrderId(). We need to convert the date into an int containing the number of days since the epoch, as Avro uses this representation for dates. At the end, we print the number of bytes used for encoding the objects.
Here we’re using the DatumWriter to write individual records; we’re not using the Avro object container file format that also contains the schema. This will be the case when we’re writing individual items to a message queue or sending them as params to a service. The DataFileWriter should be used if we want to write an Avro object container file.
from avro.io import DatumWriter, DatumReader, BinaryEncoder, BinaryDecoder
import avro.schema
from io import BytesIO
from datetime import datetime

# Writer schema for the order records
schemaStr = '''
{
  "type": "record",
  "name": "order",
  "fields": [
    {"name": "orderId", "type": "long"},
    {"name": "orderDate", "type": "int", "logicalType": "date"},
    {"name": "customerId", "type": "string"},
    {"name": "orderType", "type": {
      "type": "enum",
      "name": "orderType",
      "symbols": ["NORMAL", "INTERNAL", "CUSTOM"]
    }},
    {"name": "notes", "type": ["null", "string"], "default": null}
  ]
}
'''

def daysSinceEpoch(dateStr):
    # Avro's date logical type is the number of days since the Unix epoch
    inpDate = datetime.strptime(dateStr, '%Y-%m-%d')
    epochDate = datetime.utcfromtimestamp(0)
    return (inpDate - epochDate).days

schema = avro.schema.parse(schemaStr)

# Serialize two order records into an in-memory buffer
wbuff = BytesIO()
avroEncoder = BinaryEncoder(wbuff)
avroWriter = DatumWriter(schema)
avroWriter.write({"orderId": 11, "orderDate": daysSinceEpoch("2022-01-27"),
                  "customerId": "CST223", "orderType": "INTERNAL"}, avroEncoder)
avroWriter.write({"orderId": 12, "orderDate": daysSinceEpoch("2022-02-27"),
                  "customerId": "MST001", "orderType": "NORMAL",
                  "notes": "Urgent"}, avroEncoder)

# Number of bytes used to encode the two records
print(wbuff.tell())
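For completeness, here is a minimal sketch of writing the same kind of record to an Avro object container file with DataFileWriter, continuing from the snippet above (it reuses schema and daysSinceEpoch; the file name orders.avro is just a placeholder):

from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader

# Write an object container file: the writer schema is embedded in the file,
# and records are stored in (optionally compressed) blocks.
with DataFileWriter(open("orders.avro", "wb"), DatumWriter(), schema, codec="deflate") as fileWriter:
    fileWriter.append({"orderId": 11, "orderDate": daysSinceEpoch("2022-01-27"),
                       "customerId": "CST223", "orderType": "INTERNAL"})

# Read it back; no schema needs to be supplied because it is stored in the file
with DataFileReader(open("orders.avro", "rb"), DatumReader()) as fileReader:
    for record in fileReader:
        print(record)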
Writing to Avro is straightforward. We have the schema and data to be written. Reading Avro, however, may involve two schemas: the writer schema, which was used to write the message, and the reader schema that is going to read the message. This is necessary if there are different schema versions (maybe fields have been added or removed), and so the writer’s schema is different from the reader’s schema. It could also happen that the reader needs to read only a subset of the fields written. Below is the code in Python, appended to the one we already have for the writer. In this case, we’ve used a different schema for the reader, which contains only the orderId and a new field, extraInfo. Since extraInfo wasn’t in the written data, its default value will be used by the reader.
schemaStrRdr = '''
{
  "type": "record",
  "name": "order",
  "fields": [
    {"name": "orderId", "type": "long"},
    {"name": "extraInfo", "type": "string", "default": "NA"}
  ]
}
'''
schemaRdr = avro.schema.parse(schemaStrRdr)

# Read back from the bytes written above
rbuff = BytesIO(wbuff.getvalue())
avroDecoder = BinaryDecoder(rbuff)
avroReader = DatumReader(schema, schemaRdr)  # Use both schemas

msg = None
while True:
    try:
        msg = avroReader.read(avroDecoder)
    except Exception as ex:
        # no proper EOF handling for DatumReader, so stop on the read error
        msg = None
        print(ex)
    if msg is None:
        break
    print(msg, rbuff.tell())
By using both the writer and reader schemas when reading data, Avro allows the schema to evolve in a limited way—for example, when adding/removing fields or when fields are changed in a way that is allowed (e.g. int to long, float, or double; string to and from bytes). We’ve already seen an example of this in the reader code above. Note that if a field is missing from the data being read and does not have a default value in the reader’s schema, an error will be thrown. More here.
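To illustrate one of the allowed type promotions mentioned above, here is a small sketch (not part of the original example; it reuses schema, wbuff, and the imports from the earlier snippets) in which a reader schema declares orderDate as long even though it was written as int:

# Reader schema that promotes orderDate from int (writer) to long (reader)
promoSchemaStr = '''
{
  "type": "record",
  "name": "order",
  "fields": [
    {"name": "orderId", "type": "long"},
    {"name": "orderDate", "type": "long"}
  ]
}
'''
promoSchema = avro.schema.parse(promoSchemaStr)
promoBuff = BytesIO(wbuff.getvalue())
promoReader = DatumReader(schema, promoSchema)
print(promoReader.read(BinaryDecoder(promoBuff)))  # orderDate is read as a long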
Avro messages may be consumed by many different applications, each of which needs a schema to read or write the messages; schemas change over time, and multiple schema versions can be in use at once. Given all this, it makes sense to keep the schemas versioned and stored in a central registry, the Schema Registry. Typically, this is a distributed REST API service that manages and returns schema definitions.
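As a sketch of how such a registry is used (assuming a Confluent-style Schema Registry at http://localhost:8081; the subject name orders-value and the use of the requests library are illustrative, and schemaStr is the writer schema from the earlier example):

import json
import requests

registryUrl = "http://localhost:8081"
headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}

# Register the writer schema under a subject; the registry returns a schema ID
resp = requests.post(f"{registryUrl}/subjects/orders-value/versions",
                     headers=headers,
                     data=json.dumps({"schema": schemaStr}))
schemaId = resp.json()["id"]

# A consumer can later fetch the schema by that ID and parse it
resp = requests.get(f"{registryUrl}/schemas/ids/{schemaId}")
writerSchema = avro.schema.parse(resp.json()["schema"])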
We’ve seen what the Avro data format is, how to define schemas, and how to read and write objects to and from Avro. If you want a fast, compact, and well-supported format that’s easy to use and validate, you should consider Avro. And if you’re looking for some useful tools to help you with Avro, check out this Avro schema validator.
This post was written by Manoj Mokashi. Manoj has more than 25 years of experience as a developer, mostly on Java, web technologies, and databases. He’s also worked on PHP, Python, Spark, AI/ML, and Solr, and he really enjoys learning new things.
Avro is a popular row-based binary serialization format. It can be seen as a binary alternative to JSON, drawing inspiration from JSON’s flexibility and nesting while offering much more efficient storage. Avro data consists of a JSON-formatted schema and a set of data payloads, which are serialized either in the Avro binary format or in JSON; the latter is mostly used for debugging and is not supported on its own by all Avro-consuming products. SQream supports Avro with simple data ingestion capabilities for both batch and streaming (for example, Kafka). Once ingested, Avro objects are mapped into rows. All existing data types can be represented in JSON fields, with JSONPath support and encoding. It is a simple plug-and-play solution for Avro objects and files that reduces complexity and accelerates TTTI.