JAVA/Stream

JAVA Stream 병렬 처리와 성능 분석

수달하나 2022. 12. 9. 11:30

항상 for 문을 사용해서 개발을 했었는데 실무에서 사용된 코드를 보니 많은 곳에서 stream이 사용된다는 것을 확인했다. 그래서 stream 에 대한 정리가 필요 할 것 같아 stream 기능에 대해 공부 해보기로 했다. stream의 핵심은 사용방법이 아니고 가독성과 효율성 사이 트레이드 오프 관계의 이해이다.

 

stream 기능은 Java 8 부터 지원하는 기능이다.  2014년에 Java 8 이 처음 나왔는데 아직까지 많이 사용되는 버전중에 하나인것을 생각해보면 상당히 잘 만들어진 자바 버전인것 같다. 컬렉션 요소 하나하나를 람다식으로 처리할 수 있도록 하는 stream 기능은 개발자가 직접 반복자를 정의해서 처리하지 않고 내부 반복자를 통해서 처리 할 수 있도록 하여 코드의 가독성을 높여주고 있다. 또한 병렬 처리를 지원한다는 이점이 있다는데, 내가 궁금한건 바로 이 부분이다.

 

병렬처리 와 그 이점 그리고 이점이라고 하면 속도의 향상이나 이런것들을 생각할 텐데 과연 성능적으로 우세할까?

 

많은 블로그들의 속도 비교 분석을 통해 먼저 생각해보자면 일반적인 경우에 있어서 stream 이 성능적으로 떨어진다 라는 결론이 난다. 가장 속도가 빠른것은 인덱스를 지정한 for문 , 그 다음은 향상된 for문, 그리고 마지막으로 stream 이 제일 느리다는 일반적인 얘기가 있다.

 

병렬 처리를 지원하는 stream 이 속도가 가장 느리다?

무슨 아이러니 일까?

 

사실 병렬 처리를 지원하는것은 stream 이 아니고 paralledStream 이다. 일반 stream의 경우 단순히 for문의 형태를 stream 으로 변경하여 내부 반복자를 사용하는 알고리즘이지만 paralledStream은 좀 다르다. 쓰레드를 이용하여 내부 Task를 순서와 무관하게 처리하는 알고리즘으로 우선 가장 먼저 이해 해야 할 것은 ForkJoinPool 방식의 병렬 처리이다. 

 

보통의 ThreadPool 에서는 여러개의 쓰레드에게 Task 를 순서대로 배분, 쓰레드 들은 배분 받은 Task를 수행하는 과정을 거치게 되는데 여기서 문제점은  각각의 쓰레드가 할당된 Task 의 처리비용이 동일하지 않다는 것이다. 따라서 A 쓰레드가Task 를 완료시킨 것 과 B 쓰레드가 Task 를 완료시키지 못한것 은 아무런 상관 관계가 없다.(상관 관계가 없다는 것은 애매하다 서로 영향을 줄 수 없다 라는 표현이 맞을 듯 하다)

따라서 Task 를 마친 쓰레드는 아무행동도 취하지 않는 유휴타임이 생기게 되는데 ForkJoinPool 방식에서는 이러한 것을 방지한다. 모든 쓰레드가 최대한 동일한 Task 처리 비용을 유지하도록 하여 특정 쓰레드의 유휴타임이 생기지 않도록 쓰레드를 관리하는 방식이다. 

하나의 큐 로부터 쌓인 Task 들을 각각의 queue를 가진 여러개의 쓰레드가 분배받게 되고 Task 가 없는 쓰레드는 다른 쓰레드의 큐 에서 Task 를 스틸하여 유휴쓰레드가 없도록 하는 것이 ForkJoinPool의 동작 알고리즘이다.

ForkJoinPool 방식은 별도의 커스텀 설정이 없다면 cpu 의 스펙 혹은 어플리케이션의 스펙에 따라서 자동으로 쓰레드의 갯수가 지정이 되고 스레드가 N 개 생성되 될 경우 N-1 개의 쓰레드는 ForkJoinPool에 사용, 메인쓰레드는 스트림을 처리하는 쓰레드로 사용이 되는것이 기본 사용정의다.

 

아 그럼 paralleStream 은 항상 stream 보다 빠르게 동작할까? 

당연하게도 이것은 틀렸다.

 

모든 상황에서 절대적인 성능이 존재하지 않듯이 parallelStream을 사용한다고 해서 stream보다 성능이 우세한 것은 아니다. 실제로 parallelStream은 객체 생성 비용이 발생하기 때문에 Task의 자원소모 비용과 개수에 따라서 각각의 상황에 따라 다른 성능을 보여줄 수 있기 때문이다.

public class StreamTest {

    private String name;
    private Integer value;

    public StreamTest(String name, Integer value){
        this.name = name;
        this.value = value;
    }
}

데이터의 양을 조절하여 데이터가 적을 경우와 많을 경우 따로 확인을 해 보았다.

public static final int small_size = 1000;
public static final int large_size = 1000000;

List<StreamTest> tests = new ArrayList<>();
Random random = new Random();

for(int i = 0; i< small_size; i++){
	tests.add(new StreamTest(UUID.randomUUID().toString(), random.nextInt(100)));
}

for(int i=0; i<10; i++){
	tests.stream().filter(t ->t.getValue()%2==0);
	tests.parallelStream().filter(t -> t.getValue()%2==0);
}

System.out.println("small list average value");
filterTest(tests);

for(int i=0; i<10; i++) {
	Map<Integer, List<StreamTest>> map1 = tests.stream().collect(Collectors.groupingBy(StreamTest::getValue));
	Map<Integer, List<StreamTest>> map2 = tests.parallelStream().collect(Collectors.groupingBy(StreamTest::getValue));
}

System.out.println("small list groupingBy value");
groupingTest(tests);
    
for(int i = 0; i< large_size; i++){
	tests.add(new StreamTest(UUID.randomUUID().toString(), random.nextInt(100)));
}

for(int i=0; i<10; i++){
	tests.stream().filter(t ->t.getValue()%2==0);
	tests.parallelStream().filter(t -> t.getValue()%2==0);
}

System.out.println("large list average value");
filterTest(tests);

for(int i=0; i<10; i++) {
	Map<Integer, List<StreamTest>> map1 = tests.stream().collect(Collectors.groupingBy(StreamTest::getValue));
	Map<Integer, List<StreamTest>> map2 = tests.parallelStream().collect(Collectors.groupingBy(StreamTest::getValue));
}

System.out.println("large list groupingBy value");
groupingTest(tests);

private static void filterTest(List<StreamTest> tests){
	long start = 0;
	long end = 0;
	long runningTime = 0;

	start = System.nanoTime();
	tests.stream().filter(t -> t.getValue()%2==0);
	end = System.nanoTime();
	runningTime = end - start;
	System.out.println("single stream : "+runningTime);

	start = System.nanoTime();
	tests.parallelStream().filter(t->t.getValue()%2==0);
	end = System.nanoTime();
	runningTime = end - start;
	System.out.println("Parallel Stream : " + runningTime +"\n");

}

public static void groupingTest(List<StreamTest> tests){
	long start = 0;
	long end = 0;
	long runningTime = 0;

	start = System.nanoTime();
	Map<Integer, List<StreamTest>> map1 = tests.stream().collect(Collectors.groupingBy(StreamTest::getValue));
	end = System.nanoTime();
	runningTime = end - start;
	System.out.println("single Stream : "+ runningTime);

	start = System.nanoTime();
	Map<Integer, List<StreamTest>> map2 = tests.parallelStream().collect(Collectors.groupingBy(StreamTest::getValue));
	end = System.nanoTime();
	runningTime = end - start;
	System.out.println("parallel Stream : "+runningTime +"\n");
}

복잡한 연산이 아닌 단순 stream 일 경우 일반 stream과 parallelStream 의 차이가 많지는 않았다. 하지만 복잡한 연산과정을 통해서 stream을 수행하는 경우엔 일반적인 stream 보다 parallelStream을 쓰는것이 시간 단축에 있어서 유리하다는 결과를 얻을수 있었다.


참고 블로그 : https://hamait.tistory.com/612

 

쓰레드풀 과 ForkJoinPool

쓰레드똑똑똑!누구니?쓰레드 에요.. 프로그램(프로세스) 안에서 실행 되는 하나의 흐름 단위에요. 내부에서 while 을 돌면 엄청 오랬동안 일을 할 수 도 있답니다.쓰레드 끼리는 값 (메모리) 을 공

hamait.tistory.com