Java Custom Accumulators implementation to collect bad records

Accumulators:

During transformations in Spark, the functions we pass to map() or filter() can read variables defined outside them, but each task works on its own copy of those variables, so changes made inside the function are never sent back to the driver. Accumulators, a kind of shared variable in a Spark cluster, solve this problem: tasks add to them and the driver reads the combined result.

Accumulators work as follows:
  • We create them in the driver by calling the SparkContext.accumulator(initialValue) method, which produces an accumulator holding an initial value. The return type is an org.apache.spark.Accumulator[T] object, where T is the type of initialValue.
  • Worker code in Spark closures can add to the accumulator with its += method (or add in Java).
  • The driver program can call the value property on the accumulator to access its value (or call value() and setValue() in Java).

Out of the box, Spark supports accumulators of type Int, Double, Long, and Float, all combined by addition. In addition to these, Spark also includes an API to define custom accumulator types.
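
To make the three steps above concrete, here is a minimal plain-Java model of the life cycle. It does not use any Spark classes: the class AccumulatorLifecycleSketch and its methods are hypothetical names, and the "tasks" are ordinary method calls. It only illustrates the idea that each task starts from zero, adds locally, and the driver merges the partial results.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the accumulator life cycle; no Spark classes are used.
final class AccumulatorLifecycleSketch {

    // Stand-in for a task: starts from the zero value, adds locally,
    // and returns its partial result.
    static int runTask(List<String> partition) {
        int local = 0;
        for (String line : partition) {
            if (line.isEmpty()) {
                local += 1; // the "add" step, done on the worker side
            }
        }
        return local;
    }

    // Stand-in for the driver: merges every partial result into the initial value.
    static int countBlankLines(List<List<String>> partitions) {
        int driverValue = 0; // initial value chosen on the driver
        for (List<String> partition : partitions) {
            driverValue += runTask(partition);
        }
        return driverValue; // what reading the value on the driver would expose
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a:1", "", "b:2"),
                Arrays.asList("", "", "c:3"));
        System.out.println("Blank lines: " + countBlankLines(partitions)); // Blank lines: 3
    }
}
```

In real Spark code the merge happens inside the framework; the driver only sees the final value.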

Custom accumulators need to extend AccumulatorParam, which is covered in the Spark API documentation. Beyond adding to a numeric value, we can use any operation for add, provided that operation is commutative and associative.
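
The commutative and associative requirement matters because Spark does not promise the order in which partial results are merged. The plain-Java sketch below (hypothetical class MapMergeSketch, no Spark involved) checks a map merge built on putAll: it is order-independent as long as keys are distinct, but when two updates share a key the last one merged wins, so the result depends on merge order.

```java
import java.util.HashMap;
import java.util.Map;

// Checks how a putAll-based map merge behaves when the merge order changes.
final class MapMergeSketch {

    // Returns a new map holding both inputs; on a key clash the right-hand entry wins.
    static Map<String, String> merge(Map<String, String> left, Map<String, String> right) {
        Map<String, String> out = new HashMap<>(left);
        out.putAll(right);
        return out;
    }

    // Convenience helper: a map with exactly one entry.
    static Map<String, String> single(String key, String value) {
        Map<String, String> m = new HashMap<>();
        m.put(key, value);
        return m;
    }

    public static void main(String[] args) {
        Map<String, String> a = single("id1", "x");
        Map<String, String> b = single("id2", "y");
        Map<String, String> c = single("id1", "z"); // same key as a

        // Distinct keys: both orders give {id1=x, id2=y}.
        System.out.println(merge(a, b).equals(merge(b, a))); // true

        // Clashing keys: {id1=z} versus {id1=x}, so the order matters.
        System.out.println(merge(a, c).equals(merge(c, a))); // false
    }
}
```

If the record keys are unique, a map-based accumulator is safe; if they can repeat, only one of the clashing values is kept, and which one is not guaranteed.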

I have a requirement where the data is in a file and I want to collect the records whose value is longer than 10 characters. First I create a custom accumulator by implementing org.apache.spark.AccumulatorParam:

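import java.util.HashMap;
import java.util.Map;

import org.apache.spark.AccumulatorParam;

// Accumulator type that merges Map<String, String> values (record key -> record value).
public class RecordAccmulator implements AccumulatorParam<Map<String, String>> {
  private static final long serialVersionUID = 1L;

  // Merges two partial results into a new map; on a key clash the entry from arg1 wins.
  @Override
  public Map<String, String> addInPlace(Map<String, String> arg0, Map<String, String> arg1) {
    Map<String, String> map = new HashMap<>(arg0);
    map.putAll(arg1);
    return map;
  }

  // The identity value for the merge: an empty map.
  @Override
  public Map<String, String> zero(Map<String, String> arg0) {
    return new HashMap<>();
  }

  // Adding a single update is the same operation as merging two partial results.
  @Override
  public Map<String, String> addAccumulator(Map<String, String> arg0, Map<String, String> arg1) {
    return addInPlace(arg0, arg1);
  }
}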

I use the custom accumulator in my business class:


JavaSparkContext sc = SparkUtils.createSparkContext(MyTest.class.getName(), "local[*]");
SQLContext hiveContext = SparkUtils.getSQLContext(sc);

JavaRDD<String> file = sc.textFile("inputfile.txt");
logger.info("File Record Count:: " + file.count());

// Created on the driver with an empty map as the initial value.
final Accumulator<Map<String, String>> accm =
    sc.accumulator(new HashMap<String, String>(), new RecordAccmulator());

// Assumes every line has the form key:value.
JavaPairRDD<String, String> filePair = file.mapToPair(new PairFunction<String, String, String>() {

  private static final long serialVersionUID = 1L;

  @Override
  public Tuple2<String, String> call(String t) throws Exception {
    String[] str = StringUtils.split(t, ":");

    // Record the entries whose value is longer than 10 characters.
    if (str[1].length() > 10) {
      Map<String, String> map = new HashMap<>();
      map.put(str[0], str[1]);
      accm.add(map);
    }
    return new Tuple2<String, String>(str[0], str[1]);
  }
});

// mapToPair is lazy, so the accumulator is filled only once an action such as count() runs.
logger.info("Pair Count:: " + filePair.count());
logger.info("Accumulator Values:: " + accm.value());
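
The mapper above reads str[1] without checking that the line actually contained a colon, so a malformed line would throw an ArrayIndexOutOfBoundsException. One possible way to guard against that is to pull the parsing and the length check into small helpers that can be tested without Spark. This is only a sketch: RecordCheckSketch, parse, and isTooLong are hypothetical names, and unlike StringUtils.split this version splits on the first colon only.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

// Spark-free helpers for parsing "key:value" lines and flagging long values.
final class RecordCheckSketch {

    // Same threshold as the mapper: values longer than 10 characters are collected.
    static final int MAX_VALUE_LENGTH = 10;

    // Splits a line on its first colon; returns empty when the line has no colon.
    static Optional<Map.Entry<String, String>> parse(String line) {
        int idx = line.indexOf(':');
        if (idx < 0) {
            return Optional.empty();
        }
        Map.Entry<String, String> entry =
                new SimpleEntry<>(line.substring(0, idx), line.substring(idx + 1));
        return Optional.of(entry);
    }

    // True when the record's value exceeds the length threshold.
    static boolean isTooLong(Map.Entry<String, String> record) {
        return record.getValue().length() > MAX_VALUE_LENGTH;
    }

    public static void main(String[] args) {
        String[] lines = {"id1:short", "id2:a much longer value", "no delimiter"};
        for (String line : lines) {
            Optional<Map.Entry<String, String>> record = parse(line);
            if (!record.isPresent()) {
                System.out.println("malformed: " + line);
            } else if (isTooLong(record.get())) {
                System.out.println("too long: " + record.get().getKey());
            } else {
                System.out.println("ok: " + record.get().getKey());
            }
        }
    }
}
```

Inside call(), the mapper could then add to the accumulator only when parse returns a value and isTooLong is true, and decide separately how malformed lines should be handled.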
