(시청일 : 20180113)




(중략)

그 동안 java 에서 제공하던 Future interface 가 제공한 Async 이지만 get() 메서드로 Blocking 을 유발하는 단점이 있었다.
이런 불편함은 Spring Framework에서 제공하는 LinstenableFuture 로 해소 할 수 있었다. (Spring Framework를 사용하는 개발자라면 RestTemplated 의 Async 버젼인 AsyncRestTemplate에 대해 관심을 가져 보기 바란다.)
하지만 ListenableFuture를 써 본 개발자라면 한번쯤은 Callback 작성의 복잡함에 고민해 왔을 것이다. 심지어 Callback Hell 이라는 말이 커뮤니티에는 많이 돌고 있기도 하다.
그 불편함을 알았던지 Java 8 에서 드디어 CompletableFuture 를 제공한다.
CompletableFuture는 아래와 같은 패턴으로 개발이 가능하여 비동기 서비스 코드의 가독성이 좋아졌다.
(중략)


- Future : 비동기 작업의 결과를 담고있는 오브젝트
- ListenableFuture : 콜백 구조로 결과가 완료되는 시점에 Hooking을 걸어서 결과를 가져오는 기법. 비동기 메소드를 호출하면서 콜백 메소드를 지정할 수 있도록 되어 있다.
- CompletableFuture : CompletableFuture 리스트의 모든값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴지 선택할 수 있다는 것이 장점이다. CompletableFuture 클래스는 람다 표현식과 파이프라이닝을 활용하면 구조적으로 예쁘게 만들 수 있다. 병렬성과(Parallelism)과 동시성(Concurrency)에서 CompletableFuture가 중요한데, 여러개의 cpu core 사이에 지연 실행이나 예외를 callable하게 처리할 수 있어서 명시적인 처리가 가능해진다.


■ 예제 1
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .runAsync(() -> log.info("runAsync"))
                .thenRun(() -> log.info("thenRun"))
                .thenRun(() -> log.info("thenRun"));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}

<결과>
18:13:38.852 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:13:38.852 [main] INFO toby.live.CFuture - exit
18:13:38.859 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenRun
18:13:38.860 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenRun
Process finished with exit code 0


■ 예제 2
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    return 1;
                })
                .thenApply(s -> {
                    log.info("thenApply {}",s);
                    return s + 1;
                })
                .thenApply(s2 -> {
                    log.info("thenApply {}",s2);
                    return s2 * 3;
                })
                .thenAccept(s3 -> log.info("thenAccept {}",s3));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}


<결과>
18:20:54.956 [main] INFO toby.live.CFuture - exit
18:20:54.956 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:20:54.962 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenApply 1
18:20:54.966 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenApply 2
18:20:54.966 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenAccept 6
Process finished with exit code 0



■ 예제 3
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    return 1;
                })
                .thenCompose(s -> {
                    log.info("thenApply {}",s);
                    return CompletableFuture.completedFuture(s + 1);
                })
                .thenApply(s2 -> {
                    log.info("thenApply {}",s2);
                    return s2 * 3;
                })
                .thenAccept(s3 -> log.info("thenAccept {}",s3));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}


<결과>
18:25:36.218 [main] INFO toby.live.CFuture - exit
18:25:36.218 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:25:36.226 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenCompose 1
18:25:36.230 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenApply 2
18:25:36.230 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenAccept 6
Process finished with exit code 0



■ 예제 4
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    if(1==1) throw new RuntimeException();  // 강제 오류 발생!!
                    return 1;
                })
                .thenCompose(s -> {
                    log.info("thenCompose {}",s);
                    return CompletableFuture.completedFuture(s + 1);
                })
                .thenApply(s2 -> {
                    log.info("thenApply {}",s2);
                    return s2 * 3;
                })
                .exceptionally(e -> -10)
                .thenAccept(s3 -> log.info("thenAccept {}",s3));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}



<결과>
18:27:08.541 [main] INFO toby.live.CFuture - exit
18:27:08.541 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:27:08.546 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenAccept -10
Process finished with exit code 0



■ 예제 5
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        ExecutorService es = Executors.newFixedThreadPool(10);
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    return 1;
                })
                .thenCompose(s -> {
                    log.info("thenCompose {}",s);
                    return CompletableFuture.completedFuture(s + 1);
                })
                .thenApplyAsync(s2 -> {
                    log.info("thenApplyAsync {}",s2);
                    return s2 * 3;
                }, es)
                .exceptionally(e -> -10)
                .thenAcceptAsync(s3 -> log.info("thenAcceptAsync {}",s3), es);
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}


<결과>
18:32:24.425 [main] INFO toby.live.CFuture - exit
18:32:24.425 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:32:24.431 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenCompose 1
18:32:24.441 [pool-1-thread-1] INFO toby.live.CFuture - thenApplyAsync 2
18:32:24.447 [pool-1-thread-2] INFO toby.live.CFuture - thenAcceptAsync 6





■ LoadTest.java
package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/rest?idx={idx}";;
        CyclicBarrier barrier = new CyclicBarrier(101);
        StopWatch main = new StopWatch();
        main.start();
        for (int i=0; i<100; i++){
            es.submit(()->{
                int idx = counter.addAndGet(1);
                barrier.await();
                log.info("Thread {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                String res = rt.getForObject(url,String.class, idx);
                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);
                return null;
            });
        }
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}


■ Tobytv011Application.java
package toby.live;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
@SpringBootApplication
@RestController
@EnableAsync
public class Tobytv011Application {
    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
    @Autowired
    MyService myService;
    static final String URL1 = "http://localhost:8081/service?req={req}";;
    static final String URL2 = "http://localhost:8081/service2?req={req}";;
    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();
        toCF(rt.getForEntity(URL1, String.class, "h" + idx))
        .thenCompose(s -> {
            //if (1==1) throw new RuntimeException("ERROR");
            return toCF(rt.getForEntity(URL2, String.class, s.getBody()));
        })
        .thenApplyAsync(s2 -> myService.work(s2.getBody()))
        .thenAccept(s3 -> dr.setResult(s3))
        .exceptionally(e -> {dr.setErrorResult(e.getMessage()); return (Void)null;});
        return dr;
    }
    <T> CompletableFuture<T> toCF(ListenableFuture<T> lf) {
        CompletableFuture<T> cf = new CompletableFuture<T>();
        lf.addCallback(s -> cf.complete(s), e -> cf.completeExceptionally(e));
        return cf;
    }
    public static class AcceptCompletion<S> extends Completion<S,Void> {
        Consumer<S> con;
        public AcceptCompletion(Consumer<S> con) {
            this.con = con;
        }
        @Override
        void run(S value) {
            con.accept(value);
        }
    }
    public static class ErrorCompletion<T> extends Completion<T,T> {
        Consumer<Throwable> econ;
        public ErrorCompletion(Consumer<Throwable> econ) {
            this.econ = econ;
        }
        @Override
        void run(T value) {
            if (next != null) next.run(value);
        }
    }
    public static class ApplyCompletion<S,T> extends Completion<S,T> {
        Function<S, ListenableFuture<T>> fn;
        public ApplyCompletion(Function<S, ListenableFuture<T>> fn) {
            this.fn = fn;
        }
        @Override
        void run(S value) {
            ListenableFuture<T> lf = fn.apply(value);
            lf.addCallback(s->complete(s), e->error(e));
        }
    }
    public static class Completion<S,T>{
        Completion next;
        public void andAccept(Consumer<T> con){
            Completion<T, Void> c = new AcceptCompletion(con);
            this.next = c;
        }
        public Completion<T,T> andError(Consumer<Throwable> econ){
            Completion<T,T> c = new ErrorCompletion<>(econ);
            this.next = c;
            return c;
        }
        public <V> Completion<T,V> andApply(Function<T, ListenableFuture<V>> fn){
            Completion<T,V> c = new ApplyCompletion<>(fn);
            this.next = c;
            return c;
        }
        public static <S,T> Completion<S,T> from(ListenableFuture<T> lf) {
            Completion<S,T> c = new Completion<>();
            lf.addCallback(s->{
                c.complete(s);
            }, e->{
                c.error(e);
            });
            return c;
        }
        void error(Throwable e) {
            if (next != null) next.error(e);
        }
        void complete(T s) {
            if (next != null) next.run(s);
        }
        void run(S value) {
        }
    }
    @Service
    public static class MyService {
        public String work(String req) {
            return req + "/asyncwork";
        }
    }
    @Bean
    public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1);
        return te;
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv011Application.class, args);
    }
}



■ ReomoteService.java
package toby.live;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
public class RemoteService {
@RestController
public static class MyController {
@GetMapping("/service")
public String service(String req) throws InterruptedException {
Thread.sleep(1000);
//throw new RuntimeException();
return req + "/service1"; // html/text
}

@GetMapping("/service2")
public String service2(String req) throws InterruptedException {
Thread.sleep(1000);
return req + "/service2"; // html/text
}
}


public static void main(String[] args) {
System.setProperty("SERVER.PORT","8081");
System.setProperty("server.tomcat.max-threads","1000");

SpringApplication.run(RemoteService.class, args);
}


}

■ application.properties
server.tomcat.max-threads=1



+ Recent posts