On Some Aspects of Big Data Processing in Apache Spark, Part 3: How To Deal With...

 2 years ago
source link: https://dzone.com/articles/on-some-aspects-of-big-data-processing-in-apache-s-2

On Some Aspects of Big Data Processing in Apache Spark, Part 3: How To Deal With Malformed Data?

In this post I present some solutions on how to deal with malformed date/time data, and on how to set a default value for malformed data.

In my previous post, I presented design patterns to program Spark applications in a modular, maintainable, and serializable way. This time I demonstrate a solution to deal with malformed date/time data, and show how to set a default value for malformed data.

When I worked on a big data project, my tasks were to load data in different formats (JSON, ORC, etc.) from different sources (Kafka, Hadoop Distributed File System, Apache Hive, Postgres, Oracle), transform the data, and save it to the same or different sources. The simplest task was to load data from a single data source (Postgres), and then save the data to another source (Hive), without any transformations.

Even in this simplest case, there was a lot of malformed data! Malformed date/time data in particular took our team a lot of time to deal with. Null values were also frequent, and sometimes there were empty arrays of data. So, it is worthwhile to have compact and versatile solutions to process such irregularities.

This post is organized as follows:

  • Section 1 describes how to deal with malformed date/time data,
  • Section 2 describes a simple decorator pattern to assign a default value to a malformed piece of data.

The complete working code can be found here. Let's go.

I. Malformed Date/Time Data.

In my case, data was received in JSON format either from Kafka or from JSON files. JSON-formatted data is built on two structures: a collection (object) of key-value pairs and an ordered list (array) of values. A value can be an object, an array, a string, a number, a boolean, or null. JSON has no dedicated date/time type, so a piece of date/time data usually comes as a string.

Java has no primitive date type, but the java.time package provides classes to work with dates and times. The package contains LocalDate, LocalTime, LocalDateTime, ZonedDateTime, and other classes to store date/time data and parse date/time strings. There are also dedicated parsers: the legacy java.text.SimpleDateFormat, which produces java.util.Date objects, and java.time.format.DateTimeFormatter, which works with the java.time classes. These parsers accept a pattern string, like "dd.MM.yyyy HH:mm", and an input string, like "20.10.2020 12:30", and return a date/time object built from the input string. These objects can then act as fields in a Hibernate @Entity. Looks pretty straightforward, right?
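As a minimal sketch of this straightforward case, the following parses the example string with a single pattern. The class name SinglePatternSketch is hypothetical and exists only for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration class; not part of the article's code base.
class SinglePatternSketch {
    static LocalDateTime parse(String input) {
        // The input must match the pattern, otherwise parse throws DateTimeParseException.
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm");
        return LocalDateTime.parse(input, formatter);
    }

    public static void main(String[] args) {
        System.out.println(parse("20.10.2020 12:30")); // prints 2020-10-20T12:30
    }
}
```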

Unfortunately, no. In many big data projects, date/time data comes in many different formats. Moreover, I encountered cases where date/time strings contained substrings in a different language! So, it may not be possible to parse such date strings with a single pattern, or even with a single parsing tool.
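To illustrate the language problem, here is a small sketch in which the same pattern succeeds or fails depending on the locale used for month names. The sample string and the class name LocaleSketch are made up for this example.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Locale;

// Hypothetical illustration class; not part of the article's code base.
class LocaleSketch {
    static final String PATTERN = "d. MMMM yyyy";

    // Returns true if the input parses with the given locale's month names.
    static boolean parses(String input, Locale locale) {
        try {
            LocalDate.parse(input, DateTimeFormatter.ofPattern(PATTERN, locale));
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String input = "19. Oktober 2020"; // made-up sample with a German month name
        System.out.println("German locale:  " + parses(input, Locale.GERMAN));
        System.out.println("English locale: " + parses(input, Locale.ENGLISH));
    }
}
```

A parser configured for one language rejects the other, which is one reason a single formatter is often not enough.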

To attack this problem, let's recall the Chain of Responsibility design pattern. The pattern consists of a Handler interface and ConcreteHandler implementations. Each ConcreteHandler holds a reference to the next ConcreteHandler, so all the ConcreteHandlers form a linked list. The last ConcreteHandler refers to null.

In our case, this pattern is implemented as follows. Our Handler interface is called IChainDT:

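import java.time.LocalDate;
import java.util.Date;

public interface IChainDT {
    // Parses the input and converts the result to a LocalDate.
    LocalDate getDateTime(String input);
    // Parses the input, delegating to the next parser on failure.
    Date parseDateTime(String input);
    void setNextChain(IChainDT element);
}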

Here the parseDateTime method parses date/time strings, getDateTime converts the parsed Date object to a more convenient LocalDate object, and setNextChain sets a link to the next parser. The converter is added to demonstrate how the parser can make output dates "prettier" before they are returned.

The SimpleDateTimeParser class implements the IChainDT interface:

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;

public class SimpleDateTimeParser implements IChainDT {
    String shortDateTimePattern;
    IChainDT dateTimeParser = null;      // next parser in the chain, or null
    Date defaultTime = new Date(0L);     // returned when no parser matches

    public SimpleDateTimeParser(String pattern) {
        shortDateTimePattern = pattern;
    }

    public SimpleDateTimeParser(String pattern, IChainDT nextValidator) {
        this(pattern);
        this.dateTimeParser = nextValidator;
    }

    public LocalDate getDateTime(String json) {
        Date result = parseDateTime(json);
        return result.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    }

    public void setNextChain(IChainDT validator) {
        this.dateTimeParser = validator;
    }

    public Date parseDateTime(String input) {
        // Created per call, so the formatter is not part of the object's state.
        DateFormat simpleDateFormatter = new SimpleDateFormat(shortDateTimePattern);
        try {
            return simpleDateFormatter.parse(input);
        } catch (Exception e) {
            // Delegate to the next parser, or fall back to the default value.
            if (this.dateTimeParser != null) return this.dateTimeParser.parseDateTime(input);
            else return defaultTime;
        }
    }
}

Here IChainDT dateTimeParser is a reference to the next parser, and String shortDateTimePattern is a date/time pattern string. The reference to the next parser can be set either via the two-argument constructor or via the setter setNextChain.

Notice how the parseDateTime method works. First, the method creates a SimpleDateFormat instance with the specific pattern; we need the instance to be a local variable for the SimpleDateTimeParser to be serializable (this post explains how Spark serializes tasks). A local instance also avoids sharing a SimpleDateFormat between threads, which matters because SimpleDateFormat is not thread-safe. If the simpleDateFormatter (with the specified pattern) fails to parse the input string, the formatter throws an exception.

The exception gets caught in the catch block. If there is a dateTimeParser next in the chain, the next dateTimeParser.parseDateTime(input) gets called. If the current parser is the last in the chain, the last parser's default value is returned; the value may be null, although getDateTime needs a non-null default because it calls toInstant() on the result.
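The serialization point can be checked with a small round trip through plain Java serialization. This is only a sketch under assumptions: PatternHolder is a hypothetical stand-in for a chain element, and it implements java.io.Serializable explicitly, which the listing above does not show.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical stand-in for a chain element: its only state is the pattern string.
class PatternHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String pattern;

    PatternHolder(String pattern) {
        this.pattern = pattern;
    }

    Date parse(String input) throws ParseException {
        // The formatter is created per call, so it is never part of the serialized state.
        return new SimpleDateFormat(pattern).parse(input);
    }
}

class SerializationSketch {
    // Writes the object to a byte array and reads it back.
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        PatternHolder copy = roundTrip(new PatternHolder("yyyy-MM-dd"));
        System.out.println(copy.parse("2020-10-19"));
    }
}
```

The deserialized copy parses exactly like the original because everything it needs is rebuilt from the pattern string.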

Finally, let's see how this parser is called.

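@Test
public void parserTest() {
    String pattern1 = "yyyy-MM-dd";
    String pattern2 = "yyyy.MM.dd";

    // Chain: validator1 -> validator2 -> default value.
    IChainDT validator1 = new SimpleDateTimeParser(pattern1);
    IChainDT validator2 = new SimpleDateTimeParser(pattern2);
    validator1.setNextChain(validator2);

    // Matches the first pattern.
    String testString = "2020-10-19";
    LocalDate result = validator1.getDateTime(testString);
    assertEquals(2020, result.getYear());

    // Matches the second pattern.
    testString = "2020.10.19";
    result = validator1.getDateTime(testString);
    assertEquals(2020, result.getYear());

    // Matches no pattern, so the default new Date(0L) is returned. Its year is
    // 1969 in zones behind UTC and 1970 otherwise, so derive it from the default zone.
    testString = "10/19/2020";
    result = validator1.getDateTime(testString);
    int defaultYear = new Date(0L).toInstant().atZone(ZoneId.systemDefault()).getYear();
    assertEquals(defaultYear, result.getYear());
}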

First, we create parsers for every pattern string. Next, we chain the parsers. Finally, we call the first parser in the chain on a date/time string. If none of the parsers succeeds in parsing the string, the default Date value of the last parser in the chain (new Date(0L) in this case) is returned, converted to a LocalDate.
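One caveat about this default: getDateTime converts with ZoneId.systemDefault(), so the calendar date that new Date(0L) maps to depends on the JVM's time zone. The sketch below makes that visible with a few explicitly chosen zones; the class name EpochZoneSketch is hypothetical.

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;

// Hypothetical illustration class; not part of the article's code base.
class EpochZoneSketch {
    // Converts the epoch default to a calendar date in the given zone.
    static LocalDate epochIn(String zoneId) {
        return new Date(0L).toInstant().atZone(ZoneId.of(zoneId)).toLocalDate();
    }

    public static void main(String[] args) {
        System.out.println(epochIn("UTC"));              // 1970-01-01
        System.out.println(epochIn("America/New_York")); // 1969-12-31
        System.out.println(epochIn("Asia/Tokyo"));       // 1970-01-01
    }
}
```

A test that checks the default year therefore passes or fails depending on where it runs, unless the expected value is derived from the same zone.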

This parser can also be implemented via an abstract class. In this case, we define an abstract class AChainDT instead of the interface IChainDT:

import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;

public abstract class AChainDT {
    protected AChainDT nextParser = null;
    protected String shortDateTimePattern;
    protected Date defaultTime = new Date(0L);

    public AChainDT(String shortDateTimePattern) {
        this.shortDateTimePattern = shortDateTimePattern;
    }

    public void setNextParser(AChainDT nextParser) {
        this.nextParser = nextParser;
    }

    // Shared Date-to-LocalDate conversion for all concrete parsers.
    public LocalDate getDateTime(String input) {
        Date result = parseDateTime(input);
        return result.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    }

    public abstract Date parseDateTime(String input);
}

Here, the abstract class contains the part common to all parsers: a reference to the next parser, a pattern string, a default value, and the Date to LocalDate converter. A ConcreteHandler now looks more concise:

public class SimpleDateTimeParserA extends AChainDT {
    public SimpleDateTimeParserA(String shortDateTimePattern) {
        super(shortDateTimePattern);
    }

    @Override
    public Date parseDateTime(String input) {
        DateFormat simpleDateFormatter = new SimpleDateFormat(shortDateTimePattern);
        try {
            return simpleDateFormatter.parse(input);
        } catch (Exception e) {
            if (nextParser != null) return nextParser.parseDateTime(input);
            else return defaultTime;
        }
    }
}

Again, we create the SimpleDateFormat instance as a local variable for the parser to be serializable. This parser is called as before, except that we replace IChainDT with AChainDT, SimpleDateTimeParser with SimpleDateTimeParserA, and setNextChain with setNextParser. See the code for details.
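For comparison, the same ordered-fallback idea can be sketched with java.time. This is not the article's implementation: it swaps the linked handlers for a plain loop over a list of patterns and uses DateTimeFormatter instead of SimpleDateFormat. The class name PatternChainSketch and its constructor are hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.List;

// Hypothetical java.time variant of the chain; names are illustrative only.
class PatternChainSketch {
    private final List<String> patterns;
    private final LocalDate defaultDate;

    PatternChainSketch(LocalDate defaultDate, String... patterns) {
        this.defaultDate = defaultDate;
        this.patterns = Arrays.asList(patterns);
    }

    LocalDate parse(String input) {
        for (String pattern : patterns) {
            try {
                // The formatter is built locally, mirroring the local SimpleDateFormat above.
                return LocalDate.parse(input, DateTimeFormatter.ofPattern(pattern));
            } catch (DateTimeParseException e) {
                // This pattern did not match; try the next one.
            }
        }
        // No pattern matched: return the explicit default.
        return defaultDate;
    }

    public static void main(String[] args) {
        PatternChainSketch chain =
            new PatternChainSketch(LocalDate.of(1970, 1, 1), "yyyy-MM-dd", "yyyy.MM.dd");
        System.out.println(chain.parse("2020-10-19"));
        System.out.println(chain.parse("2020.10.19"));
        System.out.println(chain.parse("10/19/2020"));
    }
}
```

Because the default here is a LocalDate rather than a Date, the fallback value does not depend on the JVM's time zone. The linked-handler version remains more flexible when the handlers are different kinds of parsers rather than different patterns.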

II. Default Value Decorator.

As I mentioned in the introduction, a lot of nulls and empty arrays come as values in JSON strings. Also, when data is transferred from one database to another, the data types sometimes need to be converted, for example Integer to BigDecimal. In all these cases, NullPointerExceptions, ArrayIndexOutOfBoundsExceptions, and other exceptions need to be caught and processed.

A common scenario is when there is a functional interface to be fed as a callback to RDD transformations or actions. Let's decorate such an interface to catch and process exceptions. 

import org.apache.spark.api.java.function.Function;

public interface IExceptionDecoratorSpark {
    static <Input, Output> Function<Input, Output> process(Function<Input, Output> fun, Output def) {
        return new Function<Input, Output>() {
            @Override
            public Output call(Input o) {
                try {
                    return fun.call(o);
                } catch (NullPointerException e) {
                    // The more specific exception is handled first.
                    return null;
                } catch (Exception e) {
                    // Any other exception yields the supplied default.
                    return def;
                }
            }
        };
    }
}

Here fun is an input function that implements Spark's Function interface. It takes an Input-type object and returns an Output-type object. The function returned by the process method overrides the call method, and fun is called inside it. If fun throws, the exception is caught: a NullPointerException yields null, and any other exception yields the provided default value def. As always in Java, the more specific exception must be caught first.

This decorator is called the following way: 

@Test
public void basicProcessorSparkTest() throws Exception {
    Double def = 10000.0;
    Double shouldBe = 0.5;
    Function<Integer, Double> fun = (x) -> 1.0 / x;
    Function<Integer, Double> outFun = IExceptionDecoratorSpark.process(fun, def);
    Double result = outFun.call(2);
    assertEquals(shouldBe, result);
}

Here fun returns the inverse of its input. No exception is thrown, so the decorated function returns the same result as fun itself.

On the other hand, here is an example of when an exception is thrown and processed; a provided default value is returned as a result: 

@Test
public void exceptionProcessorSparkTest() throws Exception {
    Integer def = 10;
    Integer[] input = new Integer[0];
    // Reading index 1 of an empty array throws ArrayIndexOutOfBoundsException.
    Function<Integer[], Integer> fun = (x) -> x[1];
    Function<Integer[], Integer> outFun = IExceptionDecoratorSpark.process(fun, def);
    Integer result = outFun.call(input);
    assertEquals(def, result);
}

The fun returns the second element of an input array of integers. If such an element doesn't exist, the provided default value is returned. 

Notice that the org.apache.spark.api.java.function.Function interface is not the same as java.util.function.Function. Implementations of the former provide a call method, and the interface itself is Serializable. Implementations of the latter provide an apply method, and the interface is not Serializable by itself. The presented approach also works for java.util.function.Function, if we change the input function type and replace the call method with an apply method. See the code for details.
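A minimal sketch of that java.util.function.Function variant might look as follows. The names ExceptionDecoratorSketch and DecoratorDemo are hypothetical; only the structure mirrors IExceptionDecoratorSpark.

```java
import java.util.function.Function;

// Hypothetical counterpart of IExceptionDecoratorSpark for java.util.function.Function.
interface ExceptionDecoratorSketch {
    static <I, O> Function<I, O> process(Function<I, O> fun, O def) {
        return input -> {
            try {
                return fun.apply(input);
            } catch (NullPointerException e) {
                return null; // same convention as the Spark version
            } catch (Exception e) {
                return def;  // any other exception maps to the supplied default
            }
        };
    }
}

class DecoratorDemo {
    public static void main(String[] args) {
        Function<Integer[], Integer> second = ExceptionDecoratorSketch.process(x -> x[1], 10);
        System.out.println(second.apply(new Integer[0]));       // 10
        System.out.println(second.apply(new Integer[] {1, 2})); // 2
        System.out.println(second.apply(null));                 // null
    }
}
```

Note that the returned lambda is not Serializable, so this variant suits plain Java code rather than functions shipped to Spark executors.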

Conclusions

In this post, I demonstrated possible ways to process malformed date/time data and to create a default value decorator. The date/time processor can handle date/time strings that cannot be parsed with a single pattern string. The decorator returns different default values for different exceptions thrown. I hope these tricks will be helpful for you.

