(시청일 : 20171112)




<자바의 비동기 기술>
  1. Future
  2. Callback

<Spring MVC의 비동기 기술>
  1. Callable
  2. DeferredResult
  3. ResponseBodyEmitter



■ Future  : 비동기 작업의 결과를 가져오는 핸들러
package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException{
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(()-> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {}
            log.info("Async");
        });
        log.info("Exit");
    }
}


package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        Future<String> f = es.submit(()-> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        });
        System.out.println(f.isDone());
        Thread.sleep(2100);
        log.info("Exit");
        System.out.println(f.isDone());
        System.out.println(f.get()); // Bloking method
    }
}


package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import java.util.concurrent.Future;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @Component
    public static class MyService {
        @Async
        public Future<String> hello() throws InterruptedException {
            log.debug("hello()");
            Thread.sleep(2000);
            return new AsyncResult<>( "Hello");
        }
    }
    public static void main(String[] args) {
        try (ConfigurableApplicationContext    c = SpringApplication.run(Tobytv8Application.class, args)) {
        }
    }
    @Autowired
    MyService myService;
    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            Future<String> f = myService.hello();
            log.info("exit: " + f.isDone());
            log.info("result: " + f.get());
        };
    }
}


package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        FutureTask<String> f = new FutureTask<String>(()->{
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        }) {
            @Override
            protected void done() {
                try {
                    System.out.println(get());
                }catch(InterruptedException e) {
                    e.printStackTrace();
                }catch(ExecutionException e) {
                    e.printStackTrace();
                }
            }
        };
        es.execute(f);
        es.shutdown();
    }
}



■ Callback

package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Callable;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        @GetMapping("/callable")
        public Callable<String> async() throws InterruptedException {
            log.info("callable");
            return ()-> {
                log.info("async");
                Thread.sleep(2000);
                return "hello";
            };
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}



package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.*;
@Slf4j
public class FutureEx {
    interface SuccessCallback {
        void onSuccess(String result);
    }
    public static class CallbackFutureTask extends FutureTask<String> {
        SuccessCallback sc;
        public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
            super(callable);
            this.sc = Objects.requireNonNull(sc);
        }
        @Override
        protected void done() {
            try {
                sc.onSuccess(get());
            }catch(InterruptedException e){
                e.printStackTrace();
            }catch(ExecutionException e){
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        CallbackFutureTask f = new CallbackFutureTask(() -> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        }, System.out::println);
        es.execute(f);
        es.shutdown();
    }
}




package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.*;
@Slf4j
public class FutureEx {
    interface SuccessCallback {
        void onSuccess(String result);
    }
    interface ExceptionCallback {
        void onError(Throwable t);
    }
    public static class CallbackFutureTask extends FutureTask<String> {
        SuccessCallback sc;
        ExceptionCallback ec;
        public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
            super(callable);
            this.sc = Objects.requireNonNull(sc);
            this.ec = Objects.requireNonNull(ec);
        }
        @Override
        protected void done() {
            try {
                sc.onSuccess(get());
            }catch(InterruptedException e){
                Thread.currentThread().interrupt();
            }catch(ExecutionException e){
                ec.onError(e.getCause());
            }
        }
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        CallbackFutureTask f = new CallbackFutureTask(() -> {
            Thread.sleep(2000);
            if(1==1) throw new RuntimeException("Async ERROR!!!");
            log.info("Async");
            return "Hello";
        },
                s-> System.out.println("Result: " + s),
                e-> System.out.println("Error: " + e.getMessage()));
        es.execute(f);
        es.shutdown();
    }
}




■ ListenableFuture
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @Component
    public static class MyService {
        @Async("tp")
        public ListenableFuture<String> hello() throws InterruptedException {
            log.debug("hello()");
            Thread.sleep(2000);
            return new AsyncResult<>( "Hello");
        }
    }
    @Bean // @Async 대신 사용할 수 있는 방법
    ThreadPoolTaskExecutor tp() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(10);
        te.setMaxPoolSize(100);
        te.setQueueCapacity(200);
        te.setThreadNamePrefix("mythread");
        te.initialize();
        return te;
    }
    public static void main(String[] args) {
        try (ConfigurableApplicationContext    c = SpringApplication.run(Tobytv8Application.class, args)) {
        }
    }
    @Autowired
    MyService myService;
    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            ListenableFuture<String> f = myService.hello();
            f.addCallback(s-> System.out.println(s), e-> System.out.println(e.getMessage()));
            log.info("exit");
        };
    }
}



@Async // 계속 쓰레드를 생성하므로 실전에 절대 사용 하면 안됨
@Async("tp") // 비동기 작업이 여러개 이고, 쓰레드 풀을 분리해서 적용하는 방법


■ CompletableFuture
(생략)




- Servlet은 기본적으로 Blocking I/O 방식이다.





■ Callable 관련 Load Test

- LoadTest.java
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException{
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/callable";;
        StopWatch main = new StopWatch();
        main.start();
        for(int i=0; i<100; i++){
            es.execute(()-> {
                int idx = counter.addAndGet(1);
                log.info("Thread: {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                rt.getForObject(url, String.class);
                sw.stop();
                log.info("Elapsed: {} {}", idx ,sw.getTotalTimeSeconds());
            });
        }
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}


- Tobytv8Application.java
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Callable;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        @GetMapping("/callable")
        public Callable<String> callable() throws InterruptedException {
            log.info("callable");
            return ()-> {
                log.info("async");
                Thread.sleep(2000);
                return "hello";
            };
        }
//        public String callable() throws InterruptedException {
//            log.info("async");
//            Thread.sleep(2000);
//            return "hello";
//        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}



- application.properties
server.tomcat.max-threads=1


■ DeferredResult : 어떤 요청에 의해서, 기존의 지연되어있는 HTTP응답을 나중에 써줄 수 있게 하는 기술

package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();
        @GetMapping("/dr")
        public DeferredResult<String> callable() throws InterruptedException {
            log.info("dr");
            DeferredResult<String> dr = new DeferredResult<>(600000L);
            results.add(dr);
            return dr;
        }
        @GetMapping("/dr/count")
        public String drcount() {
            return String.valueOf(results.size());
        }
        @GetMapping("/dr/event")
        public String drevent(String msg) {
            for(DeferredResult<String> dr : results) {
                dr.setResult("Hello " + msg);
                results.remove(dr);
            }
            return "OK";
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}




■ DeferredResult 관련 Load Test
- LoadTest.java
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException{
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/dr";;
        StopWatch main = new StopWatch();
        main.start();
        for(int i=0; i<100; i++){
            es.execute(()-> {
                int idx = counter.addAndGet(1);
                log.info("Thread: {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                rt.getForObject(url, String.class);
                sw.stop();
                log.info("Elapsed: {} {}", idx ,sw.getTotalTimeSeconds());
            });
        }
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}




■ emitter
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.util.concurrent.Executors;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        @GetMapping("/emitter")
        public ResponseBodyEmitter emitter() throws InterruptedException {
            ResponseBodyEmitter emitter = new ResponseBodyEmitter();
            Executors.newSingleThreadExecutor().submit(() -> {
                for (int i = 1; i <= 50; i++) {
                    try {
                        emitter.send("<p>Stream " + i + "</p>");
                        Thread.sleep(100);
                    } catch (Exception e) {
                    }
                }
            });
            return emitter;
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}



+ Recent posts