By Oren Askarov
In this article, we’re going to take a look at the Apache Avro data format, a data serialization system that converts an object in memory to a stream of bytes.
The world of online shopping changed dramatically after COVID-19. Let’s examine the technology that enabled these changes. Imagine a software application that manages orders. Each order contains data such as the ID, date, customer ID, type, and more. Depending on the programming language used, this information is typically loaded in memory as an object or structure. There are several scenarios in which we might want to serialize this object, for example to save it to a file, send it to another service, or publish it to a message queue.
We could use a familiar text-based format, but each has drawbacks:

- **CSV**: Lacks support for complex data types such as structures and arrays.
- **XML**: Can be verbose and cumbersome.
- **JSON**: Still somewhat verbose, as including field names and string representations for various data types can take up significant space.
Avro is a row-oriented remote procedure call and data serialization framework developed within Apache’s Hadoop project. It uses JSON to define data types and protocols and serializes data in a compact binary format. Its primary use is in Apache Hadoop, where it can provide both a serialization format for persistent data and a wire format for communication between Hadoop nodes and from client programs to the Hadoop services. Avro uses a schema to structure the data that is being encoded. It has two different schema languages: one intended for human editing (Avro IDL) and a more machine-readable one based on JSON.
It is similar to Thrift and Protocol Buffers but does not require running a code-generation program when a schema changes (unless desired for statically typed languages).
Apache Spark SQL can access Avro as a data source.
The schema specifies the structure of the serialized format. It defines the fields that the serialized format contains, their data types, whether they are optional, their default values, and so on. The schema itself is written in JSON. A schema can be any of the below:

- A JSON string naming a defined type (e.g., `"string"` or `"org.example.order"`)
- A JSON object of the form `{"type": "typeName", ...attributes...}`
- A JSON array, which represents a union of types
The built-in data types are as follows:

- **Primitive types**: `null`, `boolean`, `int`, `long`, `float`, `double`, `bytes`, `string`
- **Complex types**: `record`, `enum`, `array`, `map`, `union`, `fixed`

There are also logical types, such as `date` and `decimal`, which refine the meaning of an underlying primitive type.
The complex types each have their own attributes (e.g., `record` has `fields`, `array` has `items`), though a few attributes are common across them. A short example is shown below.
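For illustration, here is a small schema (not from the original article, just an assumed example) that combines these forms: an object form for the record and the nested array, plain string forms for primitive types, and an array form for a union with a default value:

```json
{
  "type": "record",
  "name": "example",
  "namespace": "org.example",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "tags", "type": {"type": "array", "items": "string"}},
    {"name": "comment", "type": ["null", "string"], "default": null}
  ]
}
```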
The above is a simplified explanation of the schema. The full Avro format specification can be found here.
As an example, the schema for the order data that we saw above would be as follows:
{ "namespace" : "org.example", "type": "record", "name": "order", "fields" : [ {"name": "orderId", "type": "long"}, {"name": "orderDate", "type": "int", "logicalType" : "date"}, {"name": "customerId", "type": "string"}, {"name": "orderType", "type":{ "type" : "enum", "name" : "orderType", "symbols" : ["NORMAL", "INTERNAL", "CUSTOM"] }}, {"name": "notes", "type": [ "null", "string"], "default" : null } ] }
Here are some things to note about the above example:

- The record is identified by its `name` (`order`) and `namespace` (`org.example`).
- `orderDate` is stored as an `int` with the `date` logical type, i.e., the number of days since the Unix epoch.
- `orderType` is an `enum` that only allows the listed symbols.
- `notes` is a union of `null` and `string` with a default of `null`, which makes it an optional field.
The serialized data can be created in three formats/encodings: binary encoding, JSON encoding, and single-object encoding. See the Avro specification for details.
Avro also provides a format for storing Avro records as a file, known as the object container file. The records are stored in the binary format, and the schema used to write them is included in the file. The records are stored in blocks that can be compressed, and the blocks make the file easy to split, which is useful for distributed processing such as MapReduce. This format is supported by many tools and frameworks, including Hadoop, Spark, Pig, and Hive.
Many popular languages have APIs for working with Avro. Here, we’re going to see an example of writing to Avro in Python. Note how the objects we’re writing are just generic dictionaries, not classes with strongly typed methods like getOrderId(). We need to convert the date into an int containing the number of days since the epoch, since that is how Avro represents dates. At the end, we print the number of bytes used to encode the objects.
Here we’re using the DatumWriter to write individual records; we’re not using the Avro object container file format that also contains the schema. This will be the case when we’re writing individual items to a message queue or sending them as params to a service. The DataFileWriter should be used if we want to write an Avro object container file.
```python
from avro.io import DatumWriter, DatumReader, BinaryEncoder, BinaryDecoder
import avro.schema
from io import BytesIO
from datetime import datetime

schemaStr = '''
{
  "type": "record",
  "name": "order",
  "fields": [
    {"name": "orderId", "type": "long"},
    {"name": "orderDate", "type": "int", "logicalType": "date"},
    {"name": "customerId", "type": "string"},
    {"name": "orderType", "type": {
      "type": "enum",
      "name": "orderType",
      "symbols": ["NORMAL", "INTERNAL", "CUSTOM"]
    }},
    {"name": "notes", "type": ["null", "string"], "default": null}
  ]
}
'''

def daysSinceEpoch(dateStr):
    # Avro stores dates as the number of days since the Unix epoch.
    inpDate = datetime.strptime(dateStr, '%Y-%m-%d')
    epochDate = datetime.utcfromtimestamp(0)
    return (inpDate - epochDate).days

schema = avro.schema.parse(schemaStr)

wbuff = BytesIO()
avroEncoder = BinaryEncoder(wbuff)
avroWriter = DatumWriter(schema)

# The records are plain dictionaries; "notes" is omitted in the first one,
# so its default of null is used.
avroWriter.write({"orderId": 11, "orderDate": daysSinceEpoch("2022-01-27"),
                  "customerId": "CST223", "orderType": "INTERNAL"}, avroEncoder)
avroWriter.write({"orderId": 12, "orderDate": daysSinceEpoch("2022-02-27"),
                  "customerId": "MST001", "orderType": "NORMAL",
                  "notes": "Urgent"}, avroEncoder)

# Number of bytes used to encode the two records.
print(wbuff.tell())
```
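As mentioned above, the DataFileWriter is used when we want the object container file format instead of bare records. Below is a minimal sketch, not part of the original example, that continues the script above (reusing `schema`, `daysSinceEpoch`, and the `avro.io` imports) and writes to a hypothetical `orders.avro` file:

```python
from avro.datafile import DataFileWriter, DataFileReader

# Write an object container file: the schema is embedded in the file,
# and records are stored in (optionally compressed) blocks.
fileWriter = DataFileWriter(open("orders.avro", "wb"), DatumWriter(), schema, codec="deflate")
fileWriter.append({"orderId": 13, "orderDate": daysSinceEpoch("2022-03-01"),
                   "customerId": "CST224", "orderType": "CUSTOM"})
fileWriter.close()

# Read it back: no schema needs to be supplied, it is read from the file.
fileReader = DataFileReader(open("orders.avro", "rb"), DatumReader())
for record in fileReader:
    print(record)
fileReader.close()
```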
Writing to Avro is straightforward. We have the schema and data to be written. Reading Avro, however, may involve two schemas: the writer schema, which was used to write the message, and the reader schema that is going to read the message. This is necessary if there are different schema versions (maybe fields have been added or removed), and so the writer’s schema is different from the reader’s schema. It could also happen that the reader needs to read only a subset of the fields written. Below is the code in Python, appended to the one we already have for the writer. In this case, we’ve used a different schema for the reader, which contains only the orderId and a new field, extraInfo. Since extraInfo wasn’t in the written data, its default value will be used by the reader.
```python
schemaStrRdr = '''
{
  "type": "record",
  "name": "order",
  "fields": [
    {"name": "orderId", "type": "long"},
    {"name": "extraInfo", "type": "string", "default": "NA"}
  ]
}
'''
schemaRdr = avro.schema.parse(schemaStrRdr)

rbuff = BytesIO(wbuff.getvalue())
avroDecoder = BinaryDecoder(rbuff)
avroReader = DatumReader(schema, schemaRdr)  # Use both the writer and reader schemas

msg = None
while True:
    try:
        msg = avroReader.read(avroDecoder)
    except Exception as ex:
        # DatumReader has no proper EOF handling, so we stop on the read error.
        msg = None
        print(ex)
    if msg is None:
        break
    print(msg, rbuff.tell())
```
By using both the writer and reader schemas when reading data, Avro allows the schema to evolve in a limited way, for example when fields are added or removed, or when a field’s type is changed in a way that is allowed (e.g., int to long, float, or double; string to and from bytes). We’ve already seen an example of this in the reader code above. Note that if a field is missing from the data being read and does not have a default value in the reader’s schema, an error will be thrown. The Avro schema resolution rules cover the details.
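As a small illustration of such a promotion, here is a sketch (with hypothetical schemas, not from the original article) that continues the script above: a record written with an `int` field is read with a reader schema that declares the same field as `long`:

```python
# Hypothetical schemas to illustrate an allowed promotion (int -> long).
writerSchema = avro.schema.parse(
    '{"type": "record", "name": "t", "fields": [{"name": "amount", "type": "int"}]}')
readerSchema = avro.schema.parse(
    '{"type": "record", "name": "t", "fields": [{"name": "amount", "type": "long"}]}')

buf = BytesIO()
DatumWriter(writerSchema).write({"amount": 42}, BinaryEncoder(buf))

# Read the int-encoded value back using the long-based reader schema.
decoded = DatumReader(writerSchema, readerSchema).read(BinaryDecoder(BytesIO(buf.getvalue())))
print(decoded)  # {'amount': 42}
```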
Avro messages may be consumed by many different applications, each of which needs a schema to read or write the messages; schemas change over time, and multiple schema versions may be in use at once. Given this, it makes sense to keep the schemas versioned and stored in a central registry, the Schema Registry. Typically, this is a distributed REST API service that manages and returns schema definitions.
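As a hedged sketch, assuming a Confluent-compatible Schema Registry running at http://localhost:8081 and a subject named order-value (neither of which comes from the original article), a client could fetch the latest registered schema over the registry’s REST API and parse it for use with the Avro reader:

```python
import requests  # any HTTP client would do
import avro.schema

# Fetch the latest schema version for a subject from a Confluent-style registry.
resp = requests.get("http://localhost:8081/subjects/order-value/versions/latest")
resp.raise_for_status()

registered = resp.json()
print("schema id:", registered["id"], "version:", registered["version"])

# The "schema" field is the Avro schema as a JSON string.
readerSchema = avro.schema.parse(registered["schema"])
```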
Why use Avro when there are already other formats present? There are quite a few reasons. Avro:

- Uses a compact binary encoding, so messages take up far less space than CSV, XML, or JSON.
- Supports complex and nested data types, along with unions, defaults, and logical types such as dates.
- Carries its schema with object container files, so the data is self-describing.
- Supports schema evolution through writer and reader schemas.
- Does not require code generation, although it is available for statically typed languages.
- Has APIs in many popular languages and is supported by tools such as Hadoop, Spark, Pig, Hive, and Kafka.
We’ve seen what the Avro data format is, how to define schemas, and how to read and write objects to and from Avro. If you want a fast, compact, and well-supported format that’s easy to use and validate, you should consider Avro. And if you’re looking for some useful tools to help you with Avro, check out this Avro schema validator.
This post was written by Manoj Mokashi. Manoj has more than 25 years of experience as a developer, mostly with Java, web technologies, and databases. He’s also worked with PHP, Python, Spark, AI/ML, and Solr, and he really enjoys learning new things.
Avro is a popular row-based binary serialization format. It can be seen as a binary alternative to JSON, drawing inspiration from its flexibility and nesting while offering much more efficient storage. Avro data consists of a JSON-formatted schema and a set of data payloads, which are serialized either in the Avro binary format or in JSON; the latter is mostly used for debugging and is not supported on its own by all Avro-consuming products. SQream supports Avro with simple data ingestion capabilities for both batch and streaming (for example, Kafka). Once ingested, Avro objects are mapped into rows. All existing data types can be represented in JSON fields, with JSONPath and encoding support. It is a simple plug-and-play solution for Avro objects and files that reduces complexity and accelerates TTTI.