
How to Read and Write Avro Files

Avro is a file format that is often used because it is highly compact and fast to read. It is used by Apache Kafka, Apache Hadoop, and other data-intensive applications. Boomi integrations are not currently able to read and write Avro data, although this is possible with Boomi Data Catalog and Prep.

An Avro file generally has two parts. The first part is the schema, and this part is optional. The second part is the binary data. If you view an Avro file in a text editor, you will see both parts on the first line (and, depending on the data, there may be more lines). The first part of the first line is the schema, which generally starts with Obj. The rest of the first line is the binary data; some of it will be readable, and other parts will look like random characters. The Avro file needs to be converted into a file type that Boomi is able to read and write; in this example we use JSON. The scripts below have been run successfully on local Atoms but have not been tested on cloud Atoms. You will also need to install the Apache Avro JAR files. They can be downloaded here: Apache Avro Library, and v1.10.2 is also attached to this article. The scripts below have been tested against v1.10.2, which was the most current version at the time this article was written.

The scripts below are meant to be a starting point for your development. The first script (Avro to JSON with a schema) can be used as presented. The second script (Avro to JSON without a schema) requires an Avro schema to be created and populated within the script. The last script (JSON to Avro) has some specific assumptions about how it is set up, and a schema will need to be provided.

To install the Apache Avro library (see Boomi Custom Library Documentation):

  1. Download the Apache Avro Library.
  2. Log into the Boomi Platform and go to Integration -> Settings -> Account Information and Settings.
  3. Click on Account Libraries and click on Upload a file.
  4. Next, go to the Build tab, create a custom library, set its type to Scripting, and add the Avro JAR file.
  5. Finally, deploy the custom library to the environment that you are using.

Reading Avro Files with a Schema

The script below will read an Avro file and create one or more JSON documents from it; an example of the output is shown after the script.

import java.util.Properties 
import java.io.InputStream 
import com.boomi.execution.ExecutionUtil 
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream 
import org.apache.avro.generic.GenericDatumReader 
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DatumReader

logger = ExecutionUtil.getBaseLogger()

for (int i = 0; i < dataContext.getDataCount(); i++) { 
	InputStream is = dataContext.getStream(i) 
    Properties props = dataContext.getProperties(i)

    // Set the schema (read from the file header)
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>()
    DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(is, datumReader)
    Schema schema = dataFileStream.getSchema()
    logger.info("Schema used: " + schema)
    // Write the data to json 
    GenericRecord record = null
    while (dataFileStream.hasNext()) { 
    	record = dataFileStream.next(record)
        is = new ByteArrayInputStream(record.toString().getBytes('UTF-8')) 	
        dataContext.storeStream(is, props) 
	}
}
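
Each Avro record is written out as its own JSON document using record.toString(). As an illustration only, assuming the episodes schema and sample record used later in this article, one output document would look roughly like this:

{"title": "Castrolava", "air_date": "4 January 1982", "doctor": 5}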

Reading Avro Files without a Schema

If the schema is not included within the Avro file, a few additional steps are needed to tell the reader how to interpret the binary Avro data. You will need to supply the schema that was used to write the data. In this example the Avro schema is hard-coded (in a variable named schemaString), but it could also be set in other ways. A variation for raw binary data that has no container header at all is sketched after the script.

import java.util.Properties 
import java.io.InputStream 
import com.boomi.execution.ExecutionUtil 
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DatumReader

logger = ExecutionUtil.getBaseLogger()
String schemaString = '{"type":"record","name":"episodes","namespace":"testing.hive.avro.serde",' + '"fields":[{"name":"title","type":"string","doc":"episode title"},{"name":"air_date","type":"string","doc":"initial date"},{"name":"doctor","type":"int","doc":"main actor playing the Doctor in episode"}]}' 

for (int i = 0; i < dataContext.getDataCount(); i++) { 
	InputStream is = dataContext.getStream(i) 
    Properties props = dataContext.getProperties(i) 
    
    //Set the schema 
    Schema schema = new Schema.Parser().parse(schemaString)
    logger.info("Schema used: " + schema)
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema)
    DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(is, datumReader)
    
    // Write the data to json 
    GenericRecord record = null
    while (dataFileStream.hasNext()) { 
    	record = dataFileStream.next(record) 
        is = new ByteArrayInputStream(record.toString().getBytes('UTF-8')) 	
        dataContext.storeStream(is, props) 
    } 
} 
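
Note that DataFileStream still expects the Avro container header to be present in the incoming data. If your documents are raw binary Avro records with no container header at all, a binary decoder can be paired with the supplied schema instead. The script below is only a minimal sketch of that variation, reusing the example episodes schema; it has not been tested against a live process and should be adapted to your own data.

import java.util.Properties
import java.io.InputStream
import com.boomi.execution.ExecutionUtil
import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DatumReader
import org.apache.avro.io.Decoder
import org.apache.avro.io.DecoderFactory

logger = ExecutionUtil.getBaseLogger()
// Same example schema as above; replace it with the schema your data was written with
String schemaString = '{"type":"record","name":"episodes","namespace":"testing.hive.avro.serde",' + '"fields":[{"name":"title","type":"string","doc":"episode title"},{"name":"air_date","type":"string","doc":"initial date"},{"name":"doctor","type":"int","doc":"main actor playing the Doctor in episode"}]}'

Schema schema = new Schema.Parser().parse(schemaString)
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema)

for (int i = 0; i < dataContext.getDataCount(); i++) {
    InputStream is = dataContext.getStream(i)
    Properties props = dataContext.getProperties(i)

    // No container header to read, so decode the raw binary records directly
    Decoder decoder = DecoderFactory.get().binaryDecoder(is, null)
    GenericRecord record = null
    while (true) {
        try {
            record = datumReader.read(record, decoder)
        } catch (EOFException eofe) {
            break
        }
        dataContext.storeStream(new ByteArrayInputStream(record.toString().getBytes('UTF-8')), props)
    }
}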

Writing an Avro File

Lastly, this script is set up to create an Avro file that can be sent to an endpoint. The script needs a schema, as we used earlier, set in the variable schemaString. The incoming JSON is parsed based on that schema. Multiple documents can be processed and will be written to one Avro file.

In the example below, you can write multiple documents to a single Avro file. The documents have to hit the Data Process shape at the same time. If there are branches or decision shapes within the document flow, then multiple Avro files will be created.

Example JSON used with the example schema:

{ "title": "Castrolava", "air_date": "4 January 1982", "doctor": 5 }

The script will take multiple JSON documents like the one above and process them based on the provided Avro schema; a small round-trip check is sketched after the script.

import java.util.Properties 
import java.io.InputStream 
import com.boomi.execution.ExecutionUtil 
import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord
import org.apache.avro.file.DataFileWriter
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.Decoder
import org.apache.avro.io.DatumReader

logger = ExecutionUtil.getBaseLogger()
String schemaString = '{"type":"record","name":"episodes","namespace":"org.apache.avro.file.Header",' + '"fields":[{"name":"title","type":"string","doc":"episode title"},{"name":"air_date","type":"string","doc":"initial date"},{"name":"doctor","type":"int","doc":"main actor playing the Doctor in episode"}]}' 
logger.info("schema used: " + schemaString) 

// the last doc's properties are set on the output 
Properties props = null
DataFileWriter<GenericRecord> writer = null
ByteArrayOutputStream output = new ByteArrayOutputStream()
Schema schema = new Schema.Parser().parse(schemaString)
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema)
GenericRecord datum
writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>())
writer.create(schema, output)

for (int i = 0; i < dataContext.getDataCount(); i++) { 
	InputStream is = dataContext.getStream(i) 
    props = dataContext.getProperties(i) 
    
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, is) 
    
    // Keep reading datum until EOF exception occurs and then continue 
    while (true) { 
    	try { 
        	datum = reader.read(null, decoder)
        } catch (EOFException eofe) {
        	break
        } 
    	writer.append(datum) 
	} 
}

writer.flush()
dataContext.storeStream(new ByteArrayInputStream(output.toByteArray()), props)
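
For a quick sanity check during development, the bytes written to output can be read back with the same DataFileStream approach used in the first script. The lines below are only a sketch; they assume they are appended to the end of the script above and that org.apache.avro.file.DataFileStream is added to the imports.

// Development-only check: read the generated Avro back and log each record
DatumReader<GenericRecord> checkReader = new GenericDatumReader<GenericRecord>()
DataFileStream<GenericRecord> checkStream = new DataFileStream<GenericRecord>(new ByteArrayInputStream(output.toByteArray()), checkReader)
logger.info("Round-trip schema: " + checkStream.getSchema())
while (checkStream.hasNext()) {
    logger.info("Round-trip record: " + checkStream.next())
}
checkStream.close()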

Article originally posted at Boomi Community.

Published Jul 10, 2021
