7.2.1. 까지….
병렬 스트림!!!
컬렉션에서 parallelStream을 호출해서 병렬 스트림을 생성할 수 있음
1 ~ n까지 합을 구하는 메소드를 만든다고 하자. 일반적인 스트림을 사용한다면 다음과 같이 작성될 수 있다.
public long sequentialSUm(long n) {
return Stream.iterate(1L, i -> i+1)
.limit(n)
.reduce(0L, Long::sum);
}
그럼 이제 병렬 스트림을 이용하여보자. 간단하다….
public long sequentialSUm(long n) {
return Stream.iterate(1L, i -> i+1)
.limit(n)
.parallel() // 병렬 스트림
.reduce(0L, Long::sum);
}

그림으로는 두개의 청크만 쪼개지는 것을 표현하였지만, 딱 두개로 쪼개지지는 않는다.
병렬 스트림의 내부적인 구조를 살펴보면, ForkJoinPool을 사용한다고 한다.
프로세서의 수와 동일하게 설정되는 전역 설정이 있고, 이 값을 통해 스레드를 생성한다.
그리고, 하나의 병렬 스트림에 특별히 사용할 수 있는 값을 지정할 수는 없음.
성능 측정
책에선 너줄너줄 여러가지의 설명을 하지만 핵심적인 내용을 옮겨적어보면
- 측정해봐라
- 측정해보라니까
- 측정해봐야 함.
정도이다.
벤치마크를 돌려본다(직접 돌려보지는 않고 책 내용을 기반으로 한 결과값을 공유한다.)
public long sequentialSUm(long n) {
return Stream.iterate(1L, i -> i+1)
.limit(N)
.reduce(0L, Long::sum);
}

public long iterativeSum(long n) {
long result = 0;
for (long i = 1L; i <= N ; i++) {
result += i;
}
return result;
}

public long sequentialSUm(long n) {
return Stream.iterate(1L, i -> i+1)
.limit(n)
.parallel() // 병렬 스트림
.reduce(0L, Long::sum);
}

전통적 for문 : 3ms, 단일 스트림 : 121ms, 병렬 스트림 : 600ms 가 소모된 어이없는 결과가 나왔다.
왜 이런 결과가 나왔는가? 이유는 다음과 같다
- 연산 자체가 병렬로 나누기가 어려운 상황. reduce연산이 전체 숫자 리스트가 준비되지 않았으므로, 청크로 분할할 수 없음
- 실 로직 동작은 단일 스레드로 진행하면서 스레드 생성, context switch비용만 쓸데없이 더 붙음…
- 박싱 & 언박싱에 소모되는 리소스 존재
이래서 측정을 해 봐야 한다는 이야기가 나오는 것 같다.
그래서 어떻게 해결 할 것인가?
당연히 방법은 있다.
LongStream.rangeClosed() 를 사용한다.
이미 메소드를 호출하는 시점부터 메소드명 그대로 ‘닫힌 범위’ 로 호출되기 때문에, 쉽게 청크로 분류할 수가 있는 구조이다.
public long rangedSum(long n) {
return LongStream.rangeClosed(1,N)
.reduce(0L, Long::sum);
}

전통적 for문보다는 느리지만 그래도 일반 단일 스트림보다는 훨씬 빠르다…. 그럼 여기다 parallel()을 붙이면?
public long rangedSum(long n) {
return LongStream.rangeClosed(1,N)
.parallel()
.reduce(0L, Long::sum);
}

그래서 하고싶은 말은
아무리 스트림을 이용한 병렬 연산이 레거시 자바 대비 쉬워졌다고는 하지만, 병렬 스트림을 무조건 코드에 갖다 붙인다고 다 빠르게 동작하는건 아니다. 심지어, 느리게 나오기라도 하면 다행이지만… mutual exclusive하지 않은 자원을 여러개의 스레드가 경쟁(race condition)하는 경우엔 값 자체가 다르게 나오는 경우도 있다.
상황마다, 케이스마다 일일히 직접 확인을 해야 한다.(결국 조심히 써라 이 말.)
개인적인 의견. 일반적인 웹서비스 개발 영역에서 병렬 스트림을 쓸 필요가 있을까…? 거의 없다고 본다…..
이유 1.
time critical한 시스템이라면 몰라도, 일반적인 웹 서비스에서는 어차피 WAS단에서 (일반적인 tomcat을 띄워도) 어차피 여러개의 스레드를 생성하여 http request를 처리한다. 웹 서비스를 1명한테만 서비스 할 것도 아니고, 애초에 시스템상에서 멀티 프로세스/스레드로 request를 처리하고 있지 않은가? 반드시 필요한 경우가 아니라면 굳이 애써서 병렬화를 할 이유는 사실 별로 없다.
이유 2.
가성비가 나오지 않는다.
첫쨰, 상기 예제를 보면 알겠지만, 단순한 sum하는 것에서도 ‘측정’을 반드시 해야 하는 개발비용이 많이 든다.(예제를 보면 알겠지만 측정을 하지 않고는 빠르다고 장담할 수 없지 않은가) 개발기간이 쓸데없이 늘어난다는 뜻. 게다가 버그 터지면 잡기도 힘들다.
둘째, 기껏 잘 짜봤자, 성능에 지대한 영향을 끼치는 곳은 비즈니스 로직이라기보다는 DB쿼리나, server-server API call등이다. 이게 훠어어어어어얼씬 수행시간이 오래 걸린다. 애꿎은 비즈니스 로직 최적화한답시고 삽질하는것보다 차라리 이쪽을 살펴보고 개선하는게 훨씬 가성비가 잘나옴.
이유 3.
벤치마크 결과를 보면 알겠지만, 전통적 for문보다 stream이 40배나 느린 경우가 있지 않은가? 그래도 우린 for문따위는 쓰지 않고 40배나 느린 stream을 사용한다. (물론 아닌 경우or상황도 있지만)
코드의 가독성이나 직관성이 성능보다 훨씬 가치가 높게 평가되기 떄문. 실전(?)에서 이러한 트렌드로 개발메타가 변화하고 있고, 그건 곧, (서비스가 불가할 정도의)성능의 압도적인 차이가 있지 않는 이상 가독성, 유지보수성을 잡는게 장기적으로 훨씬 이익이라는 것을 우리가 경험적으로 알고 있기 때문.
그럼에도 불구하고
알고서 쓰지 ‘않는 것’과 몰라서 쓰지 ‘못하는’ 경우는 엄연히 다르니. 공부할땐 닥치고 열심히 배우자…ㅋ
병렬 스트림을 효과적으로 사용할 수 있는 가이드
- 측정.측정!측정!!!!! 자바에 대해 빠삭하게 알고 있어 짠 코드의 메커니즘을 한줄 한줄 ‘명확히 알고 있는’ 정도라면야 측정을 패스해도 되겠지만, 그정도의 완벽한 확신이 서지 않으면 직접 측정해봐라.
- primitive data type 특화 스트림을 적극적으로 사용하는것이 좋다. 박싱을 주의하라. 박싱 & 언박싱에 드는 코스트는 생각보다 크다.
- limit, findFirst처럼 요소의 순서에 의존하는 연산은 오히려 성능이 떨어지는 경우가 있다. 주의하라.
- 스트림에서 수행하는 파이프라인 연산 비용을 고려하라.
- (당연히)소량의 데이터는 병렬 스트림이 별 도움이 되지 않는다. 물론 소량?대량?에 대한 그 기준은 개발자가 스스로 생각해야 함.
- 자료구조에 따라서 적합성을 판단해라. ArrayList는 번지수만 알고 있으면 접근 가능한 Random Access이지만, LinkedList는 순차 접근만 가능한 Sequential Access 방식이다.
- 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 성능차이가 존재한다. 스트림 파이프라인에 filter가 중간에 들어가면, filter 결과를 예측할 수 없으므로 효과적으로 병렬처리가 가능한지 알 수 없게 되는 상황이 발생한다. 주의하라.
- 병렬로 처리한 각 스레드의 최종 결과물들을 취합하는 과정의 비용도 생각해야 한다.(Collector.combiner메소드) 머지하는 과정이 비싸면 굳이 병렬처리 할 필요가 없는 경우가 있을 수도 있다.
포크/조인 프레임워크
‘이런게 있다’ 정도로만 보고 넘어가자.
다음 코드는 포크/조인 프레임워크를 사용하는 일종의 pseudo code이다.
public class ForkJoinCalculator extends java.util.concurrent.RecursiveTask<Long> {
//변수선언 생략.
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
// 생성자 생략
private long computeSequentially() {
//로직 생략(걍 sum하는거지 뭐...)
}
@Override
protected Long compute() {
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start+len/2);
leftTask.fork(); //왼쪽 task 비동기로 실행
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start+len/2, end);
Long rightResult = rightTask.compute(); //오른쪽 task는 compute() 재귀호출
Long leftResult = leftTask.join(); //재귀적으로 수행해왔었던 결과값을 읽거나, 아직 완료가 안됐으면 이 라인에서 block.
return leftResult + rightResult;
}
}
public static void main(String[] args) {
ForkJoinTask<Long> task = new ForkJoinCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
킁킁… divide & conquer 냄새가 난다. ‘더한다’ 를 ‘정렬한다’ 로 바꾸면 딱 머지소트임…..
차이가 있다면 conquer 하는 해결과정이 RecursiveTask를 구현한 결과를 ForkJoinPool 을 통해 invoke시켰다는 것이다. 그래서 (내가 병렬처리 방식이 아닌 단순 recursion으로 ForkJoinCalculator 코드를 작성했지만)병렬로 로직이 수행될 것이다.
정보 : ForkJoinPool 은 Runtime.availableProcessors 의 리턴 값으로 사용할 스레드 수를 결정한다고 한다. 가상코어(하이퍼스레딩) 포함임.
포크/조인 프레임워크 사용 시 주의사항(제대로 사용하는 법)
- join을 주의하라
- 사실 이건 굳이 포크/조인 프레임워크뿐만 아니라, 7 이하 자바에서 지원해줬던 멀티스레드 인터페이스(Thread,Runnable)에서도 해당되는 말이다. join() 구문 자체가, 연산에 의존성이 있는 다른 스레드의 결과물을 기다린다는 의미인지라.. 다른 스레드에서 내가 연산할 수 있는 상태가 될 때까지 blocking상태로 대기하게 된다.
- RecursiveTask 내에서는 ForkJoinPool.invoke 메소드를 사용하지 말아야 한다고 한다. 대신 compute나 fork메소드를 직접 호출할 수 있다.
- 이유는 나도 잘 모르겠다.
- 서브태스크에 fork 메소드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다.
- 디버깅이 어렵다. stackTrace가 별 도움이 되지 않기 때문
- 묻지마 병렬 스트림의 경우와 똑같이 묻지마 포크/조인 프레임워크가 ‘무조건 빠르다’ 는 보장이 없다.
효율적인 작업분할
앞의 예제에서는 덧셈을 수행할 숫자가 10000개 이하면 더 이상 task를 분할하지 않는 코드이다. 그럼 총 배열 크기가 천만개라면, 예제는 task를 천개씩이나 분할해서 로직을 수행할 것이다.
근데 요즘 하드웨어 잘 뽑혀나왔자 8코어를 넘는건 잘 없고, 코어가 1000개씩이나 되는 경우는 더더욱 없다….. 4코어 기준으로 굳이 task를 1000개씩이나 나눌 이유가 있을까…?
결론부터 말하면 있다. 이론적으로야 천만개의 배열을 4개로 나눠 250만x4개의 task로 쪼개서 로직을 수행하면 동시에 종료되겠지만, 현실은 그렇지 않다.

어차피 로직수행은 ‘모든 스레드의 작업이 끝나야’ 로직수행이 완료되었다고 (당연히)간주된다. 헌데, 작업을 단순히 코어수인 4개로 나눠놓고 로직을 수행하다가, 특정 스레드가 I/O병목이 걸렸거나, 외부요인으로 인해 예상치 못하게 작업시간이 길어졌다면…?
아무튼 그래서, 작업 자체는 잘게 쪼개는게 아무래도 좋다. 그래야 놀고 있는 스레드에 ‘특정 기법’을 이용해서
??? : 너 가서 저기 헤메고 있는 저놈 도와줘
를 시전할 수 가 있다. 이제 그 ‘특정 기법’이 뭔지 한번 살펴보도록 하자.
작업 훔치기
포크/조인 프레임워크는 작업 훔치기(work stealing) 기법을 이용해서 이 문제를 해결한다. 쪼갠 작업을 각 스레드가 로직을 수행하는데, 내 스레드가 내가 갖고 있는 작업을 다 끝내면, 다른 스레드의 작업queue에서 대기하고 있는 작업을 뻇어온다는 이 말이다.
작업 훔치기 기법을 사용하면, 다른 스레드가 미처 처리하지 못한 일까지 내가 쉬지 않고 처리하게 되어, 왠만하면 거의 동시에 모든 작업이 완료될 것이다.
위의 예제는 단순히 분할 로직 없이 숫자를 10000개 단위로 쪼개었지만, 이제 앞으로 살펴볼 것은 자동으로 스트림을 알아서 잘 분할하는 기법인 Spliterator 에 대해서 알아볼 것이다.
Spliterator!!!!
Split Iterator이라는 뜻이다. 말 그대로 ‘쪼개서 반복시킴’ 이라고 생각하면 이해가 쉽겠다.
Spliterator 인터페이스는 다음과 같은 메소드들이 정의되어 있다. 하나하나씩 보면서 알아가 보자
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
- tryAdvance
- Spliterator의 요소를 하나씩 consume하면서 탐색해야 할 요소가 남아있으면 return true (iterate)
- trySplit
- Spliterator를 쪼개서 또다른 Spliterator를 생성하는 메소드 (아래그림 참조)
- estimateSize
- 탐색해야 할 요소 수를 리턴
- 이 값을 보고 공평하게 Spliterator를 분할할 수 있는지 판단할 수 있음.
- characteristics
- Spliterator 자체의 특성 집합을 포함하는 값을 리턴

Spliterator가 각 작업들을 쪼개는 과정을 도식화 하면 다음과 같이 표현 할 수 있다.

null 나올때까지 쪼개쪼개
그럼 이제 Spliterator를 구현해 볼까?
String의 단어(어절) 수를 구하는 단순한 메소드를 통해 예제를 작성해 보자.
우선 구현에 앞서, 전통적인 for문을 통해서는 어떻게 구현하는지 한번 살펴본다.
public int countWordsIteratively(String s) {
int counter = 0;
boolean lastSpace = true;
for (char c : s.toCharArray()) {
if (Character.isWhiteSpace(c)) {
lastSpace = true;
}
else {
if (lastSpace) counter++;
lastSpace = false;
}
}
return counter;
}
public static void main (String args[]) {
final String SENTENCE = "Nel mezzo del cammin di nostra vita mi ritrovai in una selva oscura" +
"ch la dritta via era smarrita";
sysout(countWordsIteratively(SENTENCS)); // 19 words
}
뭐…간단하다…. 그냥 순차적으로 1글자 1글자씩 읽어가면서 couter를 추가해 준다.
이걸 조금 머리를 써서, 재귀적으로 호출할 수 있는 클래스를 구현하면 다음과 같이 구현할 수 있다.
구현에 앞어 오토마타로 표현해보면 다음과 같은 다이어그램이 나올 것이다.

그린 오토마타를 바탕으로 클래스를 구현한다.
class WordCounter {
private final int counter;
private final boolean lastSpace;
// 생성자 편의상 생략함
public WordCounter accumulate(Character c) {
if (Character.isWhiteSpace(c)) {
return lastSpace ? this : new WordCounter(counter, true);
}
else {
return lastSpace ? new WordCounter(counter+1, false) : this;
}
}
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
}
}
그리고 구현한 WordCounter를 기반으로, reduce stream을 이용하여 다음과 같이 단어 수를 세는 메소드를 구현하면 다음과 같이 작성할 수 있다.
private int countWords(Stream<Character> stream) {
WordCounter wordcounter = stream.reduce(new WordCounter(0,true), //초기값
WordCounter::accumulate, //연산
WordCounter::combine); //합칠때
}
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
sysout(countWords(stream)); // 단일스레드
sysout(countWords(stream.parallel())); //병렬스레드
단일 스트림을 이용한 결과값은 잘 나오지만, 병렬 스트림을 이용한 결과값은 의도한대로 나오지 않을 것이다.
- 문자열을 쪼개는 단위를 어절별로 잘 쪼개야 하는데, 그렇지 않았음.
- 하나의 어절을 쪼개서 계산해버림
따라서, WordCounter + stream.reduce() 를 이용한 코드로는 병렬처리를 할 수가 없는 코드라고 볼 수 있다.
Spliterator를 구현해본다.
물론, 가능한 경우도 있겠지만… 일반적으로 stream.reduce가 가능한 단순한 재귀형 클래스를 구현한다고 해서 병렬스트림을 이용해 올바른 값을 얻기 힘들다는 것을 알게 되었다.
그리고 WordCounter와 스트림의 구조를 보면 알겠지만, WordCounter를 잘 구현한다고 해도, 임의로 ‘하나의 어절을 쪼개는’ 행위를 stream에서 해 버리기 때문에, 현재의 구조로는 병렬수행시에 오답이 나오는 경우를 해결할 수가 없다.
따라서 이번엔 Spliterator를 구현한 WordCounterSpliterator를 작성해 본다.


그리고 이 WordCounterSpliterator 를 다음과 같이 사용하면…
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(spliterator, true);
병렬로(코드 자체의 병렬처리 로직은 없지만, Spliterator에 구현되어있다. 우린 ‘쪼개는 로직’ 만 잘 구현하면 알아서 잘 처리해준다.) 스트림을 이용하여 처리할 수가 있다.
마치며
- 내부 반복(recude같은거)을 이용하면 스트림을 병렬로 처리할 수 있다
- 단..확실하지 않으면 승부를 보지 말아라.
- 항상 병렬처리가 빠른 것은 아니다.
- 코드를 짰으면 반드시 측정해 봐라.
- 항상 병렬처리가 빠른 것은 아니다.
- 처리해야 할 데이터가 많은 경우, 혹은 하나의 동작에 많은 시간이 걸린 경우에 성능을 높일 수 있다.
- 기본형 특화 스트림(IntStream,etc) 등 올바른 자료구조 선택(박싱&언박싱을 피함)하는게 병렬처리보다 더 효과적일 수 있다. 병렬처리는 만능이 아니다.
- 포크/조인 프레임워크는 포크(for) 하여 작업을 분할해 수행한 후, 조인(join) 하는 프레임워크다
- Splietator를 사용하면 스트림 API를 이용하면서 좀 더 편하게 병렬처리를 할 수가 있다