
Writing custom PrestoDB functions

Published on data-infrastructure technology - - Dante Pawlow

Here at Jampp we process and analyze large amounts of data. One of the tools we employ to do so is PrestoDB, which is a “Distributed SQL Query Engine for Big Data”. Presto comes with many native functions, which are usually enough for most use cases. Nevertheless, sometimes you need to implement your own function for a very specific use.

Enter User Defined Functions (UDFs, for short). Writing one for the first time is not as straightforward as it may appear, mainly because the information you need is scattered around the web (and across many Presto versions).

In this blogpost, we present our JSON_SUM function, how we wrote it, and some of the lessons we learned along the way.

Function types

Presto has two main types of functions: scalar and aggregation [1].

Scalar functions are applied to every element of a list (or every selected row, in this case), without altering the order or the number of elements in that list. You can think of them as map functions.
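To make the map analogy concrete, here is a minimal plain-Java sketch. It does not use any Presto API: ScalarAsMapSketch and applyToEveryRow are hypothetical names, and a List<Long> stands in for a column of selected rows.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration only; none of these names come from Presto.
final class ScalarAsMapSketch
{
    // The "scalar function": one value in, one value out.
    static long abs(long value)
    {
        return Math.abs(value);
    }

    // Applying it to every row keeps both the order and the number of rows.
    static List<Long> applyToEveryRow(List<Long> rows)
    {
        return rows.stream()
                .map(ScalarAsMapSketch::abs)
                .collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        System.out.println(applyToEveryRow(List.of(-3L, 2L, -1L)));
    }
}
```

The output list has exactly one element per input row, in the same position, which is the property that distinguishes a scalar function from an aggregation.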

On the other hand, aggregation functions take multiple rows as input and combine them into a single output. We’ll focus mainly on these, as they’re more complex (and more interesting to implement!).

Aggregation functions can harness the power of Presto’s distributed workers via a divide and conquer approach. They consist of three main methods:

  • input, which reads each row and feeds it to a state.
  • combine, invoked whenever two states need to be merged.
  • output, which returns the result of the aggregation at the end.

The State is a data structure that stores the meaningful results of each step of the aggregation. You can design it according to your specific needs; the only requirement is that it must be serializable, so that it can be shared between workers.

It’s important to note that no order of input or combination should be assumed: any given state should be able to be combined with any other and any row should be able to be converted to a valid state. You can, however, define the initial parameters of the state via its constructor.
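The contract described above can be sketched in plain Java, with no Presto dependency. This is only an illustration under stated assumptions: JsonSumSketch and its methods are hypothetical names, each row is modelled as an already-parsed Map<String, Double>, and we assume the aggregation adds values key by key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the input / combine / output contract.
// Nothing here is Presto API; all names are hypothetical.
final class JsonSumSketch
{
    // The "state": running sums per key. An empty map is a valid initial state.
    static Map<String, Double> newState()
    {
        return new HashMap<>();
    }

    // input: fold one row into a state.
    static void input(Map<String, Double> state, Map<String, Double> row)
    {
        row.forEach((key, value) -> state.merge(key, value, Double::sum));
    }

    // combine: merge another partial state into this one.
    static void combine(Map<String, Double> state, Map<String, Double> other)
    {
        other.forEach((key, value) -> state.merge(key, value, Double::sum));
    }

    // output: produce the final result from the last remaining state.
    static Map<String, Double> output(Map<String, Double> state)
    {
        return Map.copyOf(state);
    }

    public static void main(String[] args)
    {
        List<Map<String, Double>> rows = List.of(
                Map.of("clicks", 1.0, "installs", 2.0),
                Map.of("clicks", 3.0),
                Map.of("installs", 4.0));

        // Simulate two workers that see different rows, in arbitrary order.
        Map<String, Double> workerA = newState();
        input(workerA, rows.get(0));
        Map<String, Double> workerB = newState();
        input(workerB, rows.get(2));
        input(workerB, rows.get(1));

        // Merge the partial states and emit the result.
        combine(workerB, workerA);
        System.out.println(output(workerB));
    }
}
```

Because addition is commutative and associative, it makes no difference which worker receives which row or in which order the partial states are merged. That is the property an aggregation needs for the divide and conquer approach to be safe.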

Implementation

User Defined Functions are written in Java, inside a plugin, separate from the default Presto functions. They are exposed to Presto by declaring them in UdfPlugin.java and then deploying them as a .jar file in the /usr/lib/presto/plugin/udfs directory.

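import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

// Registers every UDF class so that Presto can discover it when the plugin loads.
public class UdfPlugin
        implements Plugin
{
    @Override
    public Set<Class<?>> getFunctions()
    {
        return ImmutableSet.<Class<?>>builder()
                .add(JSONAggregation.class)
                .add(YourUDF.class)
                .build();
    }
}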

Scalar functions are fairly simple: you write a single Java function, and annotate it appropriately. Here’s a simple abs function from Presto’s Math Functions:

@Description("absolute value")
@ScalarFunction("abs")
@SqlType(StandardTypes.TINYINT)
public static long absTinyint(@SqlType(StandardTypes.TINYINT) long num)
{
    checkCondition(num != Byte.MIN_VALUE, NUMERIC_VALUE_OUT_OF_RANGE, "Value -128 is out of range for abs(tinyint)");
    return Math.abs(num);
}

Aggregation functions are trickier to implement, as they have many more moving parts. Aside from the input, combine and output functions, you should write a State interface and its auxiliary files. If your state uses just basic data types, Presto automatically knows how to construct, serialize and deserialize it. Otherwise, you should implement a Factory and a Serializer, and link them to the State using Presto’s Metadata Annotations.

@AccumulatorStateMetadata(stateFactoryClass = JSONAggregationStateFactory.class, stateSerializerClass = JSONAggregationStateSerializer.class)
public interface JSONAggregationState
        extends AccumulatorState
{
    Map<String, Object> getMap();

    void setMap(Map<String, Object> value);
}
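What a Serializer ultimately has to guarantee is a lossless round trip: a state written out on one worker must be rebuilt identically on another. The following plain-Java sketch illustrates only that idea. It does not implement Presto's serializer interface, StateRoundTripSketch is a hypothetical name, and the state is simplified to a Map<String, Double>.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of a serialize / deserialize round trip.
// Not Presto API; names and the byte layout are assumptions for this sketch.
final class StateRoundTripSketch
{
    // Layout: entry count, then (UTF key, double value) pairs.
    static byte[] serialize(Map<String, Double> state)
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.size());
            for (Map.Entry<String, Double> entry : state.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeDouble(entry.getValue());
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the same layout back into a fresh map.
    static Map<String, Double> deserialize(byte[] data)
    {
        Map<String, Double> state = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                double value = in.readDouble();
                state.put(key, value);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return state;
    }

    public static void main(String[] args)
    {
        Map<String, Double> original = new HashMap<>();
        original.put("clicks", 4.0);
        original.put("installs", 6.0);
        System.out.println(deserialize(serialize(original)).equals(original));
    }
}
```

A real Serializer would write to and read from Presto's own data structures instead of a byte array, but the same check applies: deserializing a serialized state should give back an equal state.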

If you’d like to write a UDF that takes different data types as input (JSON and VARCHAR, for example), there are two ways of going about it. The simple one is to write two (or more) input functions that take those data types as parameters. This works in our case, but if you need your UDF to be more generic, you can follow PrestoDB’s example in their native functions.

Testing

Once you have your UDF, the sensible thing is to test it. But how do you simulate Presto’s inner workings? This is especially important for aggregation functions, given that Presto takes the components of your UDF and uses them inside a black box. Thankfully, we have Docker!

Here we provide a Makefile and a docker-compose.yml [2] for getting a Presto server up and running locally, automatically incorporating your UDF.

  1. First you need to package your UDFs with Maven.
  2. Then, replace the path to them in the docker-compose.yml file.
  3. Lastly, run the make and make run-with-logs commands.

Once it’s up, it can be queried like any other Presto cluster, so you can use it to test your UDF.

Pitfalls

My kingdom for a discernible stack trace!
King Richard III

Developing software is never a smooth-sailing kind of deal (where’s the fun in that?), so be prepared to stare into weird-looking stack traces. Here are some of the traps we’ve fallen into; hopefully they’ll save you some headaches:

  • Do NOT use the presto-main dependency for anything besides testing (and even then, try to avoid it). Whichever function you need from presto-main will most likely have an equivalent in the presto-spi package.
  • If your state uses more complex data types than the basic types, you should add a Factory and a Serializer (otherwise, you get weird errors like “HashMap<> not supported”).
  • Always check that the .jar files are deployed on both the coordinator and the workers. If you get an esoteric Presto exception (like “varchar not found”, even though the function is listed by the SHOW FUNCTIONS; query), this is the most likely suspect.

Conclusion

Presto is a very powerful tool, and given the wide array of native functions at your disposal, it’s likely that you’ll never need to write a UDF. Nevertheless, doing so can be a fun adventure into Presto’s inner workings and may encourage you to contribute to the project.

  1. There are also window functions, but we have yet to implement one of those, so we’ll leave them out of the scope of this article.

  2. Thanks to Lewuathe’s docker-presto-cluster!
