토비의 봄 TV 10회 스프링 리액티브 프로그래밍 (6) AsyncRestTemplate의 콜백 헬과 중복 작업 문제 | IT/Spring Framework | 2018. 2. 3. 23:49
Table of Contents
(시청일 : 20171126)
■ LoadTest.java
package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Simple load generator: fires a fixed number of HTTP GET requests at the same
 * instant and logs the per-request and total elapsed time.
 *
 * <p>All worker threads and the main thread meet at a {@link CyclicBarrier}
 * so that the requests start together rather than as the pool spins up.
 */
@Slf4j
public class LoadTest {
    /** Number of concurrent client threads (and therefore requests). */
    private static final int THREADS = 100;

    /**
     * Target endpoint; {@code {idx}} is expanded by RestTemplate from the URI variable.
     * NOTE(review): assumes the application under test listens on the default
     * port 8080 and exposes {@code /rest?idx=} - confirm against the server setup.
     */
    private static final String REST_URL = "http://localhost:8080/rest?idx={idx}";

    static AtomicInteger counter = new AtomicInteger(0);

    /**
     * Runs the load test and logs the total wall-clock time.
     *
     * @param args unused
     * @throws InterruptedException   if the main thread is interrupted while waiting
     * @throws BrokenBarrierException if the start barrier is broken before all parties arrive
     */
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(THREADS);
        RestTemplate rt = new RestTemplate();
        // One extra party for the main thread, which releases the workers.
        CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);

        StopWatch main = new StopWatch();
        main.start();

        for (int i = 0; i < THREADS; i++) {
            es.submit(() -> {
                int idx = counter.incrementAndGet();

                barrier.await();

                log.info("Thread {}", idx);

                StopWatch sw = new StopWatch();
                sw.start();

                String res = rt.getForObject(REST_URL, String.class, idx);

                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);
                // Returning a value makes this lambda a Callable, which is what
                // allows the checked exceptions thrown by barrier.await().
                return null;
            });
        }

        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}
■ Tobytv010Application.java
package toby.live;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.function.Consumer;
import java.util.function.Function;
/**
 * Demo application that chains two remote HTTP calls and one {@code @Async}
 * service call without blocking a servlet thread, and returns the final value
 * through a {@link DeferredResult}.
 *
 * <p>The nested {@link Completion} types are a small hand-written chaining
 * helper that replaces nested {@code addCallback} calls ("callback hell").
 */
@SpringBootApplication
@RestController
@EnableAsync
public class Tobytv010Application {
    /*
     * Remote endpoints called by rest(). They mirror the /service and /service2
     * mappings (query parameter "req") that RemoteService exposes on port 8081.
     * NOTE(review): host and port assume a local RemoteService - confirm.
     */
    static final String URL1 = "http://localhost:8081/service?req={req}";
    static final String URL2 = "http://localhost:8081/service2?req={req}";

    // Non-blocking HTTP client backed by a single Netty event-loop thread.
    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

    @Autowired
    MyService myService;

    /**
     * Calls URL1, feeds its body to URL2, feeds that body to
     * {@link MyService#work(String)} and completes the returned
     * {@link DeferredResult} with the final string, or with an error message
     * if any step fails.
     *
     * @param idx request index supplied by the caller
     * @return a deferred result completed asynchronously by the chain
     */
    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();

        // Callback-hell version, kept for comparison with the Completion chain below:
        // ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "h" + idx);
        // f1.addCallback(s->{
        //     ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
        //
        //     f2.addCallback(s2->{
        //         ListenableFuture<String> f3 = myService.work(s2.getBody());
        //         f3.addCallback(s3->{
        //             dr.setResult(s3);
        //         }, e->{
        //             dr.setErrorResult(e.getMessage());
        //         });
        //     }, e->{
        //         dr.setErrorResult(e.getMessage());
        //     });
        // }, e->{
        //     dr.setErrorResult(e.getMessage());
        // });

        Completion
                .from(rt.getForEntity(URL1, String.class, "h" + idx))
                .andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
                .andApply(s -> myService.work(s.getBody()))
                .andError(e -> dr.setErrorResult("Error [" + e.toString() + "]"))
                .andAccept(s -> dr.setResult(s));

        return dr;
    }

    /**
     * Terminal step: hands the incoming value to a consumer.
     *
     * @param <S> type of the value consumed
     */
    public static class AcceptCompletion<S> extends Completion<S, Void> {
        Consumer<S> con;

        public AcceptCompletion(Consumer<S> con) {
            this.con = con;
        }

        @Override
        void run(S value) {
            con.accept(value);
        }
    }

    /**
     * Error-handling step: passes successful values through untouched and
     * delivers any upstream failure to the error consumer.
     *
     * @param <T> type of the value passed through
     */
    public static class ErrorCompletion<T> extends Completion<T, T> {
        Consumer<Throwable> econ;

        public ErrorCompletion(Consumer<Throwable> econ) {
            this.econ = econ;
        }

        @Override
        void run(T value) {
            if (next != null) {
                next.run(value);
            }
        }

        /**
         * Consumes the failure here instead of forwarding it. Without this
         * override the consumer registered via andError() was never invoked
         * and the failure was silently dropped further down the chain.
         */
        @Override
        void error(Throwable e) {
            econ.accept(e);
        }
    }

    /**
     * Transforming step: maps the incoming value to a new asynchronous
     * operation and forwards that operation's outcome down the chain.
     *
     * @param <S> input type
     * @param <T> type produced by the asynchronous operation
     */
    public static class ApplyCompletion<S, T> extends Completion<S, T> {
        Function<S, ListenableFuture<T>> fn;

        public ApplyCompletion(Function<S, ListenableFuture<T>> fn) {
            this.fn = fn;
        }

        @Override
        void run(S value) {
            ListenableFuture<T> lf = fn.apply(value);
            lf.addCallback(s -> complete(s), e -> error(e));
        }
    }

    /**
     * One link in a chain of asynchronous steps.
     *
     * <p>NOTE(review): {@link #from} registers its callbacks before the caller
     * has attached the next step; if the future completed before {@code next}
     * is assigned, the result would be dropped by the null check. Nothing
     * here guards against that - confirm it is acceptable for this demo.
     *
     * @param <S> type this step receives
     * @param <T> type this step passes on to the next step
     */
    public static class Completion<S, T> {
        // Next step; it receives this step's output type T.
        Completion<T, ?> next;

        /** Ends the chain with a consumer of the final value. */
        public void andAccept(Consumer<T> con) {
            Completion<T, Void> c = new AcceptCompletion<>(con);
            this.next = c;
        }

        /** Adds an error handler for failures raised by earlier steps. */
        public Completion<T, T> andError(Consumer<Throwable> econ) {
            Completion<T, T> c = new ErrorCompletion<>(econ);
            this.next = c;
            return c;
        }

        /** Adds a step that starts another asynchronous operation from this step's value. */
        public <V> Completion<T, V> andApply(Function<T, ListenableFuture<V>> fn) {
            Completion<T, V> c = new ApplyCompletion<>(fn);
            this.next = c;
            return c;
        }

        /** Starts a chain from an already-issued asynchronous operation. */
        public static <S, T> Completion<S, T> from(ListenableFuture<T> lf) {
            Completion<S, T> c = new Completion<>();
            lf.addCallback(s -> {
                c.complete(s);
            }, e -> {
                c.error(e);
            });
            return c;
        }

        /** Forwards a failure to the next step, if there is one. */
        void error(Throwable e) {
            if (next != null) {
                next.error(e);
            }
        }

        /** Forwards this step's result to the next step, if there is one. */
        void complete(T s) {
            if (next != null) {
                next.run(s);
            }
        }

        /** Receives the previous step's value; the base step does nothing with it. */
        void run(S value) {
        }
    }

    /** Service whose work() is annotated {@code @Async}. */
    @Service
    public static class MyService {
        /**
         * @param req incoming value
         * @return a future holding {@code req + "/asyncwork"}
         */
        @Async
        public ListenableFuture<String> work(String req) {
            return new AsyncResult<>(req + "/asyncwork");
        }
    }

    /** Task executor bean with exactly one thread (core and max size are both 1). */
    @Bean
    public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1);
        return te;
    }

    public static void main(String[] args) {
        SpringApplication.run(Tobytv010Application.class, args);
    }
}
■ RemoteService.java
package toby.live;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Stand-in for a slow remote backend: two GET endpoints that each wait one
 * second and then echo the {@code req} parameter with a suffix appended.
 */
@SpringBootApplication
public class RemoteService {

    /**
     * Sets the port and Tomcat thread-count system properties, then boots the application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");
        SpringApplication.run(RemoteService.class, args);
    }

    /** Controller exposing the two delayed echo endpoints. */
    @RestController
    public static class MyController {

        /**
         * @param req value to echo
         * @return {@code req} followed by {@code "/service1"}, after a one-second pause
         * @throws InterruptedException if the pause is interrupted
         */
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            // To simulate a failing backend, throw new RuntimeException() here instead.
            return delayedReply(req, "/service1"); // html/text
        }

        /**
         * @param req value to echo
         * @return {@code req} followed by {@code "/service2"}, after a one-second pause
         * @throws InterruptedException if the pause is interrupted
         */
        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            return delayedReply(req, "/service2"); // html/text
        }

        /** Sleeps for one second, then returns the request value with the suffix appended. */
        private static String delayedReply(String req, String suffix) throws InterruptedException {
            Thread.sleep(1000);
            return req + suffix;
        }
    }
}
■ application.properties
# Limit the embedded Tomcat request-thread pool to a single thread.
# NOTE(review): presumably this applies to Tobytv010Application, to show that
# the asynchronous /rest endpoint can serve many concurrent requests with
# one servlet thread - confirm which application loads this file.
server.tomcat.max-threads=1
'IT > Spring Framework' 카테고리의 다른 글
토비의 봄 TV 12회 스프링 리액티브 프로그래밍 (8) WebFlux (0) | 2018.02.03 |
---|---|
토비의 봄 TV 11회 스프링 리액티브 프로그래밍 (7) CompletableFuture (0) | 2018.02.03 |
토비의 봄 TV 9회 스프링 리액티브 프로그래밍 (5) 비동기 RestTemplate과 비동기 MVC/Servlet (0) | 2018.02.03 |
토비의 봄 TV 2회 - 수퍼 타입 토큰 (0) | 2018.02.03 |
토비의 봄 TV 1회 - 재사용성과 다이나믹 디스패치, 더블 디스패치 (0) | 2018.02.03 |
@DEAN :: Dean Story
포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!