(시청일 : 20180114)









■ 예제 1
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<Event> hello(@PathVariable long id) {
        return Mono.just(new Event(id, "event" + id));
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}


<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/event/123
{"id":123,"value":"event123"}




■ 예제 2
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<Event> hello(@PathVariable long id) {
        return Mono.just(new Event(id, "event" + id));
    }
    @GetMapping("/events")
    Flux<Event> events() {
        return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}




<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
[{"id":1,"value":"event1"},{"id":2,"value":"event2"}]



■ 예제 3
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping("/events")
    Flux<Event> events() {
        return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}




<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/event/1
[{"id":1,"value":"event1"},{"id":2,"value":"event2"}]
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
[{"id":1,"value":"event1"},{"id":2,"value":"event2"}]





■ 예제 3-2
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Flux.fromIterable(list);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}





<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1,"value":"event1"}
data:{"id":2,"value":"event2"}




<Flux>






<Flux.fromIterable()>






■ 예제 4
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        return Flux
                .fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "value")))
                .delayElements(Duration.ofSeconds(1))
                .take(10);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}







<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1516531806083,"value":"value"}
data:{"id":1516531806192,"value":"value"}
data:{"id":1516531806194,"value":"value"}
data:{"id":1516531806195,"value":"value"}
data:{"id":1516531806210,"value":"value"}
data:{"id":1516531806213,"value":"value"}
data:{"id":1516531806215,"value":"value"}
data:{"id":1516531806216,"value":"value"}
data:{"id":1516531806217,"value":"value"}
data:{"id":1516531806219,"value":"value"}


<Flux.take()>








■ 예제 5
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        return Flux
                .<Event>generate(sink -> sink.next(new Event(System.currentTimeMillis(), "value")))
                .delayElements(Duration.ofSeconds(1))
                .take(10);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}





<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1516533395000,"value":"value"}
data:{"id":1516533396018,"value":"value"}
data:{"id":1516533397019,"value":"value"}
data:{"id":1516533398021,"value":"value"}
data:{"id":1516533399023,"value":"value"}
data:{"id":1516533400023,"value":"value"}
data:{"id":1516533401024,"value":"value"}
data:{"id":1516533402025,"value":"value"}
data:{"id":1516533403026,"value":"value"}
data:{"id":1516533404027,"value":"value"}






■ 예제 6
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        return Flux
                .<Event, Long>generate(()->1L, (id, sink) -> {
                    sink.next(new Event(id, "value" + id));
                    return id+1;
                })
                .delayElements(Duration.ofSeconds(1))
                .take(10);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}



<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1,"value":"value1"}
data:{"id":2,"value":"value2"}
data:{"id":3,"value":"value3"}
data:{"id":4,"value":"value4"}
data:{"id":5,"value":"value5"}
data:{"id":6,"value":"value6"}
data:{"id":7,"value":"value7"}
data:{"id":8,"value":"value8"}
data:{"id":9,"value":"value9"}
data:{"id":10,"value":"value10"}




■ 예제 7
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        Flux<Event> es = Flux
                .<Event, Long>generate(()->1L, (id, sink) -> {
                    sink.next(new Event(id, "value" + id));
                    return id+1;
                });
        Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
        return Flux.zip(es, interval).map(tu->tu.getT1()).take(10);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}



<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1,"value":"value1"}
data:{"id":2,"value":"value2"}
data:{"id":3,"value":"value3"}
data:{"id":4,"value":"value4"}
data:{"id":5,"value":"value5"}
data:{"id":6,"value":"value6"}
data:{"id":7,"value":"value7"}
data:{"id":8,"value":"value8"}
data:{"id":9,"value":"value9"}
data:{"id":10,"value":"value10"}





■ 예제 8
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        Flux<String> es = Flux.generate(sink -> sink.next("Value"));
        Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
        return Flux.zip(es, interval).map(tu -> new Event(tu.getT2()+1, tu.getT1())).take(10);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}



<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1,"value":"Value"}
data:{"id":2,"value":"Value"}
data:{"id":3,"value":"Value"}
data:{"id":4,"value":"Value"}
data:{"id":5,"value":"Value"}
data:{"id":6,"value":"Value"}
data:{"id":7,"value":"Value"}
data:{"id":8,"value":"Value"}
data:{"id":9,"value":"Value"}
data:{"id":10,"value":"Value"}




<Flux.generate()>





<Flux.interval()>





<Flux.zip()>





(시청일 : 20180114)



스프링 5.0 WebFlux에서 사용되는 Reactor의 Mono의 기본 동작방식을 살펴봅니다.



■ 예제 1
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        return Mono.just("Hello WebFlux").log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}


<결과>
2018-01-21 17:15:04.761  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2018-01-21 17:15:04.764  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(1)
2018-01-21 17:15:04.764  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onNext(Hello WebFlux)
2018-01-21 17:15:04.787  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(1)
2018-01-21 17:15:04.787  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(31)
2018-01-21 17:15:04.787  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onComplete()
2018-01-21 17:15:05.264  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | cancel()



■ 예제 2
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        Mono m = Mono.just("Hello WebFlux").log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        log.info("pos2");
        return m;
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}




<결과>
2018-01-21 17:18:46.265  INFO 13388 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:18:46.268  INFO 13388 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2
2018-01-21 17:18:46.284  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2018-01-21 17:18:46.285  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(1)
2018-01-21 17:18:46.286  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onNext(Hello WebFlux)
2018-01-21 17:18:46.307  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(1)
2018-01-21 17:18:46.308  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(31)
2018-01-21 17:18:46.308  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onComplete()
2018-01-21 17:18:46.746  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | cancel()




■ 예제 3
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        Mono m = Mono.just("Hello WebFlux").doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        log.info("pos2");
        return m;
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}




<결과>
2018-01-21 17:20:50.096  INFO 13748 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:20:50.099  INFO 13748 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2
2018-01-21 17:20:50.115  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:20:50.117  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:20:50.117  INFO 13748 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello WebFlux
2018-01-21 17:20:50.117  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello WebFlux)
2018-01-21 17:20:50.138  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:20:50.138  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(31)
2018-01-21 17:20:50.139  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:20:50.229  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | cancel()




■ 예제 4
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        Mono m = Mono.just(generateHello()).doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        log.info("pos2");
        return m;
    }
    private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}




<결과>
2018-01-21 17:26:20.531  INFO 14112 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:26:20.531  INFO 14112 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:26:20.534  INFO 14112 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2
2018-01-21 17:26:20.552  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:26:20.553  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:26:20.553  INFO 14112 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:26:20.553  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:26:20.575  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:26:20.575  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(31)
2018-01-21 17:26:20.575  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:26:20.755  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | cancel()


■ 예제 5
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        Mono m = Mono.fromSupplier(() -> generateHello()).doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        log.info("pos2");
        return m;
    }
    private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}





<결과>
2018-01-21 17:29:04.571  INFO 13256 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:29:04.575  INFO 13256 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2
2018-01-21 17:29:04.589  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:29:04.591  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:29:04.591  INFO 13256 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:29:04.591  INFO 13256 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:29:04.591  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:29:04.622  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:29:04.622  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(31)
2018-01-21 17:29:04.623  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()



■ 예제 6
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        Mono m = Mono.fromSupplier(() -> generateHello()).doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        m.subscribe();
        log.info("pos2");
        return m;
    }
    private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}





<결과>
2018-01-21 17:33:30.588  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:33:30.596  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:33:30.598  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(unbounded)
2018-01-21 17:33:30.598  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:33:30.598  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:33:30.598  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:33:30.599  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:33:30.599  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2
2018-01-21 17:33:30.612  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:33:30.612  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:33:30.612  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:33:30.612  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:33:30.612  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:33:30.633  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:33:30.633  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(31)
2018-01-21 17:33:30.634  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:33:31.249  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | cancel()




■ 예제 7
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        String msg = generateHello();
        Mono<String> m = Mono.just(msg).doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        String msg2 = m.block();
        log.info("pos2: " + msg2);
        return m;
    }
    private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}






<결과>
2018-01-21 17:39:20.356  INFO 11348 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:39:20.356  INFO 11348 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:39:20.362  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:39:20.364  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(unbounded)
2018-01-21 17:39:20.364  INFO 11348 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:39:20.364  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:39:20.364  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:39:20.365  INFO 11348 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2: Hello Mono
2018-01-21 17:39:20.381  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:39:20.381  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:39:20.381  INFO 11348 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:39:20.381  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:39:20.407  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:39:20.407  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(31)
2018-01-21 17:39:20.407  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:39:20.579  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | cancel()


- block : publisher가 제공하는 결과값을 꺼내서 Mono나 Flux 같은 컨테이너를 제거하고 값을 넘겨주는 것이 목적.


■ 예제 8
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        String msg = generateHello();
        Mono<String> m = Mono.just(msg).doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        String msg2 = m.block();
        log.info("pos2: " + msg2);
        return Mono.just(msg2);
    }
    private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}




<결과>
2018-01-21 17:48:23.009  INFO 13012 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:48:23.009  INFO 13012 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:48:23.015  INFO 13012 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:48:23.016  INFO 13012 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(unbounded)
2018-01-21 17:48:23.016  INFO 13012 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:48:23.017  INFO 13012 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:48:23.017  INFO 13012 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:48:23.018  INFO 13012 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2: Hello Mono




(시청일 : 20170114)



- 책 추천 : RxJava를 활용한 리액티브 프로그래밍


■ 프로젝트 환경
- 스프링부트 버전 : SpringBoot 2.0 M1
- 스프링 프레임워크 버전 : SpringFramework 5.0 RC1
- Core : Lombok
- Web : Reactive Web  
           (Web과 Reactive Web은 서로 배타적이라 한 프로젝트에 동시에 적용될 수 없다. 두 가지를 모두 선택하면 Web이 우선시된다.)


■ 예제 1
/* 
*
* LoadTest.java 
*
*/
package toby.live;

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simple load generator: 100 worker threads each issue one GET request to the
 * {@code /rest} endpoint at (roughly) the same moment and log the elapsed time.
 */
@Slf4j
public class LoadTest {
    /** Hands out a unique, 1-based index to every worker. */
    static AtomicInteger counter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate rt = new RestTemplate();
        // Target endpoint; {idx} is expanded per request by RestTemplate.
        // (This declaration was missing, which left `url` unresolved below.)
        String url = "http://localhost:8080/rest?idx={idx}";

        // 100 workers + the main thread: nobody proceeds until all 101 have arrived.
        CyclicBarrier barrier = new CyclicBarrier(101);

        StopWatch main = new StopWatch();
        main.start();

        for (int i = 0; i < 100; i++) {
            // The lambda returns a value so it is treated as a Callable and may
            // throw the checked exceptions of barrier.await().
            es.submit(() -> {
                int idx = counter.addAndGet(1);

                barrier.await();

                log.info("Thread {}", idx);

                StopWatch sw = new StopWatch();
                sw.start();

                String res = rt.getForObject(url, String.class, idx);

                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);

                return null;
            });
        }

        // Release all workers at once, then wait for them to finish.
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}



/*
* Toby012Application.java
(원래는 Netty에서 돌아가야 하지만, 한 프로젝트에서 Tomcat과 Netty를 동시에 돌릴 수 없어서 Tomcat에서 실행하였음.)
*/
package toby.live;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@EnableAsync
public class Toby012Application {
    static final String URL1 = "http://localhost:8081/service?req={req}";;
    static final String URL2 = "http://localhost:8081/service2?req={req}";;
    @Autowired
    MyService myService;
    WebClient client = WebClient.create();
    @GetMapping("/rest")
    public Mono<String> rest(int idx) {
        return client.get().uri(URL1, idx).exchange() // Mono<ClientResponse>
                .flatMap(c ->  c.bodyToMono(String.class)) // Mono<String>
                .flatMap((String res1) -> client.get().uri(URL2, res1).exchange()) // Mono<ClientResponse>
                .flatMap(c -> c.bodyToMono((String.class)));  // Mono<String>
    }
    public static void main(String[] args) {
        System.setProperty("reactor.ipc.netty.workerCount", "2");
        System.setProperty("reactor.ipc.netty.pool.maxConnections", "2000");
        SpringApplication.run(Toby012Application.class, args);
    }
    @Service
    public static class MyService {
        public String work(String req) {
            return req + "/asyncwork";
        }
    }
}





/*
* RemoteService.java
(Tomcat에서 실행)
*/
package toby.live;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class RemoteService {
    @RestController
    public static class MyController {
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            Thread.sleep(1000);
            //throw new RuntimeException();
            return req + "/service1";  // html/text
        }
        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            Thread.sleep(1000);
            return req + "/service2";  // html/text
        }
    }
    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");
        SpringApplication.run(RemoteService.class, args);
    }
}


 


■ 예제 2
/* 
*
* LoadTest.java 
*
*/
package toby.live;

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simple load generator: 100 worker threads each issue one GET request to the
 * {@code /rest} endpoint at (roughly) the same moment and log the elapsed time.
 */
@Slf4j
public class LoadTest {
    /** Hands out a unique, 1-based index to every worker. */
    static AtomicInteger counter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate rt = new RestTemplate();
        // Target endpoint; {idx} is expanded per request by RestTemplate.
        // (This declaration was missing, which left `url` unresolved below.)
        String url = "http://localhost:8080/rest?idx={idx}";

        // 100 workers + the main thread: nobody proceeds until all 101 have arrived.
        CyclicBarrier barrier = new CyclicBarrier(101);

        StopWatch main = new StopWatch();
        main.start();

        for (int i = 0; i < 100; i++) {
            // The lambda returns a value so it is treated as a Callable and may
            // throw the checked exceptions of barrier.await().
            es.submit(() -> {
                int idx = counter.addAndGet(1);

                barrier.await();

                log.info("Thread {}", idx);

                StopWatch sw = new StopWatch();
                sw.start();

                String res = rt.getForObject(url, String.class, idx);

                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);

                return null;
            });
        }

        // Release all workers at once, then wait for them to finish.
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}



/*
* Toby012Application.java
(원래는 Netty에서 돌아가야 하지만, 한 프로젝트에서 Tomcat과 Netty를 동시에 돌릴 수 없어서 Tomcat에서 실행하였음.)
*/
package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;
@SpringBootApplication
@RestController
@Slf4j
@EnableAsync
public class Toby012Application {
    static final String URL1 = "http://localhost:8081/service?req={req}";;
    static final String URL2 = "http://localhost:8081/service2?req={req}";;
    @Autowired
    MyService myService;
    WebClient client = WebClient.create();
    @GetMapping("/rest")
    public Mono<String> rest(int idx) {
        return client.get().uri(URL1, idx).exchange() // Mono<ClientResponse>
                .flatMap(c ->  c.bodyToMono(String.class)) // Mono<String>
                //.doOnNext(c -> log.info(c))
                .flatMap(res1 -> client.get().uri(URL2, res1).exchange()) // Mono<ClientResponse>
                .flatMap(c -> c.bodyToMono((String.class))) // Mono<String>
                .doOnNext(c -> log.info(c))
                .flatMap(res2 -> Mono.fromCompletionStage(myService.work(res2)))      // CompletableFuture<String> -> Mono<String>
                .doOnNext(c -> log.info(c));
    }
    public static void main(String[] args) {
        System.setProperty("reactor.ipc.netty.workerCount", "2");
        System.setProperty("reactor.ipc.netty.pool.maxConnections", "2000");
        SpringApplication.run(Toby012Application.class, args);
    }
    @Service
    public static class MyService {
        @Async
        public CompletableFuture<String> work(String req) {
            return CompletableFuture.completedFuture(req + "/asyncwork");
        }
    }
}





/*
* RemoteService.java
(Tomcat에서 실행)
*/
package toby.live;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class RemoteService {
    @RestController
    public static class MyController {
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            Thread.sleep(1000);
            //throw new RuntimeException();
            return req + "/service1";  // html/text
        }
        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            Thread.sleep(1000);
            return req + "/service2";  // html/text
        }
    }
    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");
        SpringApplication.run(RemoteService.class, args);
    }
}



■ application.properties

server.tomcat.max-threads=1


(시청일 : 20180113)




(중략)

그동안 Java에서 제공하던 Future 인터페이스는 비동기(Async) 작업을 지원하지만, 결과를 얻으려면 get() 메서드를 호출해야 해서 Blocking을 유발하는 단점이 있었다.
이런 불편함은 Spring Framework에서 제공하는 ListenableFuture로 해소할 수 있었다. (Spring Framework를 사용하는 개발자라면 RestTemplate의 Async 버전인 AsyncRestTemplate에 대해 관심을 가져 보기 바란다.)
하지만 ListenableFuture를 써 본 개발자라면 한번쯤은 Callback 작성의 복잡함에 고민해 왔을 것이다. 심지어 Callback Hell 이라는 말이 커뮤니티에는 많이 돌고 있기도 하다.
그 불편함을 알았던지 Java 8 에서 드디어 CompletableFuture 를 제공한다.
CompletableFuture는 아래와 같은 패턴으로 개발이 가능하여 비동기 서비스 코드의 가독성이 좋아졌다.
(중략)


- Future : 비동기 작업의 결과를 담고있는 오브젝트
- ListenableFuture : 콜백 구조로 결과가 완료되는 시점에 Hooking을 걸어서 결과를 가져오는 기법. 비동기 메소드를 호출하면서 콜백 메소드를 지정할 수 있도록 되어 있다.
- CompletableFuture : CompletableFuture 리스트의 모든 값이 완료될 때까지 기다릴지, 아니면 하나의 값만 완료되길 기다릴지 선택할 수 있다는 것이 장점이다. CompletableFuture 클래스는 람다 표현식과 파이프라이닝을 활용하면 구조적으로 예쁘게 만들 수 있다. 병렬성(Parallelism)과 동시성(Concurrency)에서 CompletableFuture가 중요한데, 여러 개의 CPU core 사이에서 지연 실행이나 예외를 callable하게 처리할 수 있어서 명시적인 처리가 가능해진다.


■ 예제 1
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .runAsync(() -> log.info("runAsync"))
                .thenRun(() -> log.info("thenRun"))
                .thenRun(() -> log.info("thenRun"));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}

<결과>
18:13:38.852 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:13:38.852 [main] INFO toby.live.CFuture - exit
18:13:38.859 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenRun
18:13:38.860 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenRun
Process finished with exit code 0


■ 예제 2
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    return 1;
                })
                .thenApply(s -> {
                    log.info("thenApply {}",s);
                    return s + 1;
                })
                .thenApply(s2 -> {
                    log.info("thenApply {}",s2);
                    return s2 * 3;
                })
                .thenAccept(s3 -> log.info("thenAccept {}",s3));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}


<결과>
18:20:54.956 [main] INFO toby.live.CFuture - exit
18:20:54.956 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:20:54.962 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenApply 1
18:20:54.966 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenApply 2
18:20:54.966 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenAccept 6
Process finished with exit code 0



■ 예제 3
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    return 1;
                })
                .thenCompose(s -> {
                    log.info("thenApply {}",s);
                    return CompletableFuture.completedFuture(s + 1);
                })
                .thenApply(s2 -> {
                    log.info("thenApply {}",s2);
                    return s2 * 3;
                })
                .thenAccept(s3 -> log.info("thenAccept {}",s3));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}


<결과>
18:25:36.218 [main] INFO toby.live.CFuture - exit
18:25:36.218 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:25:36.226 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenCompose 1
18:25:36.230 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenApply 2
18:25:36.230 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenAccept 6
Process finished with exit code 0



■ 예제 4
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    if(1==1) throw new RuntimeException();  // 강제 오류 발생!!
                    return 1;
                })
                .thenCompose(s -> {
                    log.info("thenCompose {}",s);
                    return CompletableFuture.completedFuture(s + 1);
                })
                .thenApply(s2 -> {
                    log.info("thenApply {}",s2);
                    return s2 * 3;
                })
                .exceptionally(e -> -10)
                .thenAccept(s3 -> log.info("thenAccept {}",s3));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}



<결과>
18:27:08.541 [main] INFO toby.live.CFuture - exit
18:27:08.541 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:27:08.546 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenAccept -10
Process finished with exit code 0



■ 예제 5
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        ExecutorService es = Executors.newFixedThreadPool(10);
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    return 1;
                })
                .thenCompose(s -> {
                    log.info("thenCompose {}",s);
                    return CompletableFuture.completedFuture(s + 1);
                })
                .thenApplyAsync(s2 -> {
                    log.info("thenApplyAsync {}",s2);
                    return s2 * 3;
                }, es)
                .exceptionally(e -> -10)
                .thenAcceptAsync(s3 -> log.info("thenAcceptAsync {}",s3), es);
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}


<결과>
18:32:24.425 [main] INFO toby.live.CFuture - exit
18:32:24.425 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:32:24.431 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenCompose 1
18:32:24.441 [pool-1-thread-1] INFO toby.live.CFuture - thenApplyAsync 2
18:32:24.447 [pool-1-thread-2] INFO toby.live.CFuture - thenAcceptAsync 6





■ LoadTest.java
package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/rest?idx={idx}";;
        CyclicBarrier barrier = new CyclicBarrier(101);
        StopWatch main = new StopWatch();
        main.start();
        for (int i=0; i<100; i++){
            es.submit(()->{
                int idx = counter.addAndGet(1);
                barrier.await();
                log.info("Thread {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                String res = rt.getForObject(url,String.class, idx);
                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);
                return null;
            });
        }
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}


■ Tobytv011Application.java
package toby.live;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
@SpringBootApplication
@RestController
@EnableAsync
public class Tobytv011Application {
    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
    @Autowired
    MyService myService;
    static final String URL1 = "http://localhost:8081/service?req={req}";;
    static final String URL2 = "http://localhost:8081/service2?req={req}";;
    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();
        toCF(rt.getForEntity(URL1, String.class, "h" + idx))
        .thenCompose(s -> {
            //if (1==1) throw new RuntimeException("ERROR");
            return toCF(rt.getForEntity(URL2, String.class, s.getBody()));
        })
        .thenApplyAsync(s2 -> myService.work(s2.getBody()))
        .thenAccept(s3 -> dr.setResult(s3))
        .exceptionally(e -> {dr.setErrorResult(e.getMessage()); return (Void)null;});
        return dr;
    }
    <T> CompletableFuture<T> toCF(ListenableFuture<T> lf) {
        CompletableFuture<T> cf = new CompletableFuture<T>();
        lf.addCallback(s -> cf.complete(s), e -> cf.completeExceptionally(e));
        return cf;
    }
    public static class AcceptCompletion<S> extends Completion<S,Void> {
        Consumer<S> con;
        public AcceptCompletion(Consumer<S> con) {
            this.con = con;
        }
        @Override
        void run(S value) {
            con.accept(value);
        }
    }
    public static class ErrorCompletion<T> extends Completion<T,T> {
        Consumer<Throwable> econ;
        public ErrorCompletion(Consumer<Throwable> econ) {
            this.econ = econ;
        }
        @Override
        void run(T value) {
            if (next != null) next.run(value);
        }
    }
    public static class ApplyCompletion<S,T> extends Completion<S,T> {
        Function<S, ListenableFuture<T>> fn;
        public ApplyCompletion(Function<S, ListenableFuture<T>> fn) {
            this.fn = fn;
        }
        @Override
        void run(S value) {
            ListenableFuture<T> lf = fn.apply(value);
            lf.addCallback(s->complete(s), e->error(e));
        }
    }
    public static class Completion<S,T>{
        Completion next;
        public void andAccept(Consumer<T> con){
            Completion<T, Void> c = new AcceptCompletion(con);
            this.next = c;
        }
        public Completion<T,T> andError(Consumer<Throwable> econ){
            Completion<T,T> c = new ErrorCompletion<>(econ);
            this.next = c;
            return c;
        }
        public <V> Completion<T,V> andApply(Function<T, ListenableFuture<V>> fn){
            Completion<T,V> c = new ApplyCompletion<>(fn);
            this.next = c;
            return c;
        }
        public static <S,T> Completion<S,T> from(ListenableFuture<T> lf) {
            Completion<S,T> c = new Completion<>();
            lf.addCallback(s->{
                c.complete(s);
            }, e->{
                c.error(e);
            });
            return c;
        }
        void error(Throwable e) {
            if (next != null) next.error(e);
        }
        void complete(T s) {
            if (next != null) next.run(s);
        }
        void run(S value) {
        }
    }
    @Service
    public static class MyService {
        public String work(String req) {
            return req + "/asyncwork";
        }
    }
    @Bean
    public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1);
        return te;
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv011Application.class, args);
    }
}



■ RemoteService.java
package toby.live;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Stand-in for a slow remote backend: both endpoints sleep for one second
 * before echoing the request value with a suffix.
 */
@SpringBootApplication
public class RemoteService {

    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");

        SpringApplication.run(RemoteService.class, args);
    }

    @RestController
    public static class MyController {

        /** GET /service: returns {@code req + "/service1"} after the delay. */
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            // To simulate a failing backend, throw here instead: throw new RuntimeException();
            return delayedReply(req, "/service1"); // html/text
        }

        /** GET /service2: returns {@code req + "/service2"} after the delay. */
        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            return delayedReply(req, "/service2"); // html/text
        }

        /** Sleeps for one second, then appends {@code suffix} to {@code req}. */
        private static String delayedReply(String req, String suffix) throws InterruptedException {
            Thread.sleep(1000);
            return req + suffix;
        }
    }
}

■ application.properties
server.tomcat.max-threads=1



(시청일 : 20171126)






■ LoadTest.java
package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/rest?idx={idx}";;
        CyclicBarrier barrier = new CyclicBarrier(101);
        StopWatch main = new StopWatch();
        main.start();
        for (int i=0; i<100; i++){
            es.submit(()->{
                int idx = counter.addAndGet(1);
                barrier.await();
                log.info("Thread {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                String res = rt.getForObject(url,String.class, idx);
                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);
                return null;
            });
        }
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}



■ Tobytv010Application.java
package toby.live;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.function.Consumer;
import java.util.function.Function;
@SpringBootApplication
@RestController
@EnableAsync
public class Tobytv010Application {
    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
    @Autowired
    MyService myService;
    static final String URL1 = "http://localhost:8081/service?req={req}";;
    static final String URL2 = "http://localhost:8081/service2?req={req}";;
    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();
//            ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "h" + idx);
//            f1.addCallback(s->{
//                ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
//
//                f2.addCallback(s2->{
//                    ListenableFuture<String> f3 = myService.work(s2.getBody());
//                    f3.addCallback(s3->{
//                        dr.setResult(s3);
//                    }, e->{
//                        dr.setErrorResult(e.getMessage());
//                    });
//                }, e->{
//                    dr.setErrorResult(e.getMessage());
//                });
//            }, e->{
//                dr.setErrorResult(e.getMessage());
//            });
        Completion
                .from(rt.getForEntity(URL1, String.class, "h" + idx))
                .andApply(s->rt.getForEntity(URL2, String.class, s.getBody()))
                .andApply(s->myService.work(s.getBody()))
                .andError(e->dr.setErrorResult("Error [" + e.toString() + "]"))
                .andAccept(s->dr.setResult(s));
        return dr;
    }
    
    public static class AcceptCompletion<S> extends Completion<S,Void> {
        Consumer<S> con;
        public AcceptCompletion(Consumer<S> con) {
            this.con = con;
        }
        @Override
        void run(S value) {
            con.accept(value);
        }
    }
    public static class ErrorCompletion<T> extends Completion<T,T> {
        Consumer<Throwable> econ;
        public ErrorCompletion(Consumer<Throwable> econ) {
            this.econ = econ;
        }
        @Override
        void run(T value) {
            if (next != null) next.run(value);
        }
    }
    public static class ApplyCompletion<S,T> extends Completion<S,T> {
        Function<S, ListenableFuture<T>> fn;
        public ApplyCompletion(Function<S, ListenableFuture<T>> fn) {
            this.fn = fn;
        }
        @Override
        void run(S value) {
            ListenableFuture<T> lf = fn.apply(value);
            lf.addCallback(s->complete(s), e->error(e));
        }
    }
    public static class Completion<S,T>{
        Completion next;
        public void andAccept(Consumer<T> con){
            Completion<T, Void> c = new AcceptCompletion(con);
            this.next = c;
        }
        public Completion<T,T> andError(Consumer<Throwable> econ){
            Completion<T,T> c = new ErrorCompletion<>(econ);
            this.next = c;
            return c;
        }
        public <V> Completion<T,V> andApply(Function<T, ListenableFuture<V>> fn){
            Completion<T,V> c = new ApplyCompletion<>(fn);
            this.next = c;
            return c;
        }
        public static <S,T> Completion<S,T> from(ListenableFuture<T> lf) {
            Completion<S,T> c = new Completion<>();
            lf.addCallback(s->{
                c.complete(s);
            }, e->{
                c.error(e);
            });
            return c;
        }
        void error(Throwable e) {
            if (next != null) next.error(e);
        }
        void complete(T s) {
            if (next != null) next.run(s);
        }
        void run(S value) {
        }
    }
    @Service
    public static class MyService {
        @Async
        public ListenableFuture<String> work(String req) { return new AsyncResult<>(req + "/asyncwork"); }
    }
    @Bean
    public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1);
        return te;
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv010Application.class, args);
    }
}




■ RemoteService.java
package toby.live;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class RemoteService {
    @RestController
    public static class MyController {
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            Thread.sleep(1000);
            //throw new RuntimeException();
            return req + "/service1";  // html/text
        }
        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            Thread.sleep(1000);
            return req + "/service2";  // html/text
        }
    }
    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");
        SpringApplication.run(RemoteService.class, args);
    }
}


■ application.properties

server.tomcat.max-threads=1


(시청일 : 20171119)


[스프링 리액티브 웹 개발 5부. 비동기 RestTemplate과 비동기 MVC의 결합]




- CyclicBarrier : 자바에서 사용하는 simple 동기화 기법

<Callable과 Runnable 의  차이점>

| 구분 | Callable | Runnable |
| --- | --- | --- |
| 리턴값 존재 여부 | O | X |
| Exception 던지도록 선언되어있는지 여부 | O | X |

- ListenableFuture : 비동기 작업 결과를 가져올 수 있는 future 타입인데, 성공/실패 시의 콜백을 등록할 수 있다.

- DeferredResult : 컨트롤러의 리턴을 별도의 작업에서 할 수 있게 해준다. 하지만 이 스레드는 스프링에 의해 관리되는 것이 아니다.
 DeferredResult 클래스는 어떤 요청에 대한 응답을 Queue 등에 저장해 두었다가 DeferredResult.setResult() 메소드가 호출되면 DispatcherServlet으로 응답을 보낸다.  즉 서버가 Push하는 기술들을 구현할 수 있게 해준다.



<자바의 쓰레드풀의 동작원리>
corePoolSize만큼 쓰레드를 먼저 만들고, 그 이후의 작업은 큐에 채우다가 큐가 다 차면 maxPoolSize까지 쓰레드를 추가로 늘리며, 그 쓰레드까지 다 차면 오류(작업 거부) 발생



■ Tobytv009Application.java
package toby.live.asyncrest;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.context.request.async.DeferredResult;
@SpringBootApplication
@EnableAsync
public class Tobytv009Application {
    @RestController
    public static class MyController {
        //RestTemplate rt = new RestTemplate(); // 블로킹 방식  => 각각 2초 작업시간이 걸리는 100개의 API 호출 시에 하나의 쓰레드로 처리하기 때문에 약 200초 걸림
        //AsyncRestTemplate rt = new AsyncRestTemplate(); // 논블로킹 방식 => 각각 2초 작업시간이 걸리는 100개의 API 호출 시에 100개의 쓰레드로 처리하기 때문에 약 2초 걸림
        AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))); // 논블로킹 방식 => 각각 2초 작업시간이 걸리는 100개의 API 호출 시에 1개의 쓰레드로 처리하기 때문에 약 2초 걸림
//        @GetMapping("/rest")
//        public String rest(int idx) {
//            String res = rt.getForObject("http://localhost:8081/service?req={req}";, String.class, "hello" + idx);
//
//            return res;  // html/text
//        }
//        @GetMapping("/rest")
//        public ListenableFuture<ResponseEntity<String>> rest(int idx) {
//            return rt.getForEntity("http://localhost:8081/service?req={req}";, String.class, "hello" + idx);
//        }
        @Autowired MyService myService;
        @GetMapping("/rest")
        public DeferredResult<String> rest(int idx) {   // 결과를 가공하여 리턴 또는 다른 API와 의존적인 관계로 만드는 법
            DeferredResult<String> dr = new DeferredResult<>();
            ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service1?req={req}";, String.class, "hello" + idx);
            f1.addCallback(s1->{
                ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}";, String.class, s1.getBody());
                f2.addCallback(s2->{
                     //dr.setResult(s2.getBody());
                    ListenableFuture<String> f3 =  myService.work(s2.getBody());
                    f3.addCallback(s3->{
                        dr.setErrorResult(s3);
                    },e3->{
                        dr.setErrorResult(e3.getMessage());
                    });
                },e2->{
                    dr.setErrorResult(e2.getMessage());
                });
            }, e1->{
                dr.setErrorResult(e1.getMessage());
            });
            return dr;
        }
    }
    @Service
    public static class MyService {
        @Async
        public ListenableFuture<String> work(String req) {
            return new AsyncResult<>(req + "/asyncwork");
        }
    }
    @Bean
    ThreadPoolTaskExecutor myThreadPool() {  //
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1);
        te.initialize();
        return te;
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv009Application.class, args);
    }
}


■ RemoteService.java
package toby.live.asyncrest;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class RemoteService {
    @RestController
    public static class MyController {
        @GetMapping("/service1")
        public String service1(String req) throws InterruptedException {
            Thread.sleep(2000);
            return req + "/service1";  // html/text
        }
        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            Thread.sleep(2000);
            return req + "/service2";  // html/text
        }
    }
    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");
        SpringApplication.run(RemoteService.class, args);
    }
}


■ LoadTest.java

package toby.live.asyncrest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/rest?idx={idx}";;
        CyclicBarrier barrier = new CyclicBarrier(101);
        StopWatch main = new StopWatch();
        main.start();
        for (int i=0; i<100; i++){
            es.submit(()->{
                int idx = counter.addAndGet(1);
                barrier.await();
                log.info("Thread {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                String res = rt.getForObject(url,String.class, idx);
                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);
                return null;
            });
        }
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}


(시청일 : 20170924)


- https://www.youtube.com/watch?v=01sdXvZSjcI : 스프링에서 사용되는 ParameterizedTypeReference의 작동 원리인 Super Type Token의 동작 원리와 활용법을 알아봅니다.




■ Type Token : 타입 정보를 key로 넘겨서 value를 리턴


■ Type Safe Token 예제 코드

/**
 * Type token example: a map keyed by {@code Class<T>} whose {@code get} returns a value
 * already cast to {@code T}.
 */
public class TypeToken {
    /** Heterogeneous container that stores one value per class token. */
    static class TypeSafeMap {
        private final Map<Class<?>, Object> map = new HashMap<>();

        /** Stores {@code value} under the class token {@code clazz}. */
        <T> void put(Class<T> clazz, T value) {
            map.put(clazz, value);
        }

        /**
         * Looks up the value stored for {@code clazz}.
         *
         * @return the stored value cast with {@link Class#cast}, or {@code null} if absent
         */
        <T> T get(Class<T> clazz) {
            return clazz.cast(map.get(clazz));
        }
    }

    public static void main(String[] args) throws Exception {
        TypeSafeMap m = new TypeSafeMap();
        m.put(Integer.class, 1);
        m.put(String.class, "String");
        // A class token cannot carry the element type, so List<Integer> and List<String>
        // would both end up under the same List.class key.
        m.put(List.class, Arrays.asList(1,2,3));
        System.out.println(m.get(Integer.class));
        System.out.println(m.get(String.class));
        System.out.println(m.get(List.class));
    }
}




■ Super Type Token 예제 코드   => 메모리 누수를 야기하는 등 좋지 않은 코드라서, 2.5회에서 개선 됨.

/**
 * Super type token example: an anonymous subclass of {@link TypeReference} records its
 * generic type argument, so {@code List<Integer>} and {@code List<String>} become distinct keys.
 */
public class SuperTypeToken {
    /** Heterogeneous container keyed by {@link TypeReference}. */
    static class TypeSafeMap {
        private final Map<TypeReference<?>, Object> map = new HashMap<>();

        /** Stores {@code value} under the captured type of {@code tr}. */
        <T> void put(TypeReference<T> tr, T value) {
            map.put(tr, value);
        }

        /**
         * Looks up the value stored for the captured type of {@code tr}.
         *
         * @return the stored value cast to the raw class of the captured type,
         *         or {@code null} if absent
         */
        @SuppressWarnings("unchecked") // the key's captured type fixes T; only the raw class is checked
        <T> T get(TypeReference<T> tr) {
            Type captured = tr.type;
            Class<T> raw = (captured instanceof Class<?>)
                    ? (Class<T>) captured
                    : (Class<T>) ((ParameterizedType) captured).getRawType(); // e.g. TR<List<String>>
            return raw.cast(map.get(tr));
        }
    }

    /**
     * Captures {@code T} from the generic superclass of the concrete (anonymous) subclass.
     * Must be instantiated as {@code new TypeReference<X>() {}}.
     */
    static class TypeReference<T> {
        Type type;

        public TypeReference() {
            Type stype = getClass().getGenericSuperclass();
            if (stype instanceof ParameterizedType) {
                this.type = ((ParameterizedType) stype).getActualTypeArguments()[0];
            } else throw new RuntimeException("TypeReference must be created with a type argument");
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            // Compare the raw superclass: every anonymous subclass has its own generic-superclass
            // object, so an identity check on getGenericSuperclass() never matches across instances.
            if (o == null || getClass().getSuperclass() != o.getClass().getSuperclass()) return false;
            TypeReference<?> that = (TypeReference<?>) o;
            return type.equals(that.type);
        }

        @Override
        public int hashCode() {
            return type.hashCode();
        }
    }

    public static void main(String[] args) throws Exception {
        TypeSafeMap m = new TypeSafeMap();
        m.put(new TypeReference<Integer>() {}, 1);
        m.put(new TypeReference<String>() {}, "String");
        m.put(new TypeReference<List<Integer>>() {}, Arrays.asList(1,2,3)); // List<Integer>
        m.put(new TypeReference<List<String>>() {}, Arrays.asList("a","b","c")); // List<String>
        m.put(new TypeReference<List<List<String>>>() {}, Arrays.asList(Arrays.asList("a"),Arrays.asList("b"),Arrays.asList("c"))); // List<List<String>>

        System.out.println(m.get(new TypeReference<Integer>() {}));
        System.out.println(m.get(new TypeReference<String>() {}));
        System.out.println(m.get(new TypeReference<List<Integer>>() {}));
        System.out.println(m.get(new TypeReference<List<String>>() {}));
        System.out.println(m.get(new TypeReference<List<List<String>>>() {}));
    }
}




■ 위의 코드와 유사한 Spring 3.2 버전 이상에서 지원하는
ParameterizedTypeReference의 사용 예제
public class SpringTypeReference {
    public static void main(String[] args) {
        ParameterizedTypeReference<?> typeRef = new ParameterizedTypeReference<List<Map<Set<Integer>, String>>>() {};
        System.out.println(typeRef.getType());
    }
}
 



■ ParameterizedTypeReference의 사용 사례

@SpringBootApplication
public class TobyTv002liveApplication {
    public static void main(String[] args) {
        SpringApplication.run(TobyTv002liveApplication.class, args);
    }
    @RestController
    public static class MyController {
        @RequestMapping("/")
        public List<User> users() {
            return Arrays.asList(new User("A"), new User("B"), new User("C"));
        }
    }
    public static class User {
        String name;
        public User(String name) {
            this.name = name;
        }
        public User() {
        }
        public String getName() {
            return name;
        }
        @Override
        public String toString() {
            return "User[" +
                    "name='" + name + '\'' +
                    ']';
        }
    }
}

public class SpringTypeReference {
    public static void main(String[] args) {
        RestTemplate rt = new RestTemplate();
        // 타입 safe하지 않음
//        List<Map> users = rt.getForObject("http://localhost:8080";, List.class);
//        System.out.println(users.get(0).getName("name"));
        List<User> users = rt.exchange("http://localhost:8080";,
                HttpMethod.Get, null, new ParameterizedTypeReference<List<User>>() {}).getBody();
        users.forEach(System.out::println );
    }
}


(시청일 : 20190917)


- https://www.youtube.com/watch?v=s-tXAHub6vg :  객체지향의 재사용성과 다이나믹 디스패치, 더블 디스패치에 관한 이야기를 코드를 만들어가면서 설명합니다.
- http://limmmee.tistory.com/28
- https://zetawiki.com/wiki/%EC%A0%9C%EC%96%B4%EC%9D%98_%EC%97%AD%EC%A0%84_IoC



■ 스프링 = Dependency + Injection + Framework

■ Dependency 관계 : Supplier의 변화가 Client에 영향을 주는 경우
- 의존 관계 발생
    -> Supplier가 Client의 필드
    -> Supplier가 Client 메소드의 파라미터
    -> Supplier가 Client의 로컬 변수
    -> Supplier로 메시지를 보냄
- 객체지향 설계/개발 : 재사용성이 높다.
- 의존 관계 : Client는 재사용이 어렵고, Client는 컴포넌트/서비스가 될 수 없다.

■ 컴포넌트 : 개발자의 손이 미치지 않는 곳에서도 아무 변경 없이 필요에 따라 확장해서 사용될 수 있는 소프트웨어 덩어리다.     

■ 오브젝트 패턴 : 런타임 시에 바뀔 수 있는 (상속 관계보다) 더 동적인 오브젝트 의존 관계를 다룬다.

■ Dependency : (런타임 시에 결정/구성되는 오브젝트의) 의존 관계
    - 구현 대신 인터페이스 사용
        -> 클래스(구현) 의존 관계 제거
        -> 클래스에 대한 의존성은 생성 패턴처럼 3자에게 위임
    - 오브젝트 합성(composition) 사용
        -> 재사용성을 확보하기 위한 방법 중 한 가지 (상속의 대안)
        -> 인터페이스의 사용이 전제 (블랙 박스 재사용)
        -> 새롭고 복잡한 기능을 얻기 위해 오브젝트를 조합/합성
        -> 런타임 시에 다른 오브젝트에 대한 레퍼런스를 획득
        -> 각 클래스가 캡슐화되고 자신의 역할에 충실하게 도와줌

■ Inversion of Control         =>         Container의 역할
    - 프레임워크의 동작 방식을 설명하는 용어. 객체지향 컨텍스트.
    - 프로그램의 제어 흐름 구조가 뒤바뀌는 것. 모든 제어 권한(실행에 필요한 객체의 생성·사용 등)을 자신이 아닌 다른 대상에게 위임.
    - 오브젝트가 자신이 사용할 오브젝트를 스스로 선택/생성하지 않는다.
    - 예시
        -> 서블릿(기능 정의)-컨테이너(실제 수행)



■ 빈 : 스프링이 제어권을 가지고 직접 만들고 관계를 부여하는 오브젝트
■ 스프링 빈 : 스프링 컨테이너가 생성, 관계 설정, 사용 등을 제어해주는 제어의 역전이 적용된 오브젝트
■ 빈팩토리/애플리케이션 컨텍스트 : 빈의 생성과 관계 설정 같은 제어를 담당하는 IOC 오브젝트


■ Dispatch
    - Static Dispatch    => 메소드 오버로딩과 관련 있음.
    - Dynamic Dispatch   => 컴파일 시간에 receiver parameter(메소드 호출할 시에 사용되는 오브젝트 변수)만 타입 추적 가능
        -> Dependency(런타임 시에 결정/구성되는 오브젝트 의존 관계) 가 가능해지게 함.
    - Double(Multiple) (Dynamic) Dispatch
        -> Visitor 패턴의 근본적인 형태.
        -> 컴파일 시간에 receiver parameter(메소드 호출할 시에 사용되는 오브젝트 변수) 뿐만 아니라, 메소드의 파라미터 타입 추적 가능


■ Double Dispatch 예제 코드

/**
 * Double dispatch example: the concrete {@link Post} is resolved first by the dynamic call
 * to {@code postOn}, then the concrete {@link SNS} by the dynamic call to {@code post(this)},
 * whose overload is fixed by the static type of {@code this}.
 */
public class Dispatch {
    /** Something that can be posted to an SNS. */
    interface Post {
        void postOn(SNS sns);
    }

    static class Text implements Post {
        public void postOn(SNS sns){
            sns.post(this); // 'this' is statically a Text, selecting post(Text)
        }
    }

    static class Picture implements Post {
        public void postOn(SNS sns){
            sns.post(this); // 'this' is statically a Picture, selecting post(Picture)
        }
    }

    /** A target service with one overload per kind of post. */
    interface SNS {
        void post(Text post);
        void post(Picture post);
    }

    static class Facebook implements SNS {
        public void post(Text post) {    System.out.println("text-facebook");      }
        public void post(Picture post) {    System.out.println("picture-facebook");      }
    }

    static class Twitter implements SNS {
        public void post(Text post) {    System.out.println("text-twitter");      }
        public void post(Picture post) {    System.out.println("picture-twitter");      }
    }

    static class GooglePlus implements SNS {
        public void post(Text post) {    System.out.println("text-googlePlus");      }
        public void post(Picture post) {    System.out.println("picture-googlePlus");      }
    }

    /** Posts every kind of post to every SNS, printing one line per combination. */
    public static void main(String[] args){
        List<Post> posts = Arrays.asList(new Text(), new Picture());
        List<SNS> sns = Arrays.asList(new Facebook(), new Twitter(), new GooglePlus());

        posts.forEach(p -> sns.forEach((SNS s) -> p.postOn(s)));
    }
}


■ Method Signature (name, parameter types)  => 오버라이딩 관계 여부와 관련 있음
    - 두 메소드의 return type 및 Method Signature 가 서로 동일 하면, 오버라이딩 관계

■ Method Type (return type, method type parameter, method argument types, exception) => Method Reference 사용 가능 여부와 관련 있음


■ overloading(중복)과 overriding(재정의) 성립 조건

| 구분 | overloading | overriding |
| --- | --- | --- |
| 리턴 타입 | 상관 없음 | 일치 |
| 메소드 이름 | 일치 | 일치 |
| 매개변수 타입/개수 | 불일치 | 일치 |



■ Visitor 패턴

    - AnnotationVisitor 클래스 참고


(시청일 : 20171112)




<자바의 비동기 기술>
  1. Future
  2. Callback

<Spring MVC의 비동기 기술>
  1. Callable
  2. Deferred Result
  3. Response body emitter



■ Future  : 비동기 작업의 결과를 가져오는 핸들러
package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException{
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(()-> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {}
            log.info("Async");
        });
        log.info("Exit");
    }
}


package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        Future<String> f = es.submit(()-> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        });
        System.out.println(f.isDone());
        Thread.sleep(2100);
        log.info("Exit");
        System.out.println(f.isDone());
        System.out.println(f.get()); // Bloking method
    }
}


package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import java.util.concurrent.Future;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @Component
    public static class MyService {
        @Async
        public Future<String> hello() throws InterruptedException {
            log.debug("hello()");
            Thread.sleep(2000);
            return new AsyncResult<>( "Hello");
        }
    }
    public static void main(String[] args) {
        try (ConfigurableApplicationContext    c = SpringApplication.run(Tobytv8Application.class, args)) {
        }
    }
    @Autowired
    MyService myService;
    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            Future<String> f = myService.hello();
            log.info("exit: " + f.isDone());
            log.info("result: " + f.get());
        };
    }
}


package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        FutureTask<String> f = new FutureTask<String>(()->{
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        }) {
            @Override
            protected void done() {
                try {
                    System.out.println(get());
                }catch(InterruptedException e) {
                    e.printStackTrace();
                }catch(ExecutionException e) {
                    e.printStackTrace();
                }
            }
        };
        es.execute(f);
        es.shutdown();
    }
}



■ Callback

package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Callable;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        @GetMapping("/callable")
        public Callable<String> async() throws InterruptedException {
            log.info("callable");
            return ()-> {
                log.info("async");
                Thread.sleep(2000);
                return "hello";
            };
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}



package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.*;
@Slf4j
public class FutureEx {
    interface SuccessCallback {
        void onSuccess(String result);
    }
    public static class CallbackFutureTask extends FutureTask<String> {
        SuccessCallback sc;
        public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
            super(callable);
            this.sc = Objects.requireNonNull(sc);
        }
        @Override
        protected void done() {
            try {
                sc.onSuccess(get());
            }catch(InterruptedException e){
                e.printStackTrace();
            }catch(ExecutionException e){
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        CallbackFutureTask f = new CallbackFutureTask(() -> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        }, System.out::println);
        es.execute(f);
        es.shutdown();
    }
}




package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.*;
@Slf4j
public class FutureEx {
    interface SuccessCallback {
        void onSuccess(String result);
    }
    interface ExceptionCallback {
        void onError(Throwable t);
    }
    public static class CallbackFutureTask extends FutureTask<String> {
        SuccessCallback sc;
        ExceptionCallback ec;
        public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
            super(callable);
            this.sc = Objects.requireNonNull(sc);
            this.ec = Objects.requireNonNull(ec);
        }
        @Override
        protected void done() {
            try {
                sc.onSuccess(get());
            }catch(InterruptedException e){
                Thread.currentThread().interrupt();
            }catch(ExecutionException e){
                ec.onError(e.getCause());
            }
        }
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        CallbackFutureTask f = new CallbackFutureTask(() -> {
            Thread.sleep(2000);
            if(1==1) throw new RuntimeException("Async ERROR!!!");
            log.info("Async");
            return "Hello";
        },
                s-> System.out.println("Result: " + s),
                e-> System.out.println("Error: " + e.getMessage()));
        es.execute(f);
        es.shutdown();
    }
}




■ ListenableFuture
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @Component
    public static class MyService {
        @Async("tp")
        public ListenableFuture<String> hello() throws InterruptedException {
            log.debug("hello()");
            Thread.sleep(2000);
            return new AsyncResult<>( "Hello");
        }
    }
    @Bean // @Async 대신 사용할 수 있는 방법
    ThreadPoolTaskExecutor tp() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(10);
        te.setMaxPoolSize(100);
        te.setQueueCapacity(200);
        te.setThreadNamePrefix("mythread");
        te.initialize();
        return te;
    }
    public static void main(String[] args) {
        try (ConfigurableApplicationContext    c = SpringApplication.run(Tobytv8Application.class, args)) {
        }
    }
    @Autowired
    MyService myService;
    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            ListenableFuture<String> f = myService.hello();
            f.addCallback(s-> System.out.println(s), e-> System.out.println(e.getMessage()));
            log.info("exit");
        };
    }
}



@Async // 계속 쓰레드를 생성하므로 실전에 절대 사용 하면 안됨
@Async("tp") // 비동기 작업이 여러개 이고, 쓰레드 풀을 분리해서 적용하는 방법


■ CompletableFuture
(생략)




- Servlet은 기본적으로 Blocking I/O 방식이다.





■ Callable 관련 Load Test

- LoadTest.java
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException{
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/callable";;
        StopWatch main = new StopWatch();
        main.start();
        for(int i=0; i<100; i++){
            es.execute(()-> {
                int idx = counter.addAndGet(1);
                log.info("Thread: {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                rt.getForObject(url, String.class);
                sw.stop();
                log.info("Elapsed: {} {}", idx ,sw.getTotalTimeSeconds());
            });
        }
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}


- Tobytv8Application.java
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Callable;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        @GetMapping("/callable")
        public Callable<String> callable() throws InterruptedException {
            log.info("callable");
            return ()-> {
                log.info("async");
                Thread.sleep(2000);
                return "hello";
            };
        }
//        public String callable() throws InterruptedException {
//            log.info("async");
//            Thread.sleep(2000);
//            return "hello";
//        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}



- application.properties
server.tomcat.max-threads=1


■ DeferredResult : 어떤 요청에 의해서, 기존의 지연되어있는 HTTP응답을 나중에 써줄 수 있게 하는 기술

package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();
        @GetMapping("/dr")
        public DeferredResult<String> callable() throws InterruptedException {
            log.info("dr");
            DeferredResult<String> dr = new DeferredResult<>(600000L);
            results.add(dr);
            return dr;
        }
        @GetMapping("/dr/count")
        public String drcount() {
            return String.valueOf(results.size());
        }
        @GetMapping("/dr/event")
        public String drevent(String msg) {
            for(DeferredResult<String> dr : results) {
                dr.setResult("Hello " + msg);
                results.remove(dr);
            }
            return "OK";
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}




■ DeferredResult 관련 Load Test
- LoadTest.java
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException{
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/dr";;
        StopWatch main = new StopWatch();
        main.start();
        for(int i=0; i<100; i++){
            es.execute(()-> {
                int idx = counter.addAndGet(1);
                log.info("Thread: {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                rt.getForObject(url, String.class);
                sw.stop();
                log.info("Elapsed: {} {}", idx ,sw.getTotalTimeSeconds());
            });
        }
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}




■ emitter
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.util.concurrent.Executors;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        @GetMapping("/emitter")
        public ResponseBodyEmitter emitter() throws InterruptedException {
            ResponseBodyEmitter emitter = new ResponseBodyEmitter();
            Executors.newSingleThreadExecutor().submit(() -> {
                for (int i = 1; i <= 50; i++) {
                    try {
                        emitter.send("<p>Stream " + i + "</p>");
                        Thread.sleep(100);
                    } catch (Exception e) {
                    }
                }
            });
            return emitter;
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}



(시청일 : 20171105)




■ Reactive Streams - Schedulers

package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class ScheduleEx {
    public static void main(String[] args) {
        //pub
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    sub.onNext(1);
                    sub.onNext(2);
                    sub.onNext(3);
                    sub.onNext(4);
                    sub.onNext(5);
                    sub.onComplete();
                }
                @Override
                public void cancel() {
                }
            });
        };
        /* SubscribeOn */
//        Publisher<Integer> subOnPub = sub -> {
//            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
//                    @Override
//                    public String getThreadNamePrefix() {
//                        return "pubOn-";
//                    }
//            });
//            es.execute(()->pub.subscribe(sub));
//        };
        /* PublishOn */
        Publisher<Integer> pubOnSub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                    @Override
                    public String getThreadNamePrefix() {
                        return "subOn-";
                    }
                });
                @Override
                public void onSubscribe(Subscription s) {
                    sub.onSubscribe(s);
                }
                @Override
                public void onNext(Integer integer) {
                    sub.onNext(integer);
                }
                @Override
                public void onError(Throwable t) {
                     es.execute(()->sub.onError(t));
                     es.shutdown();
                }
                @Override
                public void onComplete() {
                    es.execute(()->sub.onComplete());
                    es.shutdown();
                }
            });
        };
        //sub
        pubOnSub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer integer) {
                log.debug("onNext:{}", integer);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}",t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });
        System.out.println("exit");
    }
}


package toby.live;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class FluxScEx {
    public static void main(String[] args) {
        Flux.range(1,10)
                .publishOn(Schedulers.newSingle("pub"))
                .log()
                //.subscribeOn(Schedulers.newSingle("sub"))
                .subscribe(System.out::println);
        System.out.println("exit");
    }
}



<쓰레드>
- user : user 쓰레드가 하나라도 남아 있으면, JVM은 종료되지 않음
- daemon : user 쓰레드가 하나도 남지 않고 daemon 쓰레드만 남아 있을 경우, JVM은 daemon 쓰레드를 모두 강제 종료시키고 종료됨. user 쓰레드가 하나라도 남아 있으면, JVM은 daemon 쓰레드를 종료시키지 않음.


■ interval 예제
package toby.live;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class FluxScEx {
    public static void main(String[] args) throws InterruptedException {
//        Flux.interval(Duration.ofMillis(500))
//                .subscribe(s->log.debug("onNext:{}", s));
//        log.debug("exit");
//        TimeUnit.SECONDS.sleep(5);
//        Executors.newSingleThreadExecutor().execute(() -> {
//            try {
//                TimeUnit.SECONDS.sleep(2);
//            }catch (InterruptedException e) {}
//            System.out.println("Hello");
//        });
//        System.out.println("exit");
        Flux.interval(Duration.ofMillis(200))
                .take(10)
                .subscribe(s->log.debug("onNext:{}",s));
        TimeUnit.SECONDS.sleep(10);
    }
}


package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class IntervalEx {
    public static void main(String[] args) {
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                int no = 0;
                volatile boolean cancelled = false;
                @Override
                public void request(long n) {
                    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
                    exec.scheduleAtFixedRate(() -> {
                        if(cancelled) {
                            exec.shutdown();
                            return;
                        }
                        sub.onNext(no++);
                    }, 0, 500, TimeUnit.NANOSECONDS);
                }
                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        };
        Publisher<Integer> takePub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                int count = 0;
                Subscription subsc;
                @Override
                public void onSubscribe(Subscription s) {
                    subsc = s;
                    sub.onSubscribe(s);
                }
                @Override
                public void onNext(Integer integer) {
                    sub.onNext(integer);
                    if(++count >= 5) {
                        subsc.cancel();
                    }
                }
                @Override
                public void onError(Throwable t) {
                    sub.onError(t);
                }
                @Override
                public void onComplete() {
                    sub.onComplete();
                }
            });
        };
        pub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer integer) {
                log.debug("onNext:{}", integer);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}",t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });
    }
}




+ Recent posts