(시청일 : 20180114)









■ 예제 1
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<Event> hello(@PathVariable long id) {
        return Mono.just(new Event(id, "event" + id));
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}


<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/event/123
{"id":123,"value":"event123"}




■ 예제 2
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<Event> hello(@PathVariable long id) {
        return Mono.just(new Event(id, "event" + id));
    }
    @GetMapping("/events")
    Flux<Event> events() {
        return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}




<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
[{"id":1,"value":"event1"},{"id":2,"value":"event2"}]



■ 예제 3
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping("/events")
    Flux<Event> events() {
        return Flux.just(new Event(1L, "event1"), new Event(2L, "event2"));
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}




<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/event/1
[{"id":1,"value":"event1"},{"id":2,"value":"event2"}]
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
[{"id":1,"value":"event1"},{"id":2,"value":"event2"}]





■ 예제 3-2
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Flux.fromIterable(list);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}





<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1,"value":"event1"}
data:{"id":2,"value":"event2"}




<Flux>






<Flux.fromIterable()>






■ 예제 4
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        return Flux
                .fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "value")))
                .delayElements(Duration.ofSeconds(1))
                .take(10);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}







<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1516531806083,"value":"value"}
data:{"id":1516531806192,"value":"value"}
data:{"id":1516531806194,"value":"value"}
data:{"id":1516531806195,"value":"value"}
data:{"id":1516531806210,"value":"value"}
data:{"id":1516531806213,"value":"value"}
data:{"id":1516531806215,"value":"value"}
data:{"id":1516531806216,"value":"value"}
data:{"id":1516531806217,"value":"value"}
data:{"id":1516531806219,"value":"value"}


<Flux.take()>








■ 예제 5
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        return Flux
                .<Event>generate(sink -> sink.next(new Event(System.currentTimeMillis(), "value")))
                .delayElements(Duration.ofSeconds(1))
                .take(10);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}





<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1516533395000,"value":"value"}
data:{"id":1516533396018,"value":"value"}
data:{"id":1516533397019,"value":"value"}
data:{"id":1516533398021,"value":"value"}
data:{"id":1516533399023,"value":"value"}
data:{"id":1516533400023,"value":"value"}
data:{"id":1516533401024,"value":"value"}
data:{"id":1516533402025,"value":"value"}
data:{"id":1516533403026,"value":"value"}
data:{"id":1516533404027,"value":"value"}






■ 예제 6
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        return Flux
                .<Event, Long>generate(()->1L, (id, sink) -> {
                    sink.next(new Event(id, "value" + id));
                    return id+1;
                })
                .delayElements(Duration.ofSeconds(1))
                .take(10);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}



<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1,"value":"value1"}
data:{"id":2,"value":"value2"}
data:{"id":3,"value":"value3"}
data:{"id":4,"value":"value4"}
data:{"id":5,"value":"value5"}
data:{"id":6,"value":"value6"}
data:{"id":7,"value":"value7"}
data:{"id":8,"value":"value8"}
data:{"id":9,"value":"value9"}
data:{"id":10,"value":"value10"}




■ 예제 7
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        Flux<Event> es = Flux
                .<Event, Long>generate(()->1L, (id, sink) -> {
                    sink.next(new Event(id, "value" + id));
                    return id+1;
                });
        Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
        return Flux.zip(es, interval).map(tu->tu.getT1()).take(10);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}



<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1,"value":"value1"}
data:{"id":2,"value":"value2"}
data:{"id":3,"value":"value3"}
data:{"id":4,"value":"value4"}
data:{"id":5,"value":"value5"}
data:{"id":6,"value":"value6"}
data:{"id":7,"value":"value7"}
data:{"id":8,"value":"value8"}
data:{"id":9,"value":"value9"}
data:{"id":10,"value":"value10"}





■ 예제 8
package toby.tobytv014;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@SpringBootApplication
@Slf4j
@RestController
public class Tobytv014Application {
    @GetMapping("/event/{id}")
    Mono<List<Event>> hello(@PathVariable long id) {
        List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(2L, "event2"));
        return Mono.just(list);
    }
    @GetMapping(value="/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Event> events() {
        Flux<String> es = Flux.generate(sink -> sink.next("Value"));
        Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
        return Flux.zip(es, interval).map(tu -> new Event(tu.getT2()+1, tu.getT1())).take(10);
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv014Application.class, args);
    }
    @Data    @AllArgsConstructor
    public static class Event {
        long id;
        String value;
    }
}



<Terminal>
C:\Users\Soohyeon\Desktop\curl_7_53_1_openssl_nghttp2_x64>curl localhost:8080/events
data:{"id":1,"value":"Value"}
data:{"id":2,"value":"Value"}
data:{"id":3,"value":"Value"}
data:{"id":4,"value":"Value"}
data:{"id":5,"value":"Value"}
data:{"id":6,"value":"Value"}
data:{"id":7,"value":"Value"}
data:{"id":8,"value":"Value"}
data:{"id":9,"value":"Value"}
data:{"id":10,"value":"Value"}




<Flux.generate()>





<Flux.interval()>





<Flux.zip()>





(시청일 : 20180114)



스프링 5.0 WebFlux에서 사용되는 Reactor의 Mono의 기본 동작방식을 살펴봅니다.



■ 예제 1
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        return Mono.just("Hello WebFlux").log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}


<결과>
2018-01-21 17:15:04.761  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2018-01-21 17:15:04.764  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(1)
2018-01-21 17:15:04.764  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onNext(Hello WebFlux)
2018-01-21 17:15:04.787  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(1)
2018-01-21 17:15:04.787  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(31)
2018-01-21 17:15:04.787  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onComplete()
2018-01-21 17:15:05.264  INFO 8148 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | cancel()



■ 예제 2
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        Mono m = Mono.just("Hello WebFlux").log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        log.info("pos2");
        return m;
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}




<결과>
2018-01-21 17:18:46.265  INFO 13388 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:18:46.268  INFO 13388 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2
2018-01-21 17:18:46.284  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2018-01-21 17:18:46.285  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(1)
2018-01-21 17:18:46.286  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onNext(Hello WebFlux)
2018-01-21 17:18:46.307  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(1)
2018-01-21 17:18:46.308  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | request(31)
2018-01-21 17:18:46.308  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | onComplete()
2018-01-21 17:18:46.746  INFO 13388 --- [ctor-http-nio-2] reactor.Mono.Just.1                      : | cancel()




■ 예제 3
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        Mono m = Mono.just("Hello WebFlux").doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        log.info("pos2");
        return m;
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}




<결과>
2018-01-21 17:20:50.096  INFO 13748 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:20:50.099  INFO 13748 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2
2018-01-21 17:20:50.115  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:20:50.117  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:20:50.117  INFO 13748 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello WebFlux
2018-01-21 17:20:50.117  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello WebFlux)
2018-01-21 17:20:50.138  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:20:50.138  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(31)
2018-01-21 17:20:50.139  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:20:50.229  INFO 13748 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | cancel()




■ 예제 4
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        Mono m = Mono.just(generateHello()).doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        log.info("pos2");
        return m;
    }
    private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}




<결과>
2018-01-21 17:26:20.531  INFO 14112 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:26:20.531  INFO 14112 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:26:20.534  INFO 14112 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2
2018-01-21 17:26:20.552  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:26:20.553  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:26:20.553  INFO 14112 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:26:20.553  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:26:20.575  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:26:20.575  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(31)
2018-01-21 17:26:20.575  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:26:20.755  INFO 14112 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | cancel()


■ 예제 5
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        Mono m = Mono.fromSupplier(() -> generateHello()).doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        log.info("pos2");
        return m;
    }
    private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}





<결과>
2018-01-21 17:29:04.571  INFO 13256 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:29:04.575  INFO 13256 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2
2018-01-21 17:29:04.589  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:29:04.591  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:29:04.591  INFO 13256 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:29:04.591  INFO 13256 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:29:04.591  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:29:04.622  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:29:04.622  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(31)
2018-01-21 17:29:04.623  INFO 13256 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()



■ 예제 6
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        Mono m = Mono.fromSupplier(() -> generateHello()).doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        m.subscribe();
        log.info("pos2");
        return m;
    }
    private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}





<결과>
2018-01-21 17:33:30.588  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:33:30.596  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:33:30.598  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(unbounded)
2018-01-21 17:33:30.598  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:33:30.598  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:33:30.598  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:33:30.599  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:33:30.599  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2
2018-01-21 17:33:30.612  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:33:30.612  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:33:30.612  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:33:30.612  INFO 13316 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:33:30.612  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:33:30.633  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:33:30.633  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(31)
2018-01-21 17:33:30.634  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:33:31.249  INFO 13316 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | cancel()




■ 예제 7
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        String msg = generateHello();
        Mono<String> m = Mono.just(msg).doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        String msg2 = m.block();
        log.info("pos2: " + msg2);
        return m;
    }
    private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}






<결과>
2018-01-21 17:39:20.356  INFO 11348 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:39:20.356  INFO 11348 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:39:20.362  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:39:20.364  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(unbounded)
2018-01-21 17:39:20.364  INFO 11348 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:39:20.364  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:39:20.364  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:39:20.365  INFO 11348 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2: Hello Mono
2018-01-21 17:39:20.381  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:39:20.381  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:39:20.381  INFO 11348 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:39:20.381  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:39:20.407  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(1)
2018-01-21 17:39:20.407  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(31)
2018-01-21 17:39:20.407  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:39:20.579  INFO 11348 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | cancel()


- block : publisher가 제공하는 결과값을 꺼내서 Mono나 Flux 같은 컨테이너를 제거하고 값을 넘겨주는 것이 목적.


■ 예제 8
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@Slf4j
public class Toby013Application {
    @GetMapping("/")
    Mono<String> hello() {
        log.info("pos1");
        String msg = generateHello();
        Mono<String> m = Mono.just(msg).doOnNext(c->log.info(c)).log(); // Publisher -> (Publisher) -> (Publisher) -> Subscriber
        String msg2 = m.block();
        log.info("pos2: " + msg2);
        return Mono.just(msg2);
    }
    private String generateHello() {
        log.info("method generateHello()");
        return "Hello Mono";
    }
    public static void main(String[] args) {
        SpringApplication.run(Toby013Application.class, args);
    }
}




<결과>
2018-01-21 17:48:23.009  INFO 13012 --- [ctor-http-nio-2] toby.Toby013Application                  : pos1
2018-01-21 17:48:23.009  INFO 13012 --- [ctor-http-nio-2] toby.Toby013Application                  : method generateHello()
2018-01-21 17:48:23.015  INFO 13012 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2018-01-21 17:48:23.016  INFO 13012 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | request(unbounded)
2018-01-21 17:48:23.016  INFO 13012 --- [ctor-http-nio-2] toby.Toby013Application                  : Hello Mono
2018-01-21 17:48:23.017  INFO 13012 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onNext(Hello Mono)
2018-01-21 17:48:23.017  INFO 13012 --- [ctor-http-nio-2] reactor.Mono.PeekFuseable.1              : | onComplete()
2018-01-21 17:48:23.018  INFO 13012 --- [ctor-http-nio-2] toby.Toby013Application                  : pos2: Hello Mono




(시청일 : 20180114)



- 책 추천 : RxJava를 활용한 리액티브 프로그래밍


■ 프로젝트 환경
- 스프링부트 버전 : SpringBoot 2.0 M1
- 스프링 프레임워크 버전 : SpringFramework 5.0 RC1
- Core : Lombok
- Web : Reactive Web  
           (Web과 Reactive Web은 서로 배타적이라 한 프로젝트에 동시에 적용될 수 없다. 두 가지를 모두 선택하면 Web이 우선시된다)


■ 예제 1
/* 
*
* LoadTest.java 
*
*/
package toby.live;

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Load-test client: starts 100 worker threads that each issue one GET request to the
 * {@code /rest} endpoint at the same moment, then logs per-request and total elapsed time.
 */
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate rt = new RestTemplate();
        // Target endpoint; {idx} is expanded per request. This declaration was missing,
        // which left `url` unresolved inside the task below.
        String url = "http://localhost:8080/rest?idx={idx}";

        // 100 workers + the main thread: nobody proceeds until all 101 have arrived.
        CyclicBarrier barrier = new CyclicBarrier(101);

        StopWatch main = new StopWatch();
        main.start();

        for (int i = 0; i < 100; i++) {
            // Returning null makes the lambda a Callable, so barrier.await() may throw
            // its checked exceptions without a try/catch.
            es.submit(() -> {
                int idx = counter.addAndGet(1);

                barrier.await();

                log.info("Thread {}", idx);

                StopWatch sw = new StopWatch();
                sw.start();

                String res = rt.getForObject(url, String.class, idx);

                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);

                return null;
            });
        }

        // Release all workers at once, then wait for them to finish.
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}



/*
* Toby012Application.java
(원래는 Netty에서 돌아가야 하지만, 한 프로젝트에서 Tomcat과 Netty를 동시에 돌릴 수 없어서 Tomcat에서 실행하였음.)
*/
package toby.live;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@SpringBootApplication
@RestController
@EnableAsync
public class Toby012Application {
    static final String URL1 = "http://localhost:8081/service?req={req}";;
    static final String URL2 = "http://localhost:8081/service2?req={req}";;
    @Autowired
    MyService myService;
    WebClient client = WebClient.create();
    @GetMapping("/rest")
    public Mono<String> rest(int idx) {
        return client.get().uri(URL1, idx).exchange() // Mono<ClientResponse>
                .flatMap(c ->  c.bodyToMono(String.class)) // Mono<String>
                .flatMap((String res1) -> client.get().uri(URL2, res1).exchange()) // Mono<ClientResponse>
                .flatMap(c -> c.bodyToMono((String.class)));  // Mono<String>
    }
    public static void main(String[] args) {
        System.setProperty("reactor.ipc.netty.workerCount", "2");
        System.setProperty("reactor.ipc.netty.pool.maxConnections", "2000");
        SpringApplication.run(Toby012Application.class, args);
    }
    @Service
    public static class MyService {
        public String work(String req) {
            return req + "/asyncwork";
        }
    }
}





/*
* RemoteService.java
(Tomcat에서 실행)
*/
package toby.live;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class RemoteService {
    @RestController
    public static class MyController {
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            Thread.sleep(1000);
            //throw new RuntimeException();
            return req + "/service1";  // html/text
        }
        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            Thread.sleep(1000);
            return req + "/service2";  // html/text
        }
    }
    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");
        SpringApplication.run(RemoteService.class, args);
    }
}


 


■ 예제 2
/* 
*
* LoadTest.java 
*
*/
package toby.live;

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Load-test client: starts 100 worker threads that each issue one GET request to the
 * {@code /rest} endpoint at the same moment, then logs per-request and total elapsed time.
 */
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate rt = new RestTemplate();
        // Target endpoint; {idx} is expanded per request. This declaration was missing,
        // which left `url` unresolved inside the task below.
        String url = "http://localhost:8080/rest?idx={idx}";

        // 100 workers + the main thread: nobody proceeds until all 101 have arrived.
        CyclicBarrier barrier = new CyclicBarrier(101);

        StopWatch main = new StopWatch();
        main.start();

        for (int i = 0; i < 100; i++) {
            // Returning null makes the lambda a Callable, so barrier.await() may throw
            // its checked exceptions without a try/catch.
            es.submit(() -> {
                int idx = counter.addAndGet(1);

                barrier.await();

                log.info("Thread {}", idx);

                StopWatch sw = new StopWatch();
                sw.start();

                String res = rt.getForObject(url, String.class, idx);

                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);

                return null;
            });
        }

        // Release all workers at once, then wait for them to finish.
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}



/*
* Toby012Application.java
(원래는 Netty에서 돌아가야 하지만, 한 프로젝트에서 Tomcat과 Netty를 동시에 돌릴 수 없어서 Tomcat에서 실행하였음.)
*/
package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;
@SpringBootApplication
@RestController
@Slf4j
@EnableAsync
public class Toby012Application {
    static final String URL1 = "http://localhost:8081/service?req={req}";;
    static final String URL2 = "http://localhost:8081/service2?req={req}";;
    @Autowired
    MyService myService;
    WebClient client = WebClient.create();
    @GetMapping("/rest")
    public Mono<String> rest(int idx) {
        return client.get().uri(URL1, idx).exchange() // Mono<ClientResponse>
                .flatMap(c ->  c.bodyToMono(String.class)) // Mono<String>
                //.doOnNext(c -> log.info(c))
                .flatMap(res1 -> client.get().uri(URL2, res1).exchange()) // Mono<ClientResponse>
                .flatMap(c -> c.bodyToMono((String.class))) // Mono<String>
                .doOnNext(c -> log.info(c))
                .flatMap(res2 -> Mono.fromCompletionStage(myService.work(res2)))      // CompletableFuture<String> -> Mono<String>
                .doOnNext(c -> log.info(c));
    }
    public static void main(String[] args) {
        System.setProperty("reactor.ipc.netty.workerCount", "2");
        System.setProperty("reactor.ipc.netty.pool.maxConnections", "2000");
        SpringApplication.run(Toby012Application.class, args);
    }
    @Service
    public static class MyService {
        @Async
        public CompletableFuture<String> work(String req) {
            return CompletableFuture.completedFuture(req + "/asyncwork");
        }
    }
}





/*
* RemoteService.java
(Tomcat에서 실행)
*/
package toby.live;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class RemoteService {
    @RestController
    public static class MyController {
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            Thread.sleep(1000);
            //throw new RuntimeException();
            return req + "/service1";  // html/text
        }
        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            Thread.sleep(1000);
            return req + "/service2";  // html/text
        }
    }
    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");
        SpringApplication.run(RemoteService.class, args);
    }
}



■ application.properties

server.tomcat.max-threads=1


(시청일 : 20180113)




(중략)

그동안 Java에서 제공하던 Future 인터페이스는 비동기(Async) 처리를 지원하지만, 결과를 얻으려면 get() 메서드를 호출해야 해서 Blocking을 유발하는 단점이 있었다.
이런 불편함은 Spring Framework에서 제공하는 ListenableFuture로 해소할 수 있었다. (Spring Framework를 사용하는 개발자라면 RestTemplate의 Async 버전인 AsyncRestTemplate에 대해 관심을 가져 보기 바란다.)
하지만 ListenableFuture를 써 본 개발자라면 한번쯤은 Callback 작성의 복잡함에 고민해 왔을 것이다. 심지어 Callback Hell 이라는 말이 커뮤니티에는 많이 돌고 있기도 하다.
그 불편함을 알았던지 Java 8 에서 드디어 CompletableFuture 를 제공한다.
CompletableFuture는 아래와 같은 패턴으로 개발이 가능하여 비동기 서비스 코드의 가독성이 좋아졌다.
(중략)


- Future : 비동기 작업의 결과를 담고있는 오브젝트
- ListenableFuture : 콜백 구조로 결과가 완료되는 시점에 Hooking을 걸어서 결과를 가져오는 기법. 비동기 메소드를 호출하면서 콜백 메소드를 지정할 수 있도록 되어 있다.
- CompletableFuture : CompletableFuture 리스트의 모든 값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴지 선택할 수 있다는 것이 장점이다. CompletableFuture 클래스는 람다 표현식과 파이프라이닝을 활용하면 구조적으로 예쁘게 만들 수 있다. 병렬성(Parallelism)과 동시성(Concurrency)에서 CompletableFuture가 중요한데, 여러 개의 CPU 코어 사이에 지연 실행이나 예외를 callable하게 처리할 수 있어서 명시적인 처리가 가능해진다.


■ 예제 1
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .runAsync(() -> log.info("runAsync"))
                .thenRun(() -> log.info("thenRun"))
                .thenRun(() -> log.info("thenRun"));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}

<결과>
18:13:38.852 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:13:38.852 [main] INFO toby.live.CFuture - exit
18:13:38.859 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenRun
18:13:38.860 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenRun
Process finished with exit code 0


■ 예제 2
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    return 1;
                })
                .thenApply(s -> {
                    log.info("thenApply {}",s);
                    return s + 1;
                })
                .thenApply(s2 -> {
                    log.info("thenApply {}",s2);
                    return s2 * 3;
                })
                .thenAccept(s3 -> log.info("thenAccept {}",s3));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}


<결과>
18:20:54.956 [main] INFO toby.live.CFuture - exit
18:20:54.956 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:20:54.962 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenApply 1
18:20:54.966 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenApply 2
18:20:54.966 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenAccept 6
Process finished with exit code 0



■ 예제 3
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    return 1;
                })
                .thenCompose(s -> {
                    log.info("thenApply {}",s);
                    return CompletableFuture.completedFuture(s + 1);
                })
                .thenApply(s2 -> {
                    log.info("thenApply {}",s2);
                    return s2 * 3;
                })
                .thenAccept(s3 -> log.info("thenAccept {}",s3));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}


<결과>
18:25:36.218 [main] INFO toby.live.CFuture - exit
18:25:36.218 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:25:36.226 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenCompose 1
18:25:36.230 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenApply 2
18:25:36.230 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenAccept 6
Process finished with exit code 0



■ 예제 4
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    if(1==1) throw new RuntimeException();  // 강제 오류 발생!!
                    return 1;
                })
                .thenCompose(s -> {
                    log.info("thenCompose {}",s);
                    return CompletableFuture.completedFuture(s + 1);
                })
                .thenApply(s2 -> {
                    log.info("thenApply {}",s2);
                    return s2 * 3;
                })
                .exceptionally(e -> -10)
                .thenAccept(s3 -> log.info("thenAccept {}",s3));
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}



<결과>
18:27:08.541 [main] INFO toby.live.CFuture - exit
18:27:08.541 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:27:08.546 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenAccept -10
Process finished with exit code 0



■ 예제 5
package toby.live;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class CFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        CompletableFuture<Integer> f = new CompletableFuture<>();
        //f.complete(2);
        f.completeExceptionally(new RuntimeException());
        System.out.println(f.get());
        */
        ExecutorService es = Executors.newFixedThreadPool(10);
        CompletableFuture
                .supplyAsync(() -> {
                    log.info("runAsync");
                    return 1;
                })
                .thenCompose(s -> {
                    log.info("thenCompose {}",s);
                    return CompletableFuture.completedFuture(s + 1);
                })
                .thenApplyAsync(s2 -> {
                    log.info("thenApplyAsync {}",s2);
                    return s2 * 3;
                }, es)
                .exceptionally(e -> -10)
                .thenAcceptAsync(s3 -> log.info("thenAcceptAsync {}",s3), es);
        log.info("exit");
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
    }
}


<결과>
18:32:24.425 [main] INFO toby.live.CFuture - exit
18:32:24.425 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - runAsync
18:32:24.431 [ForkJoinPool.commonPool-worker-9] INFO toby.live.CFuture - thenCompose 1
18:32:24.441 [pool-1-thread-1] INFO toby.live.CFuture - thenApplyAsync 2
18:32:24.447 [pool-1-thread-2] INFO toby.live.CFuture - thenAcceptAsync 6





■ LoadTest.java
package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/rest?idx={idx}";;
        CyclicBarrier barrier = new CyclicBarrier(101);
        StopWatch main = new StopWatch();
        main.start();
        for (int i=0; i<100; i++){
            es.submit(()->{
                int idx = counter.addAndGet(1);
                barrier.await();
                log.info("Thread {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                String res = rt.getForObject(url,String.class, idx);
                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);
                return null;
            });
        }
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}


■ Tobytv011Application.java
package toby.live;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
@SpringBootApplication
@RestController
@EnableAsync
public class Tobytv011Application {
    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
    @Autowired
    MyService myService;
    static final String URL1 = "http://localhost:8081/service?req={req}";;
    static final String URL2 = "http://localhost:8081/service2?req={req}";;
    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();
        toCF(rt.getForEntity(URL1, String.class, "h" + idx))
        .thenCompose(s -> {
            //if (1==1) throw new RuntimeException("ERROR");
            return toCF(rt.getForEntity(URL2, String.class, s.getBody()));
        })
        .thenApplyAsync(s2 -> myService.work(s2.getBody()))
        .thenAccept(s3 -> dr.setResult(s3))
        .exceptionally(e -> {dr.setErrorResult(e.getMessage()); return (Void)null;});
        return dr;
    }
    <T> CompletableFuture<T> toCF(ListenableFuture<T> lf) {
        CompletableFuture<T> cf = new CompletableFuture<T>();
        lf.addCallback(s -> cf.complete(s), e -> cf.completeExceptionally(e));
        return cf;
    }
    public static class AcceptCompletion<S> extends Completion<S,Void> {
        Consumer<S> con;
        public AcceptCompletion(Consumer<S> con) {
            this.con = con;
        }
        @Override
        void run(S value) {
            con.accept(value);
        }
    }
    public static class ErrorCompletion<T> extends Completion<T,T> {
        Consumer<Throwable> econ;
        public ErrorCompletion(Consumer<Throwable> econ) {
            this.econ = econ;
        }
        @Override
        void run(T value) {
            if (next != null) next.run(value);
        }
    }
    public static class ApplyCompletion<S,T> extends Completion<S,T> {
        Function<S, ListenableFuture<T>> fn;
        public ApplyCompletion(Function<S, ListenableFuture<T>> fn) {
            this.fn = fn;
        }
        @Override
        void run(S value) {
            ListenableFuture<T> lf = fn.apply(value);
            lf.addCallback(s->complete(s), e->error(e));
        }
    }
    public static class Completion<S,T>{
        Completion next;
        public void andAccept(Consumer<T> con){
            Completion<T, Void> c = new AcceptCompletion(con);
            this.next = c;
        }
        public Completion<T,T> andError(Consumer<Throwable> econ){
            Completion<T,T> c = new ErrorCompletion<>(econ);
            this.next = c;
            return c;
        }
        public <V> Completion<T,V> andApply(Function<T, ListenableFuture<V>> fn){
            Completion<T,V> c = new ApplyCompletion<>(fn);
            this.next = c;
            return c;
        }
        public static <S,T> Completion<S,T> from(ListenableFuture<T> lf) {
            Completion<S,T> c = new Completion<>();
            lf.addCallback(s->{
                c.complete(s);
            }, e->{
                c.error(e);
            });
            return c;
        }
        void error(Throwable e) {
            if (next != null) next.error(e);
        }
        void complete(T s) {
            if (next != null) next.run(s);
        }
        void run(S value) {
        }
    }
    @Service
    public static class MyService {
        public String work(String req) {
            return req + "/asyncwork";
        }
    }
    @Bean
    public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1);
        return te;
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv011Application.class, args);
    }
}



■ RemoteService.java
package toby.live;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Stand-in for a slow remote backend: each endpoint waits one second and then echoes the
 * request parameter with an endpoint-specific suffix.
 */
@SpringBootApplication
public class RemoteService {
    @RestController
    public static class MyController {
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            // To simulate a failing remote call, throw new RuntimeException() after the delay.
            return respondSlowly(req, "/service1"); // html/text
        }

        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            return respondSlowly(req, "/service2"); // html/text
        }

        /** Sleeps for one second, then returns {@code req} followed by {@code suffix}. */
        private static String respondSlowly(String req, String suffix) throws InterruptedException {
            Thread.sleep(1000);
            return req + suffix;
        }
    }

    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");

        SpringApplication.run(RemoteService.class, args);
    }
}

■ application.properties
server.tomcat.max-threads=1



(시청일 : 20171126)






■ LoadTest.java
package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/rest?idx={idx}";;
        CyclicBarrier barrier = new CyclicBarrier(101);
        StopWatch main = new StopWatch();
        main.start();
        for (int i=0; i<100; i++){
            es.submit(()->{
                int idx = counter.addAndGet(1);
                barrier.await();
                log.info("Thread {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                String res = rt.getForObject(url,String.class, idx);
                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);
                return null;
            });
        }
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}



■ Tobytv010Application.java
package toby.live;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.function.Consumer;
import java.util.function.Function;
@SpringBootApplication
@RestController
@EnableAsync
public class Tobytv010Application {
    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
    @Autowired
    MyService myService;
    static final String URL1 = "http://localhost:8081/service?req={req}";;
    static final String URL2 = "http://localhost:8081/service2?req={req}";;
    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();
//            ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity(URL1, String.class, "h" + idx);
//            f1.addCallback(s->{
//                ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity(URL2, String.class, s.getBody());
//
//                f2.addCallback(s2->{
//                    ListenableFuture<String> f3 = myService.work(s2.getBody());
//                    f3.addCallback(s3->{
//                        dr.setResult(s3);
//                    }, e->{
//                        dr.setErrorResult(e.getMessage());
//                    });
//                }, e->{
//                    dr.setErrorResult(e.getMessage());
//                });
//            }, e->{
//                dr.setErrorResult(e.getMessage());
//            });
        Completion
                .from(rt.getForEntity(URL1, String.class, "h" + idx))
                .andApply(s->rt.getForEntity(URL2, String.class, s.getBody()))
                .andApply(s->myService.work(s.getBody()))
                .andError(e->dr.setErrorResult("Error [" + e.toString() + "]"))
                .andAccept(s->dr.setResult(s));
        return dr;
    }
    
    public static class AcceptCompletion<S> extends Completion<S,Void> {
        Consumer<S> con;
        public AcceptCompletion(Consumer<S> con) {
            this.con = con;
        }
        @Override
        void run(S value) {
            con.accept(value);
        }
    }
    public static class ErrorCompletion<T> extends Completion<T,T> {
        Consumer<Throwable> econ;
        public ErrorCompletion(Consumer<Throwable> econ) {
            this.econ = econ;
        }
        @Override
        void run(T value) {
            if (next != null) next.run(value);
        }
    }
    public static class ApplyCompletion<S,T> extends Completion<S,T> {
        Function<S, ListenableFuture<T>> fn;
        public ApplyCompletion(Function<S, ListenableFuture<T>> fn) {
            this.fn = fn;
        }
        @Override
        void run(S value) {
            ListenableFuture<T> lf = fn.apply(value);
            lf.addCallback(s->complete(s), e->error(e));
        }
    }
    public static class Completion<S,T>{
        Completion next;
        public void andAccept(Consumer<T> con){
            Completion<T, Void> c = new AcceptCompletion(con);
            this.next = c;
        }
        public Completion<T,T> andError(Consumer<Throwable> econ){
            Completion<T,T> c = new ErrorCompletion<>(econ);
            this.next = c;
            return c;
        }
        public <V> Completion<T,V> andApply(Function<T, ListenableFuture<V>> fn){
            Completion<T,V> c = new ApplyCompletion<>(fn);
            this.next = c;
            return c;
        }
        public static <S,T> Completion<S,T> from(ListenableFuture<T> lf) {
            Completion<S,T> c = new Completion<>();
            lf.addCallback(s->{
                c.complete(s);
            }, e->{
                c.error(e);
            });
            return c;
        }
        void error(Throwable e) {
            if (next != null) next.error(e);
        }
        void complete(T s) {
            if (next != null) next.run(s);
        }
        void run(S value) {
        }
    }
    @Service
    public static class MyService {
        @Async
        public ListenableFuture<String> work(String req) { return new AsyncResult<>(req + "/asyncwork"); }
    }
    @Bean
    public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1);
        return te;
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv010Application.class, args);
    }
}




■ RemoteService.java
package toby.live;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class RemoteService {
    @RestController
    public static class MyController {
        @GetMapping("/service")
        public String service(String req) throws InterruptedException {
            Thread.sleep(1000);
            //throw new RuntimeException();
            return req + "/service1";  // html/text
        }
        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            Thread.sleep(1000);
            return req + "/service2";  // html/text
        }
    }
    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");
        SpringApplication.run(RemoteService.class, args);
    }
}


■ application.properties

server.tomcat.max-threads=1


(시청일 : 20171119)


[스프링 리액티브 웹 개발 5부. 비동기 RestTemplate과 비동기 MVC의 결합]




- CyclicBarrier : 자바에서 사용하는 simple 동기화 기법

<Callable과 Runnable 의  차이점>

| 구분 | Callable | Runnable |
|---|---|---|
| 리턴값 존재 여부 | O | X |
| Exception 던지도록 선언되어있는지 여부 | O | X |

- ListenableFuture : 비동기 작업 결과를 가져올 수 있는 future 타입인데, 성공/실패 시의 콜백을 등록할 수 있다.

- DeferredResult : 컨트롤러의 응답을 별도의 작업(스레드)에서 나중에 써줄 수 있게 해준다. 하지만 이 스레드는 스프링에 의해 관리되는 것이 아니다.
 DeferredResult 객체를 Queue 등에 저장하고 있다가, 어떤 이벤트가 발생했을 때 DeferredResult.setResult() 메소드를 호출하면 그 시점에 DispatcherServlet을 통해 응답이 전송된다. 즉 서버가 Push하는 기술들을 구현할 수 있게 해준다.



<자바의 쓰레드풀의 동작원리>
먼저 corePoolSize까지 쓰레드를 만들고, 그 다음부터는 큐를 채운다. 큐가 다 차면 maxPoolSize까지 쓰레드를 추가로 늘리고, 쓰레드까지 다 차면 오류(RejectedExecutionException) 발생



■ Tobytv009Application.java
package toby.live.asyncrest;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.context.request.async.DeferredResult;
@SpringBootApplication
@EnableAsync
public class Tobytv009Application {
    @RestController
    public static class MyController {
        //RestTemplate rt = new RestTemplate(); // 블로킹 방식  => 각각 2초 작업시간이 걸리는 100개의 API 호출 시에 하나의 쓰레드로 처리하기 때문에 약 200초 걸림
        //AsyncRestTemplate rt = new AsyncRestTemplate(); // 논블로킹 방식 => 각각 2초 작업시간이 걸리는 100개의 API 호출 시에 100개의 쓰레드로 처리하기 때문에 약 2초 걸림
        AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))); // 논블로킹 방식 => 각각 2초 작업시간이 걸리는 100개의 API 호출 시에 1개의 쓰레드로 처리하기 때문에 약 2초 걸림
//        @GetMapping("/rest")
//        public String rest(int idx) {
//            String res = rt.getForObject("http://localhost:8081/service?req={req}";, String.class, "hello" + idx);
//
//            return res;  // html/text
//        }
//        @GetMapping("/rest")
//        public ListenableFuture<ResponseEntity<String>> rest(int idx) {
//            return rt.getForEntity("http://localhost:8081/service?req={req}";, String.class, "hello" + idx);
//        }
        @Autowired MyService myService;
        @GetMapping("/rest")
        public DeferredResult<String> rest(int idx) {   // 결과를 가공하여 리턴 또는 다른 API와 의존적인 관계로 만드는 법
            DeferredResult<String> dr = new DeferredResult<>();
            ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service1?req={req}";, String.class, "hello" + idx);
            f1.addCallback(s1->{
                ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}";, String.class, s1.getBody());
                f2.addCallback(s2->{
                     //dr.setResult(s2.getBody());
                    ListenableFuture<String> f3 =  myService.work(s2.getBody());
                    f3.addCallback(s3->{
                        dr.setErrorResult(s3);
                    },e3->{
                        dr.setErrorResult(e3.getMessage());
                    });
                },e2->{
                    dr.setErrorResult(e2.getMessage());
                });
            }, e1->{
                dr.setErrorResult(e1.getMessage());
            });
            return dr;
        }
    }
    @Service
    public static class MyService {
        @Async
        public ListenableFuture<String> work(String req) {
            return new AsyncResult<>(req + "/asyncwork");
        }
    }
    @Bean
    ThreadPoolTaskExecutor myThreadPool() {  //
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1);
        te.initialize();
        return te;
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv009Application.class, args);
    }
}


■ RemoteService.java
package toby.live.asyncrest;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class RemoteService {
    @RestController
    public static class MyController {
        @GetMapping("/service1")
        public String service1(String req) throws InterruptedException {
            Thread.sleep(2000);
            return req + "/service1";  // html/text
        }
        @GetMapping("/service2")
        public String service2(String req) throws InterruptedException {
            Thread.sleep(2000);
            return req + "/service2";  // html/text
        }
    }
    public static void main(String[] args) {
        System.setProperty("SERVER.PORT","8081");
        System.setProperty("server.tomcat.max-threads","1000");
        SpringApplication.run(RemoteService.class, args);
    }
}


■ LoadTest.java

package toby.live.asyncrest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/rest?idx={idx}";;
        CyclicBarrier barrier = new CyclicBarrier(101);
        StopWatch main = new StopWatch();
        main.start();
        for (int i=0; i<100; i++){
            es.submit(()->{
                int idx = counter.addAndGet(1);
                barrier.await();
                log.info("Thread {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                String res = rt.getForObject(url,String.class, idx);
                sw.stop();
                log.info("Elapsed: {} {} / {}", idx, sw.getTotalTimeSeconds(), res);
                return null;
            });
        }
        barrier.await();
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}


(시청일 : 20171112)




<자바의 비동기 기술>
  1. Future
  2. Callback

<Spring MVC의 비동기 기술>
  1. Callable
  2. Deferred Result
  3. Response body emitter



■ Future  : 비동기 작업의 결과를 가져오는 핸들러
package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException{
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(()-> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {}
            log.info("Async");
        });
        log.info("Exit");
    }
}


package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        Future<String> f = es.submit(()-> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        });
        System.out.println(f.isDone());
        Thread.sleep(2100);
        log.info("Exit");
        System.out.println(f.isDone());
        System.out.println(f.get()); // Bloking method
    }
}


package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import java.util.concurrent.Future;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @Component
    public static class MyService {
        @Async
        public Future<String> hello() throws InterruptedException {
            log.debug("hello()");
            Thread.sleep(2000);
            return new AsyncResult<>( "Hello");
        }
    }
    public static void main(String[] args) {
        try (ConfigurableApplicationContext    c = SpringApplication.run(Tobytv8Application.class, args)) {
        }
    }
    @Autowired
    MyService myService;
    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            Future<String> f = myService.hello();
            log.info("exit: " + f.isDone());
            log.info("result: " + f.get());
        };
    }
}


package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class FutureEx {
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        FutureTask<String> f = new FutureTask<String>(()->{
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        }) {
            @Override
            protected void done() {
                try {
                    System.out.println(get());
                }catch(InterruptedException e) {
                    e.printStackTrace();
                }catch(ExecutionException e) {
                    e.printStackTrace();
                }
            }
        };
        es.execute(f);
        es.shutdown();
    }
}



■ Callback

package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Callable;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        @GetMapping("/callable")
        public Callable<String> async() throws InterruptedException {
            log.info("callable");
            return ()-> {
                log.info("async");
                Thread.sleep(2000);
                return "hello";
            };
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}



package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.*;
@Slf4j
public class FutureEx {
    interface SuccessCallback {
        void onSuccess(String result);
    }
    public static class CallbackFutureTask extends FutureTask<String> {
        SuccessCallback sc;
        public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
            super(callable);
            this.sc = Objects.requireNonNull(sc);
        }
        @Override
        protected void done() {
            try {
                sc.onSuccess(get());
            }catch(InterruptedException e){
                e.printStackTrace();
            }catch(ExecutionException e){
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        CallbackFutureTask f = new CallbackFutureTask(() -> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
        }, System.out::println);
        es.execute(f);
        es.shutdown();
    }
}




package toby;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.*;
@Slf4j
public class FutureEx {
    interface SuccessCallback {
        void onSuccess(String result);
    }
    interface ExceptionCallback {
        void onError(Throwable t);
    }
    public static class CallbackFutureTask extends FutureTask<String> {
        SuccessCallback sc;
        ExceptionCallback ec;
        public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
            super(callable);
            this.sc = Objects.requireNonNull(sc);
            this.ec = Objects.requireNonNull(ec);
        }
        @Override
        protected void done() {
            try {
                sc.onSuccess(get());
            }catch(InterruptedException e){
                Thread.currentThread().interrupt();
            }catch(ExecutionException e){
                ec.onError(e.getCause());
            }
        }
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ExecutorService es = Executors.newCachedThreadPool();
        CallbackFutureTask f = new CallbackFutureTask(() -> {
            Thread.sleep(2000);
            if(1==1) throw new RuntimeException("Async ERROR!!!");
            log.info("Async");
            return "Hello";
        },
                s-> System.out.println("Result: " + s),
                e-> System.out.println("Error: " + e.getMessage()));
        es.execute(f);
        es.shutdown();
    }
}




■ ListenableFuture
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @Component
    public static class MyService {
        @Async("tp")
        public ListenableFuture<String> hello() throws InterruptedException {
            log.debug("hello()");
            Thread.sleep(2000);
            return new AsyncResult<>( "Hello");
        }
    }
    @Bean // @Async 대신 사용할 수 있는 방법
    ThreadPoolTaskExecutor tp() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(10);
        te.setMaxPoolSize(100);
        te.setQueueCapacity(200);
        te.setThreadNamePrefix("mythread");
        te.initialize();
        return te;
    }
    public static void main(String[] args) {
        try (ConfigurableApplicationContext    c = SpringApplication.run(Tobytv8Application.class, args)) {
        }
    }
    @Autowired
    MyService myService;
    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            ListenableFuture<String> f = myService.hello();
            f.addCallback(s-> System.out.println(s), e-> System.out.println(e.getMessage()));
            log.info("exit");
        };
    }
}



@Async // 계속 쓰레드를 생성하므로 실전에 절대 사용 하면 안됨
@Async("tp") // 비동기 작업이 여러개 이고, 쓰레드 풀을 분리해서 적용하는 방법


■ CompletableFuture
(생략)




- Servlet은 기본적으로 Blocking I/O 방식이다.





■ Callable 관련 Load Test

- LoadTest.java
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException{
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/callable";;
        StopWatch main = new StopWatch();
        main.start();
        for(int i=0; i<100; i++){
            es.execute(()-> {
                int idx = counter.addAndGet(1);
                log.info("Thread: {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                rt.getForObject(url, String.class);
                sw.stop();
                log.info("Elapsed: {} {}", idx ,sw.getTotalTimeSeconds());
            });
        }
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}


- Tobytv8Application.java
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Callable;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        @GetMapping("/callable")
        public Callable<String> callable() throws InterruptedException {
            log.info("callable");
            return ()-> {
                log.info("async");
                Thread.sleep(2000);
                return "hello";
            };
        }
//        public String callable() throws InterruptedException {
//            log.info("async");
//            Thread.sleep(2000);
//            return "hello";
//        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}



- application.properties
server.tomcat.max-threads=1


■ DeferredResult : 어떤 요청에 의해서, 기존의 지연되어있는 HTTP응답을 나중에 써줄 수 있게 하는 기술

package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();
        @GetMapping("/dr")
        public DeferredResult<String> callable() throws InterruptedException {
            log.info("dr");
            DeferredResult<String> dr = new DeferredResult<>(600000L);
            results.add(dr);
            return dr;
        }
        @GetMapping("/dr/count")
        public String drcount() {
            return String.valueOf(results.size());
        }
        @GetMapping("/dr/event")
        public String drevent(String msg) {
            for(DeferredResult<String> dr : results) {
                dr.setResult("Hello " + msg);
                results.remove(dr);
            }
            return "OK";
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}




■ DeferredResult 관련 Load Test
- LoadTest.java
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class LoadTest {
    static AtomicInteger counter = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException{
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate rt = new RestTemplate();
        String url = "http://localhost:8080/dr";;
        StopWatch main = new StopWatch();
        main.start();
        for(int i=0; i<100; i++){
            es.execute(()-> {
                int idx = counter.addAndGet(1);
                log.info("Thread: {}", idx);
                StopWatch sw = new StopWatch();
                sw.start();
                rt.getForObject(url, String.class);
                sw.stop();
                log.info("Elapsed: {} {}", idx ,sw.getTotalTimeSeconds());
            });
        }
        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
    }
}




■ emitter
package toby;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.util.concurrent.Executors;
@SpringBootApplication
@Slf4j
@EnableAsync
public class Tobytv8Application {
    @RestController
    public static class MyController {
        @GetMapping("/emitter")
        public ResponseBodyEmitter emitter() throws InterruptedException {
            ResponseBodyEmitter emitter = new ResponseBodyEmitter();
            Executors.newSingleThreadExecutor().submit(() -> {
                for (int i = 1; i <= 50; i++) {
                    try {
                        emitter.send("<p>Stream " + i + "</p>");
                        Thread.sleep(100);
                    } catch (Exception e) {
                    }
                }
            });
            return emitter;
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv8Application.class, args);
    }
}



(시청일 : 20171105)




■ Reactive Streams - Schedulers

package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class ScheduleEx {
    public static void main(String[] args) {
        //pub
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    sub.onNext(1);
                    sub.onNext(2);
                    sub.onNext(3);
                    sub.onNext(4);
                    sub.onNext(5);
                    sub.onComplete();
                }
                @Override
                public void cancel() {
                }
            });
        };
        /* SubscribeOn */
//        Publisher<Integer> subOnPub = sub -> {
//            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
//                    @Override
//                    public String getThreadNamePrefix() {
//                        return "pubOn-";
//                    }
//            });
//            es.execute(()->pub.subscribe(sub));
//        };
        /* PublishOn */
        Publisher<Integer> pubOnSub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                    @Override
                    public String getThreadNamePrefix() {
                        return "subOn-";
                    }
                });
                @Override
                public void onSubscribe(Subscription s) {
                    sub.onSubscribe(s);
                }
                @Override
                public void onNext(Integer integer) {
                    sub.onNext(integer);
                }
                @Override
                public void onError(Throwable t) {
                     es.execute(()->sub.onError(t));
                     es.shutdown();
                }
                @Override
                public void onComplete() {
                    es.execute(()->sub.onComplete());
                    es.shutdown();
                }
            });
        };
        //sub
        pubOnSub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer integer) {
                log.debug("onNext:{}", integer);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}",t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });
        System.out.println("exit");
    }
}


package toby.live;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class FluxScEx {
    public static void main(String[] args) {
        Flux.range(1,10)
                .publishOn(Schedulers.newSingle("pub"))
                .log()
                //.subscribeOn(Schedulers.newSingle("sub"))
                .subscribe(System.out::println);
        System.out.println("exit");
    }
}



<쓰레드>
- user : user 쓰레드가 하나라도 남아있으면, JVM은 종료되지 않음
- daemon : user 쓰레드가 하나도 남지 않고 daemon 쓰레드만 남아 있을 경우, JVM은 daemon 쓰레드를 모두 강제 종료시키고 종료됨. user 쓰레드가 하나라도 남아 있으면, JVM은 daemon 쓰레드를 종료시키지 않음.


■ interval 예제
package toby.live;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class FluxScEx {
    public static void main(String[] args) throws InterruptedException {
//        Flux.interval(Duration.ofMillis(500))
//                .subscribe(s->log.debug("onNext:{}", s));
//        log.debug("exit");
//        TimeUnit.SECONDS.sleep(5);
//        Executors.newSingleThreadExecutor().execute(() -> {
//            try {
//                TimeUnit.SECONDS.sleep(2);
//            }catch (InterruptedException e) {}
//            System.out.println("Hello");
//        });
//        System.out.println("exit");
        Flux.interval(Duration.ofMillis(200))
                .take(10)
                .subscribe(s->log.debug("onNext:{}",s));
        TimeUnit.SECONDS.sleep(10);
    }
}


package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class IntervalEx {
    public static void main(String[] args) {
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                int no = 0;
                volatile boolean cancelled = false;
                @Override
                public void request(long n) {
                    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
                    exec.scheduleAtFixedRate(() -> {
                        if(cancelled) {
                            exec.shutdown();
                            return;
                        }
                        sub.onNext(no++);
                    }, 0, 500, TimeUnit.NANOSECONDS);
                }
                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        };
        Publisher<Integer> takePub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                int count = 0;
                Subscription subsc;
                @Override
                public void onSubscribe(Subscription s) {
                    subsc = s;
                    sub.onSubscribe(s);
                }
                @Override
                public void onNext(Integer integer) {
                    sub.onNext(integer);
                    if(++count >= 5) {
                        subsc.cancel();
                    }
                }
                @Override
                public void onError(Throwable t) {
                    sub.onError(t);
                }
                @Override
                public void onComplete() {
                    sub.onComplete();
                }
            });
        };
        pub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer integer) {
                log.debug("onNext:{}", integer);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}",t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });
    }
}




(시청일 : 20171105)




■ Reactive Streams - Operators
<Operator의 역할>
  1. 데이터 제어(변환/조작 등)
  2. 스케줄링
  3. 퍼블리싱 제어 ex) take

Publisher -> [Data1] -> op1 -> [Data2] -> op2 -> [Data3] -> Subscriber 

import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/*
Publisher -> [Data1] -> mapPub -> [Data2] -> logSub
                               <- Subscribe(logSub)
                                -> on Subscribe(s)
                                -> onNext
                                -> onNext
                                -> onComplete
1. map (d1 -> f -> d2)
*/
@Slf4j
public class PubSub {
    public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a->a+1).limit(10).collect(Collectors.toList()));
        Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
        //Publisher<Integer> sumPub = sumPub(pub);
        //Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer,Integer,Integer>)(a, b)->a+b);
        mapPub.subscribe(logSub());
    }
    private static Publisher<Integer> reducePub(Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new DelegateSub(sub) {
                    int result = init;
                    @Override
                    public void onNext(Integer i) {
                        result = bf.apply(result,i);
                    }
                    @Override
                    public void onComplete() {
                        sub.onNext(result);
                        sub.onComplete();
                    }
                });
            }
        };
    }
    private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new DelegateSub(sub) {
                                int sum = 0 ;
                                  @Override
                                  public void onNext(Integer i) {
                                      sum += i;
                                  }
                                  @Override
                                  public void onComplete() {
                                      sub.onNext(sum);
                                      sub.onComplete();
                                  }
                                }
                );
            }
        };
    }
    private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new DelegateSub(sub) {
                    @Override
                    public void onNext(Integer i) {
                        sub.onNext(f.apply(i));
                    }
                });
            }
        };
    }
    private static Subscriber<Integer> logSub() {
        return new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer i) {
                log.debug("onNext:{}", i);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}", t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        };
    }
    private static Publisher<Integer> iterPub(List<Integer> iter) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(s -> sub.onNext(s));
                            sub.onComplete();
                        }catch(Throwable t) {
                            sub.onError(t);
                        }
                    }
                    @Override
                    public void cancel() {
                    }
                });
            }
        };
    }
}

      



import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/*
Publisher -> [Data1] -> mapPub -> [Data2] -> logSub
                               <- Subscribe(logSub)
                                -> on Subscribe(s)
                                -> onNext
                                -> onNext
                                -> onComplete
1. map (d1 -> f -> d2)
*/
@Slf4j
public class PubSub {
    public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a->a+1).limit(10).collect(Collectors.toList()));
        //Publisher<String> mapPub = mapPub(pub, s -> "["+s+"]");
        //Publisher<Integer> sumPub = sumPub(pub);
        Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(), (a,b)->a.append(b+","));
        reducePub.subscribe(logSub());
    }
    private static <T,R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
        return new Publisher<R>() {
            @Override
            public void subscribe(Subscriber<? super R> sub) {
                pub.subscribe(new DelegateSub<T, R>(sub) {
                    R result = init;
                    @Override
                    public void onNext(T i) {
                        result = bf.apply(result,i);
                    }
                    @Override
                    public void onComplete() {
                        sub.onNext(result);
                        sub.onComplete();
                    }
                });
            }
        };
    }
//
//    private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
//        return new Publisher<Integer>() {
//            @Override
//            public void subscribe(Subscriber<? super Integer> sub) {
//                pub.subscribe(new DelegateSub(sub) {
//                                int sum = 0 ;
//
//                                  @Override
//                                  public void onNext(Integer i) {
//                                      sum += i;
//                                  }
//
//                                  @Override
//                                  public void onComplete() {
//                                      sub.onNext(sum);
//                                      sub.onComplete();
//                                  }
//                                }
//
//                );
//            }
//        };
//    }
    // T -> R
    private static <T,R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) {
        return new Publisher<R>() {
            @Override
            public void subscribe(Subscriber<? super R> sub) {
                pub.subscribe(new DelegateSub<T,R>(sub) {
                    @Override
                    public void onNext(T i) {
                        sub.onNext(f.apply(i));
                    }
                });
            }
        };
    }
    private static <T> Subscriber<T> logSub() {
        return new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(T i) {
                log.debug("onNext:{}", i);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}", t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        };
    }
    private static Publisher<Integer> iterPub(List<Integer> iter) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(s -> sub.onNext(s));
                            sub.onComplete();
                        }catch(Throwable t) {
                            sub.onError(t);
                        }
                    }
                    @Override
                    public void cancel() {
                    }
                });
            }
        };
    }
}




■ Reactor
import reactor.core.publisher.Flux;
public class ReactorEx {
    public static void main(String[] args) {
        Flux.<Integer>create(e->{
            e.next(1);
            e.next(2);
            e.next(3);
            e.complete();
        })
                .log()
                .map(s->s*10)
                .reduce(0, (a,b)->a+b)
                .log()
                .subscribe(System.out::println);
    }
}


package toby;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class Tobytv007Application {
    @RestController
    public static class Controller {
        @RequestMapping("/hello")
        public Publisher<String> hello(String name) {
            return new Publisher<String>() {
                @Override
                public void subscribe(Subscriber<? super String> s) {
                    s.onSubscribe(new Subscription() {
                        @Override
                        public void request(long n) {
                            s.onNext("Hello " + name);
                            s.onComplete();
                        }
                        @Override
                        public void cancel() {
                        }
                    });
                }
            };
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv007Application.class, args);
    }
}


(시청일 : 20171029)

- Reactive Streams => http://www.reactive-streams.org


■ Reactive : 외부 이벤트나 데이터 등이 발생할 때, 이에 대응하여 동작하는 프로그래밍 방식

<관련 용어>
- FRP(Functional Reactive Programming)
- RFP(Reactive Functional Programming)
- Rx(Reactive Extensions)


<다룰 내용>
- Duality
- Observer pattern
- Reactive Streams - 표준 - Java9 API


■ for-each문에다 Iterable을 쓸 수 있다. => collection이 아니어도 순회할 수 있는 object는 for-each문에 쓸 수 있다.
import java.util.Arrays;

/**
 * Shows that the for-each statement accepts any {@link Iterable}, not only collection types.
 */
public class Ob {
    public static void main(String[] args) {
        Iterable<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        printEach(numbers);
    }

    /** Walks the given Iterable with a for-each loop and prints every element on its own line. */
    private static void printEach(Iterable<Integer> source) {
        for (Integer value : source) {  // for-each
            System.out.println(value);
        }
    }
}




■ Iterable과 Observable의 쌍대성(duality) : 서로 기능은 같지만 서로 반대 방향으로 표현. 
- DATA method(void) <---> void method(DATA)


import javafx.collections.ObservableArrayBase;
import java.util.Iterator;
import java.util.Observable;
import java.util.Observer;
/**
 * Iterable/Observable duality demo: the same ten numbers that an Iterator would hand out on
 * request are pushed to an Observer instead.
 */
public class Ob {
    // Duality of Iterable and Observable: same capability, expressed in opposite directions.
    // Iterable is pull-based; Observable is push-based.
    // An Observable is the source that delivers events/data to its Observers.

    /*
    Pull-style counterpart, kept for comparison (disabled):

    public static void main(String[] args) {
        //Iterable<Integer> iter = new Iterable<Integer>() {
        //    @Override
        //    public Iterator<Integer> iterator() {
        //        return null;
        //   }
        //};
        Iterable<Integer> iter = () ->
                new Iterator<Integer>() {
                    int i = 0;
                    final static int MAX = 10;
                    public boolean hasNext() {
                        return i < MAX;
                    }
                    public Integer next() {
                        return ++i;
                    }
                };
        for(Integer i : iter) {  // for-each
            System.out.println(i);
        }
        for(Iterator<Integer> it = iter.iterator(); it.hasNext();) {
            System.out.println(it.next()); // pull: the consumer asks for the next value
        }
    }
    */

    /** Observable that pushes the integers 1..10 to its observers when run. */
    static class IntObservable extends Observable implements Runnable {
        @Override
        public void run() {
            int next = 1;
            while (next <= 10) {
                // The change flag must be set, otherwise notifyObservers does nothing.
                setChanged();
                notifyObservers(next); // push   <--->   int i = it.next();  // pull
                next++;
            }
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        IntObservable source = new IntObservable();
        // The observer simply prints whatever value is pushed to it.
        source.addObserver((observable, arg) -> System.out.println(arg));
        source.run();
    }
}






■ push 방식의 observer pattern
package toby.live;
import javafx.collections.ObservableArrayBase;
import java.util.Iterator;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Push-style observer demo where the source runs on an executor thread: the main thread prints
 * its "EXIT" line without waiting for the pushed values.
 */
public class Ob {
    /** Observable that pushes the integers 1..10 to its observers when run. */
    static class IntObservable extends Observable implements Runnable {
        @Override
        public void run() {
            int next = 1;
            while (next <= 10) {
                // The change flag must be set, otherwise notifyObservers does nothing.
                setChanged();
                notifyObservers(next); // push   <--->   int i = it.next();  // pull
                next++;
            }
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        IntObservable source = new IntObservable();
        // Prefix every pushed value with the name of the thread that delivered it.
        source.addObserver((observable, arg) -> {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " " + arg);
        });

        ExecutorService worker = Executors.newSingleThreadExecutor();
        worker.execute(source);
        System.out.println(Thread.currentThread().getName() + " EXIT");
        // Already-submitted work still runs; the pool stops afterwards.
        worker.shutdown();
    }
}

// 결과
/*
main EXIT
pool-1-thread-1 1
pool-1-thread-1 2
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-1 5
pool-1-thread-1 6
pool-1-thread-1 7
pool-1-thread-1 8
pool-1-thread-1 9
pool-1-thread-1 10
*/





■Complete, Error 개념이 추가된 Reactive observer pattern
A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).




import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
/**
 * Minimal Publisher/Subscriber pair on {@code java.util.concurrent.Flow}.
 * <p>
 * The publisher serves the elements of a list from a thread pool, at most {@code n} per
 * {@code request(n)} call, and signals {@code onComplete} when a request finds the list exhausted.
 * The subscriber asks for one element at a time, requesting the next one from {@code onNext}.
 * After a terminal signal the publisher shuts the pool down, so {@code main} returns as soon as
 * the stream has finished.
 */
public class PubSub {
    // Publisher <- Observable
    // Subscriber <- Observer
    public static void main(String[] args) throws InterruptedException {
        Iterable<Integer> itr = Arrays.asList(1,2,3,4,5);
        ExecutorService es = Executors.newCachedThreadPool();
        Publisher<Integer> p = new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> subscriber) {
                Iterator<Integer> it = itr.iterator();
                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        // Deliver asynchronously so the caller of request() is never blocked.
                        es.execute(() -> {
                            try {
                                // long counter: the demand is a long and may exceed the int range.
                                for (long sent = 0; sent < n; sent++) {
                                    if (!it.hasNext()) {
                                        subscriber.onComplete();
                                        // Terminal signal sent: no further work will be scheduled.
                                        es.shutdown();
                                        return;
                                    }
                                    subscriber.onNext(it.next());
                                }
                            } catch (RuntimeException e) {
                                subscriber.onError(e);
                                es.shutdown();
                            }
                        });
                    }
                    @Override
                    public void cancel() {
                        // Cancellation is not supported in this demo.
                    }
                });
            }
        };
        Subscriber<Integer> s = new Subscriber<Integer>() {
            Subscription subscription;
            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println(Thread.currentThread().getName() + " onSubscribe");
                this.subscription = subscription;
                // Back-pressure: ask for a single element to start with.
                this.subscription.request(1);
            }
            @Override
            public void onNext(Integer item) {
                System.out.println(Thread.currentThread().getName() + " on Next " +  item);
                // Ask for the next element only after this one has been handled.
                this.subscription.request(1);
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError : " + throwable.getMessage());
            }
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
        p.subscribe(s);
        // Returns once the pool has terminated, i.e. shortly after the terminal signal; the timeout
        // is only a safety net.
        es.awaitTermination(10, TimeUnit.HOURS);
        // No-op when the publisher has already shut the pool down.
        es.shutdown();
    }
}
//결과
/*
main onSubscribe
pool-1-thread-1 on Next 1
pool-1-thread-2 on Next 2
pool-1-thread-3 on Next 3
pool-1-thread-2 on Next 4
pool-1-thread-3 on Next 5
onComplete
*/


- GRPC => https://grpc.io


+ Recent posts