(시청일 : 20171126)






■ LoadTest.java
package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/rest?idx={idx}";;
        CyclicBarrier barrier = new CyclicBarrier(101);
        StopWatch main = new StopWatch();
        main.start();
        for (int i=0; i<100; i++){
            es.submit(()->{
                int idx = counter.addAndGet(1);
                barrier.await();
                log.info("Thread {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                String res = rt.getForObject(url,String.class, idx);
                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);
                return null;
            });
        }
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}



■ Tobytv010Application.java
package toby.live;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.function.Consumer;
import java.util.function.Function;
@SpringBootApplication
@RestController
@EnableAsync
public class Tobytv010Application {
    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
    @Autowired
    MyService myService;
    static final String URL1 = "http://localhost:8081/service?req={req}";;
    static final String URL2 = "http://localhost:8081/service2?req={req}";;
    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();
//            ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "h" + idx);
//            f1.addCallback(s->{
//                ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
//
//                f2.addCallback(s2->{
//                    ListenableFuture<String> f3 = myService.work(s2.getBody());
//                    f3.addCallback(s3->{
//                        dr.setResult(s3);
//                    }, e->{
//                        dr.setErrorResult(e.getMessage());
//                    });
//                }, e->{
//                    dr.setErrorResult(e.getMessage());
//                });
//            }, e->{
//                dr.setErrorResult(e.getMessage());
//            });
        Completion
                .from(rt.getForEntity(URL1, String.class, "h" + idx))
                .andApply(s->rt.getForEntity(URL2, String.class, s.getBody()))
                .andApply(s->myService.work(s.getBody()))
                .andError(e->dr.setErrorResult("Error [" + e.toString() + "]"))
                .andAccept(s->dr.setResult(s));
        return dr;
    }
    
    public static class AcceptCompletion<S> extends Completion<S,Void> {
        Consumer<S> con;
        public AcceptCompletion(Consumer<S> con) {
            this.con = con;
        }
        @Override
        void run(S value) {
            con.accept(value);
        }
    }
    public static class ErrorCompletion<T> extends Completion<T,T> {
        Consumer<Throwable> econ;
        public ErrorCompletion(Consumer<Throwable> econ) {
            this.econ = econ;
        }
        @Override
        void run(T value) {
            if (next != null) next.run(value);
        }
    }
    public static class ApplyCompletion<S,T> extends Completion<S,T> {
        Function<S, ListenableFuture<T>> fn;
        public ApplyCompletion(Function<S, ListenableFuture<T>> fn) {
            this.fn = fn;
        }
        @Override
        void run(S value) {
            ListenableFuture<T> lf = fn.apply(value);
            lf.addCallback(s->complete(s), e->error(e));
        }
    }
    public static class Completion<S,T>{
        Completion next;
        public void andAccept(Consumer<T> con){
            Completion<T, Void> c = new AcceptCompletion(con);
            this.next = c;
        }
        public Completion<T,T> andError(Consumer<Throwable> econ){
            Completion<T,T> c = new ErrorCompletion<>(econ);
            this.next = c;
            return c;
        }
        public <V> Completion<T,V> andApply(Function<T, ListenableFuture<V>> fn){
            Completion<T,V> c = new ApplyCompletion<>(fn);
            this.next = c;
            return c;
        }
        public static <S,T> Completion<S,T> from(ListenableFuture<T> lf) {
            Completion<S,T> c = new Completion<>();
            lf.addCallback(s->{
                c.complete(s);
            }, e->{
                c.error(e);
            });
            return c;
        }
        void error(Throwable e) {
            if (next != null) next.error(e);
        }
        void complete(T s) {
            if (next != null) next.run(s);
        }
        void run(S value) {
        }
    }
    @Service
    public static class MyService {
        @Async
        public ListenableFuture<String> work(String req) { return new AsyncResult<>(req + "/asyncwork"); }
    }
    @Bean
    public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1);
        return te;
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv010Application.class, args);
    }
}




■ RemoteService.java
package toby.live;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class RemoteService {
    @RestController
    public static class MyController {
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            Thread.sleep(1000);
            //throw new RuntimeException();
            return req + "/service1";  // html/text
        }
        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            Thread.sleep(1000);
            return req + "/service2";  // html/text
        }
    }
    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");
        SpringApplication.run(RemoteService.class, args);
    }
}


■ application.properties

server.tomcat.max-threads=1


+ Recent posts