Polyglot Apache Arrow: Java and Python Perspective

source link: https://medium.com/gooddata-developers/polyglot-apache-arrow-java-and-python-perspective-bf2ce020e27d

Apache Arrow, a versatile, analytics-focused in-memory data format, offers the flexibility to work with data seamlessly across multiple programming languages, such as Java and Python. While using Apache Arrow in more than one language may initially seem straightforward (write a feature in one language and ‘translate’ it to another), there are certain nuances and challenges that can arise. In this article, we shed light on some of the challenges we encountered at GoodData. We do this not to criticize the Arrow project, but to provide insights and solutions for fellow developers who may run into similar obstacles.

Issues encountered

To keep things organized, let’s split the tricky parts into three categories:

  • Missing APIs
  • Different API philosophies
  • API inconsistencies

Missing APIs

Missing cancel in FlightStreamWriter in Python

The first difference we encountered has to do with how the client can cancel a put operation.

There are several reasons the client might want to stop writing half-way. For example, it may read from an underlying data source batch by batch and send each batch to the Flight server. If the underlying data source raises an exception part-way through, the Flight client wants to signal to the server: “I am canceling this put operation, disregard whatever I have sent you so far”.

In Java, the process is simple: the client calls the error method of the ClientStreamListener returned by startPut. This closes the stream and signals the cancellation to the server (on a Python server, server_call_context.is_cancelled() then returns True), so the server can react accordingly.

In Python, though, the client cannot cancel the flight put operation in a way the server would be able to detect. It can raise an exception, but the server has no way of knowing which of these cases occurred:

  1. The client has finished writing but did not call done_writing, and the data it sent should be persisted by the server.
  2. The client wants to cancel the operation, and any data sent by the client should not be persisted by the server.

At the time of writing, there is no way of doing this in Python (we’ve opened an issue for it). Fortunately for us, we do not use the Python client in this context. However, should we start using it there, or decide to make the client public, we would have to handle this somehow.
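Until such an API exists, one possible application-level workaround (our own assumption, not an Arrow feature) is to make persistence opt-in: the client sends an explicit commit marker as the last piece of application metadata, and the server discards everything if the stream ends without it. The sketch below covers only the server-side decision. COMMIT_MARKER and collect_if_committed are hypothetical names, and each chunk is assumed to look like pyarrow.flight's FlightStreamChunk, with a data batch that may be None and an optional app_metadata buffer.

```python
from typing import Any, Iterable, List, Optional

# Hypothetical application-level marker agreed on by client and server.
COMMIT_MARKER = b"commit"


def collect_if_committed(chunks: Iterable[Any]) -> Optional[List[Any]]:
    """Return the received batches only if the stream ended with the commit marker.

    Each chunk needs a `data` attribute (a record batch or None) and an
    `app_metadata` attribute (bytes-like or None), like FlightStreamChunk.
    """
    batches: List[Any] = []
    committed = False
    for chunk in chunks:
        if chunk.data is not None:
            batches.append(chunk.data)
        # Only the most recent chunk counts: the commit has to come last.
        committed = (
            chunk.app_metadata is not None
            and bytes(chunk.app_metadata) == COMMIT_MARKER
        )
    # No trailing marker means the client gave up half-way: persist nothing.
    return batches if committed else None
```

Inside do_put, the server would call collect_if_committed(reader) and persist only a non-None result. On the client, the marker could be sent with writer.write_metadata(pa.py_buffer(COMMIT_MARKER)) right before done_writing(). If the client fails mid-stream, iterating the reader either ends without the marker or raises, and in both cases nothing is persisted.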

Different API philosophies

start_put: tuple in Python, listener as an argument in Java

Since we run our Arrow Flight RPC servers in a High-Availability (HA) environment, the logic behind putting a flight is more involved. First, we contact the cluster with the flightPath we would like to put, and it responds with a list of locations of the nodes that can accept that particular path. We then try each node in the list and perform the operation on the first node that successfully accepts the put.

To avoid duplicating this logic across the applications, we encapsulated it in our clients. First, we implemented this logic in Python.

import time
from typing import Optional, cast

import pyarrow
import pyarrow.flight


def _start_put(
    self,
    flight_path: str,
    schema: pyarrow.Schema,
    options: pyarrow.flight.FlightCallOptions,
) -> tuple[pyarrow.flight.FlightStreamWriter, pyarrow.flight.FlightMetadataReader]:
    # this method handles the initiation of the do_put operation while also handling situations when the cluster
    # changes between the operations (the client that was ok before is now broken): in that case we need to try
    # again with another client. the tricky part is that we only know whether the client is ok or not when we
    # initiate the do_put operation: only then can we decide to query the shards again
    descriptor = pyarrow.flight.FlightDescriptor.for_path(flight_path)

    writer: Optional[pyarrow.flight.FlightStreamWriter] = None
    metadata_reader: Optional[pyarrow.flight.FlightMetadataReader] = None
    error: Optional[pyarrow.flight.FlightError] = None
    put_started = time.time()

    while (
        writer is None
        and time.time() - put_started < self._put_establishing_timeout
    ):
        try:
            error_count = 0
            shard_count = 0
            for shard in self.shards_for_put(flight_path=flight_path):
                try:
                    shard_count += 1
                    writer, metadata_reader = shard.do_put(
                        descriptor=descriptor, schema=schema, options=options
                    )
                    break
                except pyarrow.flight.FlightUnavailableError as e:
                    error_count += 1
                    error = e
            # if all shards failed raise an error so that the placement is tried again after backoff, not immediately
            if shard_count > 0 and shard_count == error_count:
                raise cast(pyarrow.flight.FlightError, error)
        except pyarrow.flight.FlightUnavailableError as e:
            error = e
            time.sleep(0.25)

    if writer is None or metadata_reader is None:
        # if the writer is not assigned, we must have hit an error
        assert error is not None
        raise error

    return writer, metadata_reader

This is pretty straightforward, so when we wanted to replicate the same logic in Java, we approached it with a “just rewrite this in Java” attitude. As it turns out, this is not so simple, as there are two substantial differences between Java and Python in this area:

  1. The semantics differ for the writer.
  2. The API shape is quite different in Java.

The first difference is the state in which the writer is returned. In Python, you can start writing to the returned writer right away. In Java, however, you should first check the isReady flag (or, even better, register a callback via setOnReadyHandler) before writing to the stream. This complicates our node-probing logic a bit.

public StartPutResult startPut(FlightDescriptor descriptor,
                               VectorSchemaRoot root,
                               CallOption... options) throws FlightRuntimeException, InterruptedException {
    ClientStreamListener writer = null;
    CustomPutListener metadataListener = null;
    var startTime = System.currentTimeMillis();
    FlightRuntimeException error = null;
    var resolveLocationsCalled = false;
    while (writer == null && System.currentTimeMillis() - startTime < putEstablishingTimeout) {
        // first, try getting all the locations that can accept the flight
        List<Location> locations;
        try {
            var distributionNodes = getDistribution(descriptor, options);
            locations = nodesToLocations(distributionNodes);
        } catch (FlightRuntimeException e) {
            if (isRecoverable(e)) {
                if (isConnectionFailure(e) && !resolveLocationsCalled) {
                    locationProvider.resolveLocations();
                    resolveLocationsCalled = true;
                }
                error = e;
                Thread.sleep(putEstablishingGracePeriod); // back off for a bit before retrying
                continue;
            } else {
                throw e;
            }
        }

        // then try each location in turn to find the first one that manages to establish the put operation
        for (var location : locations) {
            try {
                metadataListener = new CustomPutListener();
                writer = flightClientsAdapter.startPut(location, descriptor, root, metadataListener, options);
                Thread.sleep(putEstablishingGracePeriod); // give it some time to get ready, without it, the isReady would never be true
                if (writer.isReady()) {
                    // we have the writer that is ready to accept flights: break out of the cycle
                    break;
                } else {
                    // the writer was not ready in time: clean up and try another
                    var toCleanUp = writer;
                    writer = null;
                    metadataListener = null;
                    error = null;
                    toCleanUp.error(new Exception("client not ready in time, cleaning up"));
                    toCleanUp.getResult();
                }
            } catch (FlightRuntimeException e) {
                // we assume that FlightStatusCode.CANCELLED encountered here always means we hit
                // the "client not ready in time" and should therefore try again
                var isClientNotReadyInTimeError = e.status().code() == FlightStatusCode.CANCELLED;
                var shouldRetry = isClientNotReadyInTimeError || isRecoverable(e);
                if (shouldRetry) {
                    if (isConnectionFailure(e) && !resolveLocationsCalled) {
                        locationProvider.resolveLocations();
                        resolveLocationsCalled = true;
                        break;
                    }
                    error = e;
                } else {
                    throw e;
                }
            }
        }
    }

    if (writer == null) {
        if (error == null) {
            throw new ArrowClientNoNodeForPutException();
        } else {
            throw error;
        }
    }

    return new StartPutResult(writer, metadataListener);
}

As you can see, the logic is more elaborate than its Python counterpart (even though the ideas behind them are the same).
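The fixed Thread.sleep(putEstablishingGracePeriod) followed by a single isReady() check is the simplest option, but it either wastes time on a fast node or gives up too early on a slow one. Following the setOnReadyHandler suggestion, a possible alternative is to wait for the ready notification against a deadline. Below is a minimal sketch of that pattern in Python; the two callables are hypothetical stand-ins for Java's setOnReadyHandler(Runnable) and isReady(). The loop re-checks is_ready() after every notification, because a ready notification is only a hint and the stream may no longer be ready by the time it is handled.

```python
import threading
import time
from typing import Callable


def wait_until_ready(
    set_on_ready_handler: Callable[[Callable[[], None]], None],
    is_ready: Callable[[], bool],
    timeout_s: float,
) -> bool:
    """Return True once is_ready() holds, or False after timeout_s seconds."""
    notified = threading.Event()
    # Register before the first check so a transition to ready is never missed.
    set_on_ready_handler(notified.set)
    deadline = time.monotonic() + timeout_s
    while not is_ready():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Block until the next ready notification or the deadline.
        notified.wait(remaining)
        # A notification is only a hint: clear it and re-check is_ready().
        notified.clear()
    return True
```

In the probing loop, a call like this would take the place of the fixed sleep plus single check, so a node that becomes ready quickly is used immediately while a slower one still gets the full grace period.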

The second difference is that in Python, do_put returns both a writer and a metadata reader, one for each direction of the bidirectional channel. In Java, however, startPut returns only the writer, and you need to pass in your own listener implementation (defined by the PutListener interface). Because of our retry logic, we also need to return this listener and control its creation: we may create more than one and must return the one associated with the node that is eventually used.

To make providing your own PutListener easier, the Arrow Java client ships two implementations you can use: SyncPutListener and AsyncPutListener.

Since we want to read from the server only after the whole writing process is complete, SyncPutListener seemed like the more fitting choice. We wanted to extend it with some domain-specific methods to make it easier to use in our context, but alas, SyncPutListener is final and cannot be extended. We could have written a new class that wraps a SyncPutListener, but that seemed like quite a lot of boilerplate. Instead, we chose to extend AsyncPutListener, which is not final: it lets you override just the method you need (onNext in our case) and build your logic around it.

package com.gooddata.demo;

import org.apache.arrow.flight.AsyncPutListener;
import org.apache.arrow.flight.PutResult;

import java.nio.charset.StandardCharsets;

public final class CustomPutListener extends AsyncPutListener {
    private boolean hasWaitedForResult = false;
    private String syncToken = null;

    /**
     * Returns the syncToken that can be used to enforce read-after-write consistency.
     * Must be called after {@link CustomPutListener#waitForComplete}.
     */
    public String getSyncToken() {
        if (!hasWaitedForResult) {
            throw new IllegalStateException("You must call waitForComplete before trying to access the sync token.");
        }
        if (syncToken == null || syncToken.isBlank()) {
            throw new IllegalStateException("Server did not send any sync token back.");
        }
        return syncToken;
    }

    @Override
    public void onNext(PutResult val) {
        var metadata = val.getApplicationMetadata();
        var charBuffer = StandardCharsets.UTF_8.decode(metadata.nioBuffer());
        this.syncToken = charBuffer.toString();
    }

    /**
     * Call this method to ensure the writing of the server-sent metadata is finished.
     * This must be done before accessing any of the properties provided by this class.
     */
    public void waitForComplete() {
        getResult();
        hasWaitedForResult = true;
    }
}
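On the Python side, the same logic needs no listener class at all, because do_put already returns a pull-style FlightMetadataReader whose read() yields the next metadata buffer, or None once the server is done. A sketch of an equivalent helper, assuming the server sends the token as UTF-8 application metadata exactly as CustomPutListener expects; read_sync_token is a hypothetical name, and it must be called only after done_writing().

```python
from typing import Any, Optional


def read_sync_token(metadata_reader: Any) -> str:
    """Python-side counterpart of CustomPutListener.getSyncToken().

    `metadata_reader` should behave like pyarrow.flight.FlightMetadataReader:
    read() returns the next metadata buffer, or None when there is no more.
    """
    last: Optional[bytes] = None
    while True:
        buf = metadata_reader.read()
        if buf is None:
            break
        # Like onNext above, keep only the most recent message.
        last = bytes(buf)
    token = last.decode("utf-8") if last is not None else ""
    if not token.strip():
        raise RuntimeError("Server did not send any sync token back.")
    return token
```

The contrast with the Java version is the push/pull split: Java delivers metadata to a callback that has to store it, while Python lets the caller read it on demand.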

Mandatory done_writing in Python

An especially tricky aspect of the Python client is the requirement to call the done_writing method once writing to the stream is complete. Otherwise, the server can get stuck waiting for a next batch that never comes. Below is a simple server illustrating this:

import pyarrow as pa
import pyarrow.flight as flight


class Server(flight.FlightServerBase):
    def __init__(self, location):
        super().__init__(location=location)

    def do_put(self, context, descriptor, reader, writer):
        for chunk in reader:
            print(f"got batch {chunk.data}")
        # or even just reader.read_all()

        print("sending metadata back")
        writer.write(pa.py_buffer((42).to_bytes(8, byteorder="big")))


def main():
    server = Server("grpc://localhost:16661")
    server.serve()


if __name__ == "__main__":
    main()

This server works correctly with well-behaved clients. When faced with a less well-behaved one, like the client in the following listing, it gets stuck:

import pandas as pd
import pyarrow as pa
import pyarrow.flight as flight


def main():
    client = flight.connect("grpc://localhost:16661")
    df = pd.DataFrame(
        {
            "n_legs": [None, 4, 5, None],
            "animals": ["Flamingo", "Horse", None, "Centipede"],
        }
    )
    table = pa.Table.from_pandas(df)

    writer, reader = client.do_put(
        flight.FlightDescriptor.for_path(b"fun/times"), table.schema
    )

    writer.write_table(table)

    print("done putting")
    # if this line is uncommented, everything starts working fine
    # writer.done_writing()

    meta = reader.read()
    # the server sends back an 8-byte big-endian integer
    print(int.from_bytes(meta.to_pybytes(), byteorder="big"))


if __name__ == "__main__":
    main()

Unless writer.done_writing() is called, both the client and the server get stuck: the server waits for more batches from the client, while the client waits for a message from the server.

The documentation does not mention the significance of the done_writing call. It is apparently a known issue, one that will hopefully get resolved with the introduction of async APIs.
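Putting the required call order into a small helper makes it harder to forget. A sketch, assuming the writer and metadata reader returned by do_put as in the client above (put_table_and_read_ack is a hypothetical name). It calls done_writing only after a successful write on purpose: calling it after a failure would tell the server that a partial upload is complete, which is exactly the cancellation problem described earlier.

```python
from typing import Any, Optional


def put_table_and_read_ack(writer: Any, metadata_reader: Any, table: Any) -> Optional[Any]:
    """Write a table, half-close the stream, and only then wait for the server's reply.

    `writer` and `metadata_reader` are the pair returned by FlightClient.do_put.
    """
    writer.write_table(table)
    # Tell the server no more batches are coming; without this, the server's
    # `for chunk in reader` loop never finishes and both sides wait forever.
    writer.done_writing()
    # The server can now leave its read loop and send metadata back.
    ack = metadata_reader.read()
    # Finish the call; close() reports a server-side error, if there was one.
    writer.close()
    return ack
```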

In Java, this is not an issue: once the client is closed, the stream is closed as well and everything works as expected (although you should still call the listener’s completed method, as the Arrow Java Cookbook recommends).

API inconsistencies

The mismatch between metadata types: bytes in Python, String in Java

The most recently discovered tricky part had to do with custom flight metadata. Quite some time ago, we added a custom metadata field to our flights. This field contains a serialized protobuf command that was used to generate this flight (an opt-in feature useful for debugging).

In Python, we serialized this protobuf object using the standard protobuf serialization (the SerializeToString method of the protobuf-generated object, which, despite its name, returns bytes). We stored the serialized bytes in the flight metadata. This worked in Python without any problems: both writing and reading the metadata were fine (we even had tests for both).

Some time later, we tried to access this metadata field from Java and started getting exceptions:

java.lang.IllegalArgumentException: Invalid UTF-8: Illegal leading byte in 2 bytes utf.

After a bit of not-so-easy debugging (made harder by our error handling unintentionally swallowing the original exception, which we have since improved), we found the root cause: in Java, each metadata value must be a String, not a byte array (see the particular line that raises the exception).

To work around this limitation, we now store the command base64-encoded in the schema: this admittedly makes the schema larger, but is currently the only way around this issue.
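The workaround boils down to turning arbitrary bytes into a plain ASCII string before they go into the schema metadata. A minimal sketch using only the Python standard library; the helper names are our own, and the three example bytes are the protobuf wire encoding of field 1 set to 150, used only as a stand-in for a real serialized command.

```python
import base64


def encode_metadata_value(raw: bytes) -> str:
    """Make arbitrary bytes (e.g. a serialized protobuf) safe for String-typed metadata."""
    return base64.b64encode(raw).decode("ascii")


def decode_metadata_value(value: str) -> bytes:
    """Inverse of encode_metadata_value; rejects non-base64 characters."""
    return base64.b64decode(value, validate=True)


if __name__ == "__main__":
    raw = b"\x08\x96\x01"  # protobuf: field 1 (varint) = 150
    try:
        raw.decode("utf-8")
    except UnicodeDecodeError:
        # 0x96 is a continuation byte, so it cannot start a UTF-8 sequence
        print("raw protobuf bytes are not valid UTF-8")
    print(encode_metadata_value(raw))  # CJYB
```

With pyarrow, the encoded value could then be attached via schema.with_metadata({"command": encode_metadata_value(command.SerializeToString())}), where command and the "command" key are placeholders. Reading it back, pyarrow returns schema.metadata as a dict of bytes, so the value would be passed through decode_metadata_value(schema.metadata[b"command"].decode("ascii")).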

Conclusion

In conclusion, although working with Apache Arrow in multiple languages such as Java and Python offers great flexibility, there are certain challenges and differences that developers need to be aware of. This article has highlighted some of the challenges we encountered during our implementation.

Despite these challenges, it is important to emphasize that the Apache Arrow project itself is highly appreciated and enjoyable to work with. By highlighting the encountered challenges, this article aims to provide awareness and guidance for developers who may face similar difficulties.

As the Apache Arrow community continues to evolve and improve the project, we expect that some of these challenges will be addressed, leading to smoother cross-language integration and enhanced usability.

