Spark UDAF: updating a buffer that is an array of tuples

By: Jeremy
Source: Stackoverflow.com
Question

I'm using Scala + Spark 2.0 and trying to write a UDAF that has an Array of tuples as its internal buffer as well as its return type: ...

def bufferSchema = new StructType().add("midResults",
  ArrayType(StructType(Array(StructField("a", DoubleType), StructField("b", DoubleType)))))

def dataType: DataType =
  ArrayType(StructType(Array(StructField("a", DoubleType), StructField("b", DoubleType))))

And this is how I update the buffer:

def update(buffer: MutableAggregationBuffer, input: Row) = {
  buffer(0) = buffer.getAs[mutable.WrappedArray[(Double, Double)]](3) ++ Array((3.0, 4.0))
}

But I get the following exception:

java.lang.ArrayStoreException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

This pattern works if I have a simple Array of Double.



Answers

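java.lang.ArrayStoreException is "thrown to indicate that an attempt has been made to store the wrong type of object into an array of objects", and it is expected here because the local Scala type for StructType is o.a.s.sql.Row, not a tuple. The buffer elements come back as Row objects (GenericRowWithSchema in the stack trace), so when ++ builds a new array of (Double, Double) and copies them in, the JVM rejects the store. In other words, you should use a Seq[Row] as the buffer field and Row as the value.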
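A minimal sketch of a UDAF written along these lines, assuming the UserDefinedAggregateFunction API from Spark 2.x (deprecated since Spark 3.0 but still present) and two double input columns. The class name CollectPairs and the input columns a and b are hypothetical, not taken from the question. Elements are read back with getSeq[Row] and new elements are appended as Row values:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object CollectPairs {
  // Element type shared by the buffer array and the result: struct<a: double, b: double>.
  val Pair: StructType = StructType(Array(
    StructField("a", DoubleType),
    StructField("b", DoubleType)))
}

// Hypothetical UDAF: collects every (a, b) input pair into an array of structs.
class CollectPairs extends UserDefinedAggregateFunction {
  import CollectPairs.Pair

  def inputSchema: StructType = Pair
  def bufferSchema: StructType = new StructType().add("midResults", ArrayType(Pair))
  def dataType: DataType = ArrayType(Pair)
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[Row]

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0) && !input.isNullAt(1)) {
      // Struct elements come back as Row, never as a tuple: read Seq[Row], append a Row.
      buffer(0) = buffer.getSeq[Row](0) :+ Row(input.getDouble(0), input.getDouble(1))
    }

  // Concatenates two partial buffers, both holding Seq[Row].
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[Row](0) ++ buffer2.getSeq[Row](0)

  // Spark converts the Seq[Row] back to array<struct<a, b>> using dataType.
  def evaluate(buffer: Row): Any = buffer.getSeq[Row](0)
}
```

Usage, assuming a DataFrame df with double columns a and b: `val collectPairs = new CollectPairs` followed by `df.agg(collectPairs(col("a"), col("b")))`. Each update still copies the whole array, so this is not efficient for large groups.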

Notes:

  • Calling ++ in a loop is probably not a good idea, since each call copies the whole buffer.
  • Writing a UDAF for this is somewhat unnecessary, because since Spark 2.0 collect_list supports complex types.
  • Arguably, Aggregators are much more user-friendly than UserDefinedAggregateFunctions.
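When the goal is simply to collect all pairs, here is a sketch of the built-in route from the second note, assuming a DataFrame with a grouping column key and double columns a and b (all hypothetical names). collect_list over a struct column yields the same array<struct<a: double, b: double>> type without any UDAF:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, struct}

object CollectListExample {
  // Groups by the hypothetical "key" column and gathers each row's (a, b) pair
  // into an array<struct<a: double, b: double>> column named "midResults".
  def collectPairs(df: DataFrame): DataFrame =
    df.groupBy(col("key"))
      .agg(collect_list(struct(col("a"), col("b"))).as("midResults"))
}
```

As with the UDAF, the order of elements within each collected array is not guaranteed after a shuffle.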
By: zero323


The best solution, of course, is to use promises, but since you mentioned that you are not very familiar with the language, here is a solution that changes your original code very little:

Instead of a $.each loop, run one iteration at a time and start the next one when the callback fires.

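function (data) {
    // Runs once every conversation has been processed.
    function done() {
        populateFirstConversation(data);
        bindEvents();
    }

    // Processes data[index], then recurses from inside the getUser callback,
    // so the next request starts only after the previous one has finished.
    function getNextConversation(index) {
        if (index === data.length)
            return done();

        var value = data[index];

        if (value.toUser == ...) {
            getUser(..., function () {
                ...

                getNextConversation(index + 1);
            });
        } else {
            getUser(..., function () {
                ...

                getNextConversation(index + 1);
            });
        }
    }

    // Start with the first conversation.
    getNextConversation(0);
}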


Nothing seems to have changed: I can still do this in Swift 3, as shown below.

import UIKit

class ViewController: UIViewController
{
    override func viewDidLoad()
    {
        super.viewDidLoad()

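        let temp: UIDatePicker = UIDatePicker(frame: self.view.frame)
        // "textColor" is not a documented UIDatePicker property; it is set through
        // key-value coding, so this may stop working in later iOS versions.
        temp.setValue(UIColor.purple, forKey: "textColor")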

        view.addSubview(temp)
    }

    override func didReceiveMemoryWarning()
    {
        super.didReceiveMemoryWarning()
    }
}

