PyArrow

Open Questions

  • How do you specify a schema if you don't want it to just be inferred? (Possible answer sketched below.)
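
My own partial answer, not from this part of the docs: build a Schema explicitly with pa.schema() / pa.field() and pass it via schema= instead of names=, so nothing gets inferred. The field names and types below are made up for illustration.

import pyarrow as pa

schema = pa.schema([
    pa.field('id', pa.int64()),
    pa.field('name', pa.string()),
])

# pass schema= instead of names= so the column types are exactly what we declared
batch = pa.record_batch(
    [pa.array([1, 2, 3], type=pa.int64()),
     pa.array(['a', 'b', 'c'], type=pa.string())],
    schema=schema,
)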

Memory and IO Interfaces 🔗

Referencing and Allocating Memory

pyarrow.Buffer

  • API docs
  • pyarrow.Buffer wraps the C++ arrow::Buffer type
  • A Buffer can be created from any Python object implementing the buffer protocol by calling the py_buffer() function.
  • This is how to wrap data from native Python bytes-like objects as Arrow buffers (see the sketch after this list)
    • arrow_buf = pa.py_buffer(some_python_bytes_object)
    • Creating a Buffer in this way does not allocate any memory; it is a zero-copy view on the memory exported from the bytes object.
  • Buffers can be used in circumstances where a Python buffer or memoryview is required, and such conversions are zero-copy
    • Good to remember; an Arrow Buffer can be used in place of a native Python buffer
  • The Buffer’s to_pybytes() method converts the Buffer’s data to a Python bytestring (thus making a copy of the data)
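
A minimal sketch of that round trip (the byte values are just for illustration):

import pyarrow as pa

data = b'abcdefghijklmnopqrstuvwxyz'

buf = pa.py_buffer(data)    # zero-copy view, no allocation
buf.size                    # 26

mv = memoryview(buf)        # Buffers satisfy the buffer protocol, still zero-copy
bytes(mv[:3])               # b'abc'

buf.to_pybytes() == data    # True -- this one does make a copy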

Input and Output

The Arrow C++ libraries have several abstract interfaces for different kinds of IO objects:

  • Read-only streams
  • Read-only files supporting random access
  • Write-only streams
  • Write-only files supporting random access
  • File supporting reads, writes, and random access

In the interest of making these objects behave more like Python’s built-in file objects, we have defined a NativeFile base class which implements the same API as regular Python file objects.

  • Like built-in file APIs but with zero-copy capabilities.
  • Lots of options, probably relevant when looking to really optimize performance; basic usage sketched below
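
A minimal sketch of the NativeFile flavours mentioned above, assuming some file example.dat already exists on disk (the path is hypothetical):

import pyarrow as pa

with pa.OSFile('example.dat') as f:           # NativeFile doing plain OS reads
    head = f.read(10)                         # same API as a built-in file object

with pa.memory_map('example.dat') as mm:      # NativeFile backed by a memory map
    buf = mm.read_buffer(10)                  # returns an Arrow Buffer, zero-copy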

High-Level Streams APIs

  • pa.input_stream(buf) will return a BufferReader when passed native Python buffers or memoryviews
  • If passed a string or file path, input_stream will open the given file on disk for reading, creating an OSFile. Optionally, the file can be compressed: if its filename ends with a recognized extension such as .gz, its contents will automatically be decompressed on reading.
    • 🤯
  • output_stream() is the equivalent function for output streams and allows creating a writable NativeFile.
    • Definitely don't grok this yet; see the sketch after this list
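
A small sketch of both helpers (the on-disk path is hypothetical):

import pyarrow as pa

# in-memory source: bytes/memoryviews become a BufferReader, zero-copy
with pa.input_stream(b'some raw data') as stream:
    stream.read(4)                  # b'some'

# output_stream() gives a writable NativeFile (an OSFile for a path like this)
with pa.output_stream('example.bin') as out:
    out.write(b'hello, friends')

# reading it back; a path ending in .gz would be decompressed transparently
with pa.input_stream('example.bin') as f:
    f.read()                        # b'hello, friends'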

In-Memory Reading and Writing

To assist with serialization and deserialization of in-memory data, we have file interfaces that can read and write to Arrow Buffers.

import pyarrow as pa

# write into an in-memory buffer
writer = pa.BufferOutputStream()
writer.write(b'hello, friends')

buf = writer.getvalue()   # an Arrow Buffer holding everything written so far
print(buf)

# read it back through a file-like interface
reader = pa.BufferReader(buf)
reader.seek(7)
reader.read(7)            # b'friends'

Streaming, Serialization, and IPC 🔗

  • Arrow defines two types of binary formats for serializing record batches:
    • Streaming format: for sending an arbitrary length sequence of record batches. The format must be processed from start to end, and does not support random access
    • File or Random Access format: for serializing a fixed number of record batches. Supports random access, and thus is very useful when used with memory maps

# create a small record batch
import pyarrow as pa

data = [
    pa.array([1, 2, 3, 4]),
    pa.array(['foo', 'bar', 'baz', None]),
    pa.array([True, None, False, True])
]

batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])

Now, we can begin writing a stream containing some number of these batches. For this we use RecordBatchStreamWriter, which can write to a writeable NativeFile object or a writeable Python object. For convenience, this one can be created with new_stream():

sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, batch.schema)

When creating the StreamWriter, we pass the schema, since the schema (column names and types) must be the same for all of the batches sent in this particular stream. Now we can do:

# write multiple record batches into the StreamWriter
for i in range(5):
    writer.write_batch(batch)

writer.close()

buf = sink.getvalue()

Now buf contains the complete stream as an in-memory byte buffer. We can read such a stream with RecordBatchStreamReader or the convenience function pyarrow.ipc.open_stream:

reader = pa.ipc.open_stream(buf)

reader.schema
# Out[13]:
# f0: int64
# f1: string
# f2: bool

batches = [b for b in reader]

len(batches)
# 5

An important point is that if the input source supports zero-copy reads (e.g. like a memory map, or pyarrow.BufferReader), then the returned batches are also zero-copy and do not allocate any new memory on read.
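
A quick way to see the zero-copy behaviour (my own sketch; writes the stream buffer from above to a hypothetical file and reads it back through a memory map):

# persist the stream bytes, then read them back via a memory map
with pa.OSFile('example.arrow_stream', 'wb') as f:
    f.write(buf)

before = pa.total_allocated_bytes()
mm = pa.memory_map('example.arrow_stream')
table = pa.ipc.open_stream(mm).read_all()
pa.total_allocated_bytes() - before   # stays near zero: data references the map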

Writing and Reading Random Access Files

Basically, when you open a file in the random access format, you immediately know how many batches it contains (the file ends with a footer that records the schema and the location of every record batch) and can read any batch directly by index; see the sketch below
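
My notes skip the docs' code for this part, so here is a minimal sketch reusing batch from earlier; new_file() / open_file() are the file-format counterparts of new_stream() / open_stream():

# write the file (random access) format into an in-memory buffer
sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, batch.schema) as writer:
    for i in range(5):
        writer.write_batch(batch)

buf = sink.getvalue()

# the footer tells the reader how many batches there are, up front
reader = pa.ipc.open_file(buf)
reader.num_record_batches   # 5
reader.get_batch(3)         # jump straight to any batch, no sequential scan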

Reading from Stream and File Format for pandas

The stream and file reader classes have a special read_pandas method to simplify reading multiple record batches and converting them to a single DataFrame output

df = pa.ipc.open_file(buf).read_pandas()