IntentHQ LogoEngineering Blog

Dr. Pucketlove - Or, How I Learned to Stop Worrying and Love Parquet (partitioning)

Pucket is a Partitioning System For Parquet

Parquet + Bucket = Pucket.

Pucket is a Scala library which provides a simple partitioning system for Parquet. But what is Parquet and why does it need partitioning when it already supports filtering? In this post I will attempt to explain Parquet, partitioning in Hadoop, and the motivation and design of Pucket. If you’re not interested in the background, you can skip straight to some simple code examples or go to the GitHub repository.

Background

What is Parquet and Why Should I Care?

Parquet is a binary columnar storage format for Hadoop created by Cloudera and Twitter designed to store large volumes of analytics data. Here’s a description plucked straight from the website:

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.1

This a little bit vague, so let me try to answer the important question: Why do we want to use Parquet to store data in Hadoop? Parquet stores data as columnar records in files. Columns in this case are essentially nested key values with strongly typed primitive values. Parquet also encourages compressing of the dataset, doing this will reduce your storage footprint and increase your IO throughput at the cost of CPU cycles2. Parquet files also contain metadata which promote efficient seeking within the data3. Both of these properties combined enable Parquet to have a small storage footprint and quick access time when compared to simple sequence files.

Essentially Parquet is a NoSQL column store without the need for a query engine runtime.

Why Might Data Need Partitioning?

Partition

The point of partitioning is to be able to quickly isolate a portion of your dataset so that you don’t have to seek through the whole lot to find the part of the data you are interested in. Partitioning generally falls into two types; horizontal and vertical. Horizontal partitioning splits your data by values, and vertical partitioning splits your data by field structure. The pattern for doing this is known as a partitioning scheme.

For example a partitioning scheme could be created to partition the data by date and data type, combining both horizontal and vertical partitioning types.

Given that Parquet provides filtering natively, why should it be necessary to use partitions in the filesystem itself? First, let me explain what a partition means in the Hadoop filesystem world:

What do Partitions Look Like on a Hadoop File System?

Very simply partitions are directories in the filesystem under which data is stored. That’s it. The structure of these directories should take on a consistent pattern based on the partitioning scheme.

Does Parquet Actually Need Partitioning?

As with a lot of questions, the answer to this is “it depends”. If you want to perform operations involving every piece of data in your entire dataset then you would not need partitioning, or indeed filtering. If you want to perform lots of complex filtering operations ad-hoc, there’s little point partitioning your data. However if your operations follow a similar pattern, like a certain set of operations which always run on a certain portion of the data, it makes sense to partition your data accordingly.

So if you think partitioning is for you then read on…

Pucket Design

Pucket has been designed to be a simple wrapper around Parquet, following the design principles below

With Pucket we aim to provide the following high-level features, broken down into single responsibility modules:

Pucket High-Level Concepts and Usage Considerations

Pucket’s implementation is centered around a few key concepts described below. These may be a one-time implementation in the core functionality or is partially implemented and requires specific implementation per data format. Current Pucket supports Avro and Thrift, but can be easily extended to support Protocol Buffers.

Pucket

(Implementation per format)

This is a partially implemented trait which contains information on the data in a certain directory (or bucket) on the filesystem. It has a few simple operations for creating a new instance:

Create - create a new Pucket by writing a descriptor to the filesystem: if a pucket already exists at the location an error will be returned

Find (apply function) - return a Pucket which is known to be at a certain location on the filesystem, if it does not exist an error will be returned

Find or create - return an existing Pucket or create a new one: will return an error if the existing Pucket’s descriptor does not match the one provided

Once an instance is created the following operations can be performed on it:

Reader - obtain a reader for the Pucket

Writer - obtain a simple writer for the Pucket

Absorb - move another Pucket’s data into this one (provided they are the same structure)

List files - ronseal

It also holds configuration for the default Parquet block size: i.e. the amount of data the underlying Parquet writer will hold in memory before flushing it to disk.

Descriptor

The Pucket descriptor is a class which describes the structure of the partitions and the data within the Pucket. It is written to the filesystem as JSON on creation and read when the Pucket is located. The descriptor on disk contains the following information:

Partitioner

The partitioner is a trait which describes the partitioning scheme, to be implemented by the user according to the requirements for partitioning their data. The class name of the partitioner is stored in the descriptor, so the implementation can change. While it is not recommended to change the implementation on an existing Pucket, your data will still be accessible for reading in the old scheme.

Writer

There are a few implementations of writer for Pucket, each performing a different type of writing functionality. Each type is described below and code examples can be seen in the TL;DR section.

Simple Writer (Implementation per format) - a functional wrapper around the standard implementations of Parquet writers

Incremental Writer - a wrapper around the simple writer, which rolls a file when a configured number of objects have been written. It keeps a checkpoint of the point at which a file was last finalised. It is important to tune the roll limit based on expected size of each data object. This should be a balance of the number of objects you are prepared to lose per checkpoint versus the number of small files on the filesystem, given Hadoop’s default block size.

Partitioned Writer - a wrapper around the simple writer which uses the partitioner implementation to write data out to sub directories of the Pucket. The writer instances are kept in a cache with a configurable size. When the cache is full the least recently used writer will be removed and closed. It is important to make sure you balance the writer cache size with your memory constraints and number of partitions you expect to be writing to concurrently, as opening and closing writers is an expensive operation. You should also be aware of the Parquet block size configuration in the Pucket. By default each writer will hold 50mb in memory before flushing to disk.

Incremental Partitioned Writer - a wrapper around the incremental writer which provides partitioning. The same tuning constraints apply to this as with the incremental and partitioned writers.

Reader

On setting out to implement Pucket there were no plans to implement a reader, however the standard Parquet reader cannot read files in subdirectories and assumes that all files in a given directory are Parquet format. Therefore we had to clone the functionality in the main Parquet reader and change it to allow reading of files in subdirectories. Note that the Parquet input format for MapReduce can cope with parquet files in subdirectories so does not need to use this reader.

Alternatives to Pucket

There’s always a wheel to be redesigned in the world so why not Hadoop storage partitioning? There are surprisingly few projects which do this, probably because it is a relatively simple problem. A discussion of two major alternatives to Pucket are discussed below with reasons as to why we are not using, or contributing to them.

Pail

We have been heavy users of Pail in the past for partitioning our data, however this project is rarely updated and appears to be abandonware. It also uses version 1 of the MapReduce api. So why not modify it to work with Parquet?

KiteSDK

KiteSDK is the major leader in working with Parquet, it is developed and maintained by Cloudera. It provides quite a large set of functionality for maintaining Parquet data in Hadoop and Hive. One major issue we had with KiteSDK was that it only supports Avro for schema definition.

We had been using Thrift for our schema in Pail, and seriously considered switching to Avro just so that we could use KiteSDK. We found that switching to Avro would mean that one of the features of Thrift, which we love and use heavily, was not supported in the same way in Avro: unions. Looking into the KiteSDK source code, decoupling Avro from the core functionality would be a lot of work and it would be impossible to implement in an abstract way given that the functionality relies on Avro features.

Here are the reasons we chose not to contribute to KiteSDK:

Given the issues we had with both of these libraries and our own requirements we opted for our own implementation. This has left us free to perform the implementation in Scala and keep to our aforementioned design principles.


TL;DR. Show Me Some Code

The Scalaz implementation of Either, known as disjunction4 (\/), is used heavily within the Pucket code. This is a proper Monad which allows it to be used in a flat map or for comprehension. This enables a happy path and sad path to be accounted for when performing any side-effecting operation. Every operation requiring interaction with the Hadoop filesystem will return a disjunction.

In the examples below disjunction is used with the implicit either class in Scalaz syntax package, which allows .left or .right operations to lift objects into the appropriate side of the disjunction. To use this the following imports must be included in implementing classes:

import scalaz.\/
import scalaz.syntax.either._

Creating a Pucket

The following examples use the imports listed below:

import com.intenthq.pucket.thrift.ThriftPucket
import com.intenthq.pucket.thrift.ThriftPucketDescriptor
import com.intenthq.pucket.avro.AvroPucket
import com.intenthq.pucket.avro.AvroPucketDescriptor
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.hadoop.fs.{FileSystem, Path}
import scalaz.\/
import scalaz.syntax.either._

You should also make sure you have created the following classes:

import your.thrift.ThriftData
import your.pucket.ThriftPartitioner
import your.avro.AvroData
import your.pucket.AvroPartitioner

The following values have also been created:

val fs = FileSystem.get()
val path = new Path("/path/to/Pucket")

Create or find a Thrift Pucket:


val thriftDescriptor: ThriftPucketDescriptor[ThriftData] =
  ThriftPucketDescsriptor[ThriftData](classOf[ThriftData],
                                      CompressionCodecName.SNAPPY,
                                      Some(ThriftPartitioner))

// Create a new Pucket on /path/to/Pucket, writing the descriptor in place
// Operation will fail if a Pucket already exists on that path
val newThriftPucket: Throwable \/ Pucket[ThriftData] =
  ThriftPucket.create[ThriftData](path, fs, thriftDescriptor)

// Find an existing Pucket at a certain path
// Operation will fail if no Pucket exists on that path or the schema does not match the one provided
val existingThriftPucket: Throwable \/ Pucket[ThriftData] =
  ThriftPucket[ThriftData](path, fs, classOf[ThriftData])

// Find an existing or create a new Pucket on a certain path
// Operation will fail if the Pucket exists and the Pucket descriptor on the filesystem matches the one provided
val maybeExistingThriftPucket: Throwable \/ Pucket[ThriftData] =
  ThriftPucket.findOrCreate[ThriftData](path, fs, thriftDescriptor)

Create or find an Avro Pucket:


val avroDescriptor: AvroPucketDescriptor[AvroData] =
  AvroPucketDescriptor[AvroData](AvroData.getClassSchema,
                                 CompressionCodecName.SNAPPY,
                                 Some(AvroPartitioner))

val newAvroPucket: Throwable \/ Pucket[AvroData] =
  AvroPucket.create[AvroData](path, fs, avroDescriptor)

val existingAvroPucket: Throwable \/ Pucket[AvroData]
  AvroPucket[AvroData](path, fs, AvroData.getClassSchema)

val maybeExistingAvroPucket: Throwable \/ Pucket[AvroData] =
  AvroPucket.findOrCreate[AvroData](path, fs, avroDescriptor)


Writing to a Pucket

// Write function which fails fast on error
def write[Ex](data: Seq[T],
              writer:  Ex \/ Writer[T, Ex]): Ex \/ Writer[T, Ex] =
  data.foldLeft(writer)( (w, i) =>
    w.fold(ex => return ex.left, _.write(i))
  )

Plain Writer

def writeMeSomeData[T](data: Seq[T],
                       Pucket: Throwable \/ Pucket[T]): Throwable \/ Unit =
  for {
    p <- pucket
    writer <- p.writer
    finishedWriter <- write[Throwable](data, writer)
    _ <- finishedWriter.close
  } yield ()

Incremental Writer

def writeMeSomeDataIncrementally[T](data: Seq[T],
                                    Pucket: Throwable \/ Pucket[T]): (Long, Throwable) \/ Unit =
  for {
    p <- pucket.leftMap((0, _))
    writer <- IncrementalWriter(p, 100) // 100 indicates the number of writes before the file is rolled
    finishedWriter <- write[(Long, Throwable)](data, writer)
    _ <- finishedWriter.close
  } yield ()

Partitioned Writer

def writeMeSomePartitionedData[T](data: Seq[T],
                                  Pucket: Throwable \/ Pucket[T]): Throwable \/ Unit =
  for {
    p <- pucket
    writer <- PartitionedWriter(p).right
    finishedWriter <- write[Throwable](data, writer)
    _ <- finishedWriter.close
  } yield ()

Incremental Partitioned Writer

def writeMeSomePartitionedDataIncrementally[T](data: Seq[T],
                                              Pucket: Throwable \/ Pucket[T]): (Long, Throwable) \/ Unit =
  for {
    p <- pucket.leftMap((0, _))
    writer <- IncrementalPartitionedWriter(p, 100).right
    finishedWriter <- write[Throwable](data, writer)
    _ <- finishedWriter.close
  } yield ()

Reading From a Pucket

The reader behaves in a similar way to the writer in that each read returns a new instance of the reader with an updated state. However as well as a new reader instance, an option of the data item is returned. The example below is an implementation which will read a certain number of records into a scala Seq or fail with a Throwable. If there is an error encountered in the read process then the code will fail fast and return the throwable in the left side of the disjunction. If the output from the pucket is exhausted then it will close the reader and return the result in the right side of the disjunction.

def readData[T](count: Int, pucket: Pucket[T]): Throwable \/ Seq[T] =
  pucket.reader.flatMap(reader =>
    0.to(count).foldLeft((Seq[T](), reader).right[Throwable])( (acc, _) =>
      acc.fold(ex => return ex.left,
       dataAndReader => dataAndReader._2.read.fold(ex => return ex.left,
         optionalDataAndReader => (
           optionalDataAndReader._1.fold(
             // if the output is exhausted, then close the reader and return the state
             return optionalDataAndReader._2.close.map(_ => dataAndReader._1)  
              //if the data is present in the output then append the data to the state and include the updated writer state
            )(dataAndReader._1 ++ Seq(_)), optionalDataAndReader._2).right[Throwable]
        )
      )
    )
  ).flatMap(dataAndReader => dataAndReader._2.close.map(_ => dataAndReader._1))

More examples

For more detailed examples such as working with MapReduce and Spark please see the Pucket GitHub repository.

References

Cropped image of partition by Thomas Leth-Olsen is licensed under CC BY 2.0

Join the discussion in hacker news.