Avro is a file format that is often used because it is highly compact and fast to read. It is used by Apache Kafka, Apache Hadoop, and other data-intensive applications. Boomi integrations are not currently able to read and write avro data natively, although this is possible with Boomi Data Catalog and Prep.
An avro file generally has two parts. The first part is the schema, and it is optional. The second part is the binary data. If you view an avro file in a text editor, you will see both parts on the first line (and, depending on how much data there is, on following lines as well). The first part of the first line is the schema, and it generally starts with Obj. The rest of the line is the binary data; some of it will be readable, and other parts will look like random characters. The avro data needs to be converted into a file type that Boomi is able to read and write, and in this example that file type is json.
The scripts below have been successfully run on local atoms but have not been tested on cloud atoms. You will also need to install the Apache Avro jar files, which can be found here: Apache Avro Library; v1.10.2 is also attached to this article. The scripts below have been tested against v1.10.2, the most current version at the time this article was written.
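As a quick way to see the two parts described above, the Apache Avro library can also be used outside of Boomi to print the schema embedded in a local avro file. The sketch below is a standalone Groovy example, not part of the original scripts, and the file name episodes.avro is only a placeholder:
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord

// Sketch: print the schema embedded in a local avro container file (file name is a placeholder)
def fileReader = new DataFileReader<GenericRecord>(new File("episodes.avro"), new GenericDatumReader<GenericRecord>())
println fileReader.getSchema()
fileReader.close()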
The scripts below are meant to be a starting point for your development. The first script (Avro to Json with a schema) can be used as presented. The second script (Avro to Json without a schema) needs an avro schema created and populated within the script. The last script (Json to Avro) has some specific parameters in how it was set up, and a schema will need to be provided.
To install the Apache Avro library: Boomi Custom Library Documentation
The script below reads an avro file and creates one or more json documents from it.
import java.util.Properties
import java.io.InputStream
import com.boomi.execution.ExecutionUtil
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DatumReader
logger = ExecutionUtil.getBaseLogger()
for (int i = 0; i < dataContext.getDataCount(); i++) {
    InputStream is = dataContext.getStream(i)
    Properties props = dataContext.getProperties(i)

    // Set the schema
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>()
    DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(is, datumReader)
    Schema schema = dataFileStream.getSchema()
    logger.info("Schema used: " + schema)

    // Write the data to json
    GenericRecord record = null
    while (dataFileStream.hasNext()) {
        record = dataFileStream.next(record)
        is = new ByteArrayInputStream(record.toString().getBytes('UTF-8'))
        dataContext.storeStream(is, props)
    }
}
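A note on the output: record.toString() produces a json-looking rendering that works well for simple records like the example in this article, but it is not guaranteed to match Avro's own json encoding for every schema (unions and bytes, for example, are rendered differently). If strict Avro json encoding is needed, for instance so the output can be fed back through the json-to-avro script later in this article, the two lines inside the while loop can be replaced with a json encoder. This is a sketch, not part of the original script:
// Additional imports for the sketch (add them to the script's import list)
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.io.Encoder
import org.apache.avro.io.EncoderFactory

// Inside the while loop, instead of record.toString():
ByteArrayOutputStream out = new ByteArrayOutputStream()
Encoder encoder = EncoderFactory.get().jsonEncoder(schema, out)
new GenericDatumWriter<GenericRecord>(schema).write(record, encoder)
encoder.flush()
dataContext.storeStream(new ByteArrayInputStream(out.toByteArray()), props)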
If the schema is not included within the avro file, a few additional steps are needed to tell the reader how to interpret the binary avro data: you must supply the schema yourself. In this example the avro schema is hard coded (in the variable named schemaString), but it could also be set in other ways (see the sketch after the script).
import java.util.Properties
import java.io.InputStream
import com.boomi.execution.ExecutionUtil
import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DatumReader
import org.apache.avro.io.Decoder
import org.apache.avro.io.DecoderFactory
logger = ExecutionUtil.getBaseLogger()
String schemaString = '{"type":"record","name":"episodes","namespace":"testing.hive.avro.serde",' + '"fields":[{"name":"title","type":"string","doc":"episode title"},{"name":"air_date","type":"string","doc":"initial date"},{"name":"doctor","type":"int","doc":"main actor playing the Doctor in episode"}]}'

// Set the schema from the hard-coded string
Schema schema = new Schema.Parser().parse(schemaString)
logger.info("Schema used: " + schema)

for (int i = 0; i < dataContext.getDataCount(); i++) {
    InputStream is = dataContext.getStream(i)
    Properties props = dataContext.getProperties(i)

    // Decode the schema-less binary data using the supplied schema
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema)
    Decoder decoder = DecoderFactory.get().binaryDecoder(is, null)

    // Write the data to json
    GenericRecord record = null
    while (true) {
        try {
            record = datumReader.read(record, decoder)
        } catch (EOFException eofe) {
            break
        }
        dataContext.storeStream(new ByteArrayInputStream(record.toString().getBytes('UTF-8')), props)
    }
}
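Hard coding the schema string is just one option. It could also come from, for example, a dynamic process property set earlier in the process; the sketch below assumes a property named avro_schema, which is a name chosen for this example rather than anything defined by the script above:
// Sketch: pull the avro schema from a dynamic process property instead of hard coding it
// ("avro_schema" is an assumed property name for this example)
String schemaString = ExecutionUtil.getDynamicProcessProperty("avro_schema")
if (schemaString == null || schemaString.trim().isEmpty()) {
    throw new IllegalStateException("Dynamic process property avro_schema is not set")
}
Schema schema = new Schema.Parser().parse(schemaString)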
Lastly, this script is set up to create an avro file that can then be sent to an endpoint. The script needs a schema, like the one used earlier, set in the variable schemaString. The incoming json is parsed based on that schema. Multiple documents can be processed, and they will all be written to one avro file.
In the example below, you can write multiple documents to a single avro file. For that to happen, the documents have to hit the data process shape at the same time; if there are branches or decision shapes within the document flow, multiple avro files will be created.
Example json used with the example schema:
{ "title": "Castrolava", "air_date": "4 January 1982", "doctor": 5 }
The script will take multiple json documents like the one above and process them based on the provided avro schema.
import java.util.Properties
import java.io.InputStream
import com.boomi.execution.ExecutionUtil
import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord
import org.apache.avro.file.DataFileWriter
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.Decoder
import org.apache.avro.io.DatumReader
logger = ExecutionUtil.getBaseLogger()
String schemaString = '{"type":"record","name":"episodes","namespace":"org.apache.avro.file.Header",' + '"fields":[{"name":"title","type":"string","doc":"episode title"},{"name":"air_date","type":"string","doc":"initial date"},{"name":"doctor","type":"int","doc":"main actor playing the Doctor in episode"}]}'
logger.info("schema used: " + schemaString)
// the last doc's properties are set on the output
Properties props = null
DataFileWriter<GenericRecord> writer = null
ByteArrayOutputStream output = new ByteArrayOutputStream()
Schema schema = new Schema.Parser().parse(schemaString)
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema)
GenericRecord datum
writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>())
writer.create(schema, output)
for (int i = 0; i < dataContext.getDataCount(); i++) {
    InputStream is = dataContext.getStream(i)
    props = dataContext.getProperties(i)
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, is)

    // Keep reading datum until EOF exception occurs and then continue
    while (true) {
        try {
            datum = reader.read(null, decoder)
        } catch (EOFException eofe) {
            break
        }
        writer.append(datum)
    }
}
writer.flush()
dataContext.storeStream(new ByteArrayInputStream(output.toByteArray()), props)
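As written, the avro container produced by this script is uncompressed. If a smaller output file is wanted, a codec can be set on the writer before writer.create(schema, output) is called; the deflate level of 5 below is an arbitrary choice for this sketch and not part of the original script:
// Additional import for the sketch
import org.apache.avro.file.CodecFactory

// Optional: compress the avro blocks (must be called before writer.create(schema, output))
writer.setCodec(CodecFactory.deflateCodec(5))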
Article originally posted at Boomi Community.