Toby's Spring TV Episode 8: Spring Reactive Programming (4) Asynchronous Techniques in Java and Spring
IT/Spring Framework · 2018. 2. 3. 23:35
Table of Contents
(Watched: 2017-11-12)
<Asynchronous techniques in Java>
- Future
- Callback
<Asynchronous techniques in Spring MVC>
- Callable
- DeferredResult
- ResponseBodyEmitter
■ Future: a handle for retrieving the result of an asynchronous task
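package toby;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        // Fire-and-forget: execute() takes a Runnable and returns nothing.
        es.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag instead of swallowing it
            }
            log.info("Async");
        });

        log.info("Exit"); // the main thread does not wait for the task
        es.shutdown();    // already submitted work still runs; the JVM can exit afterwards
    }
}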
package toby;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newCachedThreadPool();

        // submit() takes a Callable and returns a Future for its result.
        Future<String> f = es.submit(() -> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        });

        System.out.println(f.isDone()); // checked right after submit, while the task is still sleeping
        Thread.sleep(2100);
        log.info("Exit");
        System.out.println(f.isDone()); // checked again after waiting longer than the task takes
        System.out.println(f.get());    // Blocking method: waits until the result is available
        es.shutdown();
    }
}
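Since f.get() blocks until the task finishes, a slow task can hold the caller indefinitely. A minimal sketch of bounding that wait with the timed overload, Future.get(long, TimeUnit), might look like this. The class name FutureTimeoutSketch, the helper getOrFallback, and the fallback strings are made up for illustration and are not from the lecture.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {
    // Hypothetical helper: runs a task that takes taskMillis and waits at most timeoutMillis for it.
    static String getOrFallback(long taskMillis, long timeoutMillis) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        Future<String> f = es.submit(() -> {
            Thread.sleep(taskMillis);
            return "Hello";
        });
        try {
            // Timed get(): throws TimeoutException instead of blocking forever.
            return f.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            f.cancel(true); // the result is no longer wanted, so interrupt the worker
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            es.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(getOrFallback(100, 2000)); // Hello
        System.out.println(getOrFallback(2000, 100)); // timeout
    }
}
```

The caller still blocks, only for a bounded time; the callback-style examples later in these notes avoid blocking altogether.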
package toby;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;

import java.util.concurrent.Future;

@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @Component
    public static class MyService {
        // @Async makes Spring run this method on another thread and hand back a Future.
        @Async
        public Future<String> hello() throws InterruptedException {
            log.debug("hello()");
            Thread.sleep(2000);
            return new AsyncResult<>("Hello");
        }
    }

    public static void main(String[] args) {
        // try-with-resources closes the context once run() has returned
        try (ConfigurableApplicationContext c = SpringApplication.run(Tobytv8Application.class, args)) {
        }
    }

    @Autowired
    MyService myService;

    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            Future<String> f = myService.hello();
            log.info("exit: " + f.isDone());
            log.info("result: " + f.get()); // blocks until hello() completes
        };
    }
}
package toby;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newCachedThreadPool();

        // FutureTask bundles the task and its Future; done() is a hook called when the task completes.
        FutureTask<String> f = new FutureTask<String>(() -> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        }) {
            @Override
            protected void done() {
                try {
                    System.out.println(get()); // does not block here: the task has already finished
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        };

        es.execute(f);
        es.shutdown();
    }
}
■ Callback
package toby;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.Callable;

@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        @GetMapping("/callable")
        public Callable<String> async() throws InterruptedException {
            log.info("callable");
            // Spring MVC runs the returned Callable on another thread and writes its result as the response.
            return () -> {
                log.info("async");
                Thread.sleep(2000);
                return "hello";
            };
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}
package toby;

import lombok.extern.slf4j.Slf4j;

import java.util.Objects;
import java.util.concurrent.*;

@Slf4j
public class FutureEx {
    interface SuccessCallback {
        void onSuccess(String result);
    }

    // A FutureTask that pushes its result to a callback instead of making the caller poll or block.
    public static class CallbackFutureTask extends FutureTask<String> {
        SuccessCallback sc;

        public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
            super(callable);
            this.sc = Objects.requireNonNull(sc);
        }

        @Override
        protected void done() {
            try {
                sc.onSuccess(get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newCachedThreadPool();

        CallbackFutureTask f = new CallbackFutureTask(() -> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        }, System.out::println);

        es.execute(f);
        es.shutdown();
    }
}
package toby;

import lombok.extern.slf4j.Slf4j;

import java.util.Objects;
import java.util.concurrent.*;

@Slf4j
public class FutureEx {
    interface SuccessCallback {
        void onSuccess(String result);
    }

    interface ExceptionCallback {
        void onError(Throwable t);
    }

    public static class CallbackFutureTask extends FutureTask<String> {
        SuccessCallback sc;
        ExceptionCallback ec;

        public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
            super(callable);
            this.sc = Objects.requireNonNull(sc);
            this.ec = Objects.requireNonNull(ec);
        }

        @Override
        protected void done() {
            try {
                sc.onSuccess(get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                ec.onError(e.getCause()); // unwrap and pass on the exception thrown inside the task
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newCachedThreadPool();

        CallbackFutureTask f = new CallbackFutureTask(() -> {
                    Thread.sleep(2000);
                    if (1 == 1) throw new RuntimeException("Async ERROR!!!"); // always fails, to exercise the error callback
                    log.info("Async");
                    return "Hello";
                },
                s -> System.out.println("Result: " + s),
                e -> System.out.println("Error: " + e.getMessage()));

        es.execute(f);
        es.shutdown();
    }
}
■ ListenableFuture
package toby;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @Component
    public static class MyService {
        @Async("tp") // run on the executor bean named "tp"
        public ListenableFuture<String> hello() throws InterruptedException {
            log.debug("hello()");
            Thread.sleep(2000);
            return new AsyncResult<>("Hello");
        }
    }

    @Bean // a thread pool that can be used in place of the plain @Async default
    ThreadPoolTaskExecutor tp() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(10);
        te.setMaxPoolSize(100);
        te.setQueueCapacity(200);
        te.setThreadNamePrefix("mythread");
        te.initialize();
        return te;
    }

    public static void main(String[] args) {
        try (ConfigurableApplicationContext c = SpringApplication.run(Tobytv8Application.class, args)) {
        }
    }

    @Autowired
    MyService myService;

    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            ListenableFuture<String> f = myService.hello();
            // Register success and failure callbacks instead of blocking on get().
            f.addCallback(s -> System.out.println(s), e -> System.out.println(e.getMessage()));
            log.info("exit");
        };
    }
}
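@Async // keeps creating new threads (the fallback SimpleAsyncTaskExecutor starts one per call), so never use it as-is in production
@Async("tp") // when there are several kinds of async work, this is how to give each its own thread pool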
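One detail of the core / max / queue settings is easy to misread: ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor, which starts threads beyond the core size only after the queue is full. The sketch below shows that rule with the plain JDK class and deliberately small numbers (core 2, max 4, queue 2) rather than the 10 / 100 / 200 used above. The class PoolGrowthSketch and the helper sizesAfter are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthSketch {
    // Hypothetical helper: submits `tasks` tasks that all block on a latch, then
    // reports {pool size, queue size}. Call with at most 6 tasks; a 7th would be rejected.
    static int[] sizesAfter(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy until we have measured
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] a = sizesAfter(2); // fills the core threads
        int[] b = sizesAfter(4); // extra tasks wait in the queue, no new threads yet
        int[] c = sizesAfter(6); // queue is full, so the pool grows toward max
        System.out.println(a[0] + " threads, " + a[1] + " queued"); // 2 threads, 0 queued
        System.out.println(b[0] + " threads, " + b[1] + " queued"); // 2 threads, 2 queued
        System.out.println(c[0] + " threads, " + c[1] + " queued"); // 4 threads, 2 queued
    }
}
```

Read against the settings above, this suggests the 11th to 210th concurrent tasks would wait in the queue, and only after that would the pool grow past 10 threads.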
■ CompletableFuture
(omitted)
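These notes skip CompletableFuture, so the following is not from the lecture. It is only a rough sketch of how the success and error callbacks from the CallbackFutureTask example could be expressed with the JDK's own java.util.concurrent.CompletableFuture. The class CompletableFutureSketch and the method run are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureSketch {
    // Hypothetical helper: runs a task asynchronously and maps either outcome to a message.
    static String run(boolean fail) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (fail) throw new RuntimeException("Async ERROR!!!");
                    return "Hello";
                })
                // success path, comparable to SuccessCallback.onSuccess
                .thenApply(s -> "Result: " + s)
                // failure path, comparable to ExceptionCallback.onError
                .exceptionally(e -> {
                    // the original exception arrives wrapped in a CompletionException
                    Throwable cause = (e.getCause() != null) ? e.getCause() : e;
                    return "Error: " + cause.getMessage();
                })
                .join(); // blocks only so this demo can return the final message
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // Result: Hello
        System.out.println(run(true));  // Error: Async ERROR!!!
    }
}
```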
- Servlets are fundamentally based on blocking I/O.
■ Load test for Callable
- LoadTest.java
package toby;

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        // url was not defined in the original listing; this value is an assumption
        // (the /callable endpoint on Spring Boot's default port 8080)
        String url = "http://localhost:8080/callable";

        StopWatch main = new StopWatch();
        main.start();

        // Fire 100 concurrent requests and log how long each one takes.
        for (int i = 0; i < 100; i++) {
            es.execute(() -> {
                int idx = counter.addAndGet(1);
                log.info("Thread: {}", idx);

                StopWatch sw = new StopWatch();
                sw.start();
                rt.getForObject(url, String.class);
                sw.stop();
                log.info("Elapsed: {} {}", idx, sw.getTotalTimeSeconds());
            });
        }

        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}
- Tobytv8Application.java
package toby;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.Callable;

@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        // Asynchronous version: the servlet thread returns right away and the Callable runs elsewhere.
        @GetMapping("/callable")
        public Callable<String> callable() throws InterruptedException {
            log.info("callable");
            return () -> {
                log.info("async");
                Thread.sleep(2000);
                return "hello";
            };
        }

        // Synchronous version for comparison: the servlet thread is held for the full 2 seconds.
        // public String callable() throws InterruptedException {
        //     log.info("async");
        //     Thread.sleep(2000);
        //     return "hello";
        // }
    }

    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}
- application.properties
server.tomcat.max-threads=1
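With max-threads=1, the effect this load test is built to expose can be imitated without Tomcat. The plain-Java sketch below is only an analogy: a one-thread pool stands in for the servlet thread pool and a cached pool stands in for the threads that run the Callable. All names in it (ServletThreadSketch, elapsedMillis, the request count and work time) are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ServletThreadSketch {
    static final int REQUESTS = 10;
    static final long WORK_MILLIS = 100;

    // Hypothetical helper: time REQUESTS "requests" going through a single "servlet" thread.
    // handOff = false: the servlet thread does the slow work itself (synchronous controller).
    // handOff = true:  the servlet thread only passes the work on (Callable-style controller).
    static long elapsedMillis(boolean handOff) {
        ExecutorService servlet = Executors.newFixedThreadPool(1); // like max-threads=1
        ExecutorService workers = Executors.newCachedThreadPool();
        CountDownLatch done = new CountDownLatch(REQUESTS);

        Runnable work = () -> {
            try {
                Thread.sleep(WORK_MILLIS); // the slow part of handling a request
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        };

        long start = System.nanoTime();
        try {
            for (int i = 0; i < REQUESTS; i++) {
                if (handOff) {
                    servlet.execute(() -> workers.execute(work));
                } else {
                    servlet.execute(work);
                }
            }
            done.await(); // wait until every request has been fully handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            servlet.shutdown();
            workers.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("blocking: " + elapsedMillis(false) + " ms"); // roughly REQUESTS * WORK_MILLIS
        System.out.println("hand-off: " + elapsedMillis(true) + " ms");  // roughly one WORK_MILLIS
    }
}
```

The hand-off version still uses one thread per in-flight request on the worker side; what it frees up is the scarce servlet thread.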
■ DeferredResult: a technique that lets a pending HTTP response be written later, triggered by some other request or event
package toby;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;

@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        // Responses that are still waiting for an event
        Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();

        // The response stays open (up to 600 seconds) until someone calls setResult().
        @GetMapping("/dr")
        public DeferredResult<String> callable() throws InterruptedException {
            log.info("dr");
            DeferredResult<String> dr = new DeferredResult<>(600000L);
            results.add(dr);
            return dr;
        }

        // Number of responses currently waiting
        @GetMapping("/dr/count")
        public String drcount() {
            return String.valueOf(results.size());
        }

        // Completes every waiting response with the given message.
        @GetMapping("/dr/event")
        public String drevent(String msg) {
            for (DeferredResult<String> dr : results) {
                dr.setResult("Hello " + msg);
                results.remove(dr);
            }
            return "OK";
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}
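The idea behind the /dr, /dr/count and /dr/event endpoints can be tried out without a web server. In the sketch below, CompletableFuture is only a stand-in for DeferredResult, and DeferredSketch with its register, count and event methods are hypothetical names chosen to mirror the three endpoints.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

class DeferredSketch {
    // Results that have been promised to a caller but not produced yet
    private final Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();

    // Like /dr: hand back a result that nobody can fill in yet.
    CompletableFuture<String> register() {
        CompletableFuture<String> f = new CompletableFuture<>();
        pending.add(f);
        return f;
    }

    // Like /dr/count: how many callers are still waiting.
    int count() {
        return pending.size();
    }

    // Like /dr/event: a later event completes every waiting result.
    void event(String msg) {
        CompletableFuture<String> f;
        while ((f = pending.poll()) != null) {
            f.complete("Hello " + msg);
        }
    }

    public static void main(String[] args) {
        DeferredSketch s = new DeferredSketch();
        CompletableFuture<String> a = s.register();
        CompletableFuture<String> b = s.register();
        System.out.println(s.count());  // 2
        System.out.println(a.isDone()); // false
        s.event("toby");
        System.out.println(a.join());   // Hello toby
        System.out.println(b.join());   // Hello toby
        System.out.println(s.count());  // 0
    }
}
```

No thread is parked while a result is pending, which is why this style suits many simultaneously waiting clients.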
■ Load test for DeferredResult
- LoadTest.java
package toby;

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        // url was not defined in the original listing; this value is an assumption
        // (the /dr endpoint on Spring Boot's default port 8080)
        String url = "http://localhost:8080/dr";

        StopWatch main = new StopWatch();
        main.start();

        // Each of the 100 requests stays open until /dr/event is called.
        for (int i = 0; i < 100; i++) {
            es.execute(() -> {
                int idx = counter.addAndGet(1);
                log.info("Thread: {}", idx);

                StopWatch sw = new StopWatch();
                sw.start();
                rt.getForObject(url, String.class);
                sw.stop();
                log.info("Elapsed: {} {}", idx, sw.getTotalTimeSeconds());
            });
        }

        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}
■ emitter
package toby;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        // Streams the response body in 50 chunks, one every 100 ms.
        @GetMapping("/emitter")
        public ResponseBodyEmitter emitter() throws InterruptedException {
            ResponseBodyEmitter emitter = new ResponseBodyEmitter();

            ExecutorService es = Executors.newSingleThreadExecutor();
            es.submit(() -> {
                try {
                    for (int i = 1; i <= 50; i++) {
                        emitter.send("<p>Stream " + i + "</p>");
                        Thread.sleep(100);
                    }
                    emitter.complete(); // close the response once all chunks are sent
                } catch (Exception e) {
                    emitter.completeWithError(e); // report the failure instead of swallowing it
                }
            });
            es.shutdown(); // the submitted task still runs; the thread is released afterwards

            return emitter;
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}
@DEAN :: Dean Story