■ Gradle에서 Reactive Streams 라이브러리 추가 시   
compile group: 'org.reactivestreams', name: 'reactive-streams', version: '0.4.0.M2'
// Reactive Streams
compile "org.reactivestreams:reactive-streams:1.0.1"


■ Gradle에서 slf4j 라이브러리 추가 시
//slf4j
compile 'org.slf4j:slf4j-api:1.7.5'
//compile 'org.slf4j:slf4j-simple:1.7.5'
 

■ Gradle에서 log4j 라이브러리 추가 시
//log4j
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.9.1'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.9.1'


■ Gradle에서 lombok 라이브러리 추가 시
//lombok
compileOnly 'org.projectlombok:lombok:1.16.20'



■ Gradle에서 Reactor Core 라이브러리 추가 시
// Reactor Core
compile 'io.projectreactor:reactor-core:3.1.1.RELEASE'


■ Gradle에서 Netty 라이브러리 추가 시
compile group: 'io.netty', name: 'netty-all', version: '4.0.4.Final'


■ Gradle에서 jsp 설정 시

/* jsp 설정 */
compile("javax.servlet:jstl")
compile("org.apache.tomcat.embed:tomcat-embed-jasper")


■ Gradle에서 JSONParser, JSONObject 설정 시

/* JSONParser, JSONObject설정 */
compile group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1'


(시청일 : 20171105)




■ Reactive Streams - Schedulers

package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class ScheduleEx {
    public static void main(String[] args) {
        //pub
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    sub.onNext(1);
                    sub.onNext(2);
                    sub.onNext(3);
                    sub.onNext(4);
                    sub.onNext(5);
                    sub.onComplete();
                }
                @Override
                public void cancel() {
                }
            });
        };
        /* SubscribeOn */
//        Publisher<Integer> subOnPub = sub -> {
//            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
//                    @Override
//                    public String getThreadNamePrefix() {
//                        return "pubOn-";
//                    }
//            });
//            es.execute(()->pub.subscribe(sub));
//        };
        /* PublishOn */
        Publisher<Integer> pubOnSub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                    @Override
                    public String getThreadNamePrefix() {
                        return "subOn-";
                    }
                });
                @Override
                public void onSubscribe(Subscription s) {
                    sub.onSubscribe(s);
                }
                @Override
                public void onNext(Integer integer) {
                    sub.onNext(integer);
                }
                @Override
                public void onError(Throwable t) {
                     es.execute(()->sub.onError(t));
                     es.shutdown();
                }
                @Override
                public void onComplete() {
                    es.execute(()->sub.onComplete());
                    es.shutdown();
                }
            });
        };
        //sub
        pubOnSub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer integer) {
                log.debug("onNext:{}", integer);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}",t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });
        System.out.println("exit");
    }
}


package toby.live;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class FluxScEx {
    public static void main(String[] args) {
        Flux.range(1,10)
                .publishOn(Schedulers.newSingle("pub"))
                .log()
                //.subscribeOn(Schedulers.newSingle("sub"))
                .subscribe(System.out::println);
        System.out.println("exit");
    }
}



<쓰레드>
- user : user 쓰레드가 하나라도 남아있으면, JVM은 user 쓰레드를 종료시키지 않음
- daemon :  user 쓰레드가 하나도 남지않고 daemon 쓰레드만 남아 있을 경우, JVM은 daemon 쓰레드를 모두 강제 종료 시킴. user 쓰레드가 하나라도 남아 있으면,  JVM은 daemon 쓰레드를 종료시키지 않음.


■ interval 예제
package toby.live;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class FluxScEx {
    public static void main(String[] args) throws InterruptedException {
//        Flux.interval(Duration.ofMillis(500))
//                .subscribe(s->log.debug("onNext:{}", s));
//        log.debug("exit");
//        TimeUnit.SECONDS.sleep(5);
//        Executors.newSingleThreadExecutor().execute(() -> {
//            try {
//                TimeUnit.SECONDS.sleep(2);
//            }catch (InterruptedException e) {}
//            System.out.println("Hello");
//        });
//        System.out.println("exit");
        Flux.interval(Duration.ofMillis(200))
                .take(10)
                .subscribe(s->log.debug("onNext:{}",s));
        TimeUnit.SECONDS.sleep(10);
    }
}


package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class IntervalEx {
    public static void main(String[] args) {
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                int no = 0;
                volatile boolean cancelled = false;
                @Override
                public void request(long n) {
                    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
                    exec.scheduleAtFixedRate(() -> {
                        if(cancelled) {
                            exec.shutdown();
                            return;
                        }
                        sub.onNext(no++);
                    }, 0, 500, TimeUnit.NANOSECONDS);
                }
                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        };
        Publisher<Integer> takePub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                int count = 0;
                Subscription subsc;
                @Override
                public void onSubscribe(Subscription s) {
                    subsc = s;
                    sub.onSubscribe(s);
                }
                @Override
                public void onNext(Integer integer) {
                    sub.onNext(integer);
                    if(++count >= 5) {
                        subsc.cancel();
                    }
                }
                @Override
                public void onError(Throwable t) {
                    sub.onError(t);
                }
                @Override
                public void onComplete() {
                    sub.onComplete();
                }
            });
        };
        pub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer integer) {
                log.debug("onNext:{}", integer);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}",t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });
    }
}




(시청일 : 20171105)




■ Reactive Streams - Operators
<Operator의 역할>
  1. 데이터 제어(변환/조작 등)
  2. 스케쥴링
  3. 퍼블리싱 제어 ex) take

Publisher -> [Data1] -> op1 -> [Data2] -> op2 -> [Data3] -> Subscriber 

import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/*
Publisher -> [Data1] -> mapPub -> [Data2] -> logSub
                               <- Subscribe(logSub)
                                -> on Subscribe(s)
                                -> onNext
                                -> onNext
                                -> onComplete
1. map (d1 -> f -> d2)
*/
@Slf4j
public class PubSub {
    public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a->a+1).limit(10).collect(Collectors.toList()));
        Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
        //Publisher<Integer> sumPub = sumPub(pub);
        //Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer,Integer,Integer>)(a, b)->a+b);
        mapPub.subscribe(logSub());
    }
    private static Publisher<Integer> reducePub(Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new DelegateSub(sub) {
                    int result = init;
                    @Override
                    public void onNext(Integer i) {
                        result = bf.apply(result,i);
                    }
                    @Override
                    public void onComplete() {
                        sub.onNext(result);
                        sub.onComplete();
                    }
                });
            }
        };
    }
    private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new DelegateSub(sub) {
                                int sum = 0 ;
                                  @Override
                                  public void onNext(Integer i) {
                                      sum += i;
                                  }
                                  @Override
                                  public void onComplete() {
                                      sub.onNext(sum);
                                      sub.onComplete();
                                  }
                                }
                );
            }
        };
    }
    private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new DelegateSub(sub) {
                    @Override
                    public void onNext(Integer i) {
                        sub.onNext(f.apply(i));
                    }
                });
            }
        };
    }
    private static Subscriber<Integer> logSub() {
        return new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer i) {
                log.debug("onNext:{}", i);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}", t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        };
    }
    private static Publisher<Integer> iterPub(List<Integer> iter) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(s -> sub.onNext(s));
                            sub.onComplete();
                        }catch(Throwable t) {
                            sub.onError(t);
                        }
                    }
                    @Override
                    public void cancel() {
                    }
                });
            }
        };
    }
}

      



import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/*
Publisher -> [Data1] -> mapPub -> [Data2] -> logSub
                               <- Subscribe(logSub)
                                -> on Subscribe(s)
                                -> onNext
                                -> onNext
                                -> onComplete
1. map (d1 -> f -> d2)
*/
@Slf4j
public class PubSub {
    public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a->a+1).limit(10).collect(Collectors.toList()));
        //Publisher<String> mapPub = mapPub(pub, s -> "["+s+"]");
        //Publisher<Integer> sumPub = sumPub(pub);
        Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(), (a,b)->a.append(b+","));
        reducePub.subscribe(logSub());
    }
    private static <T,R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
        return new Publisher<R>() {
            @Override
            public void subscribe(Subscriber<? super R> sub) {
                pub.subscribe(new DelegateSub<T, R>(sub) {
                    R result = init;
                    @Override
                    public void onNext(T i) {
                        result = bf.apply(result,i);
                    }
                    @Override
                    public void onComplete() {
                        sub.onNext(result);
                        sub.onComplete();
                    }
                });
            }
        };
    }
//
//    private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
//        return new Publisher<Integer>() {
//            @Override
//            public void subscribe(Subscriber<? super Integer> sub) {
//                pub.subscribe(new DelegateSub(sub) {
//                                int sum = 0 ;
//
//                                  @Override
//                                  public void onNext(Integer i) {
//                                      sum += i;
//                                  }
//
//                                  @Override
//                                  public void onComplete() {
//                                      sub.onNext(sum);
//                                      sub.onComplete();
//                                  }
//                                }
//
//                );
//            }
//        };
//    }
    // T -> R
    private static <T,R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) {
        return new Publisher<R>() {
            @Override
            public void subscribe(Subscriber<? super R> sub) {
                pub.subscribe(new DelegateSub<T,R>(sub) {
                    @Override
                    public void onNext(T i) {
                        sub.onNext(f.apply(i));
                    }
                });
            }
        };
    }
    private static <T> Subscriber<T> logSub() {
        return new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(T i) {
                log.debug("onNext:{}", i);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}", t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        };
    }
    private static Publisher<Integer> iterPub(List<Integer> iter) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(s -> sub.onNext(s));
                            sub.onComplete();
                        }catch(Throwable t) {
                            sub.onError(t);
                        }
                    }
                    @Override
                    public void cancel() {
                    }
                });
            }
        };
    }
}




■ Reactor
import reactor.core.publisher.Flux;
public class ReactorEx {
    public static void main(String[] args) {
        Flux.<Integer>create(e->{
            e.next(1);
            e.next(2);
            e.next(3);
            e.complete();
        })
                .log()
                .map(s->s*10)
                .reduce(0, (a,b)->a+b)
                .log()
                .subscribe(System.out::println);
    }
}


package toby;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class Tobytv007Application {
    @RestController
    public static class Controller {
        @RequestMapping("/hello")
        public Publisher<String> hello(String name) {
            return new Publisher<String>() {
                @Override
                public void subscribe(Subscriber<? super String> s) {
                    s.onSubscribe(new Subscription() {
                        @Override
                        public void request(long n) {
                            s.onNext("Hello " + name);
                            s.onComplete();
                        }
                        @Override
                        public void cancel() {
                        }
                    });
                }
            };
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv007Application.class, args);
    }
}


(시청일 : 20171029)

- Reactive Streams => http://www.reactive-streams.org


■ Reactive : 외부 이벤트나 데이터 등이 발생할 때, 이에 대응하여 동작하는 프로그래밍 방식

<관련 용어>
- FRP(Functional Reactive Programming)
- RFP(Reactive Functional Programming)
- RX(Reactive Extension)


<다룰 내용>
- Duality
- Observer pattern
- Reactive Streams - 표준 - Java9 API


■ for-each문에다 Iterable을 쓸 수 있다. => collection이 아니어도 순회할 수 있는 object는 for-each문에 쓸 수 있다.
import java.util.Arrays;

public class Ob {
    public static void main(String[] args) {
        Iterable<Integer> iter = Arrays.asList(1,2,3,4,5);
        for(Integer i : iter) {  // for-each
            System.out.println(i);
        }
    }
}




■ Iterable과 Observable의 쌍대성(duality) : 서로 기능은 같지만 서로 반대 방향으로 표현. 
- DATA method(void) <---> void method(DATA)


import javafx.collections.ObservableArrayBase;
import java.util.Iterator;
import java.util.Observable;
import java.util.Observer;
public class Ob {
    // Iterable과 Observable의 쌍대성(duality) : 서로 기능은 같지만 서로 반대 방향으로 표현
    // Iterable은 Pull 방식. Observable은 Push 방식
    // Iterable은 Pull 방식. Observable은 Push 방식.
    // Observable은 Event/Data를 Observer에 전달하는 Source
    /*
    public static void main(String[] args) {
        //Iterable<Integer> iter = new Iterable<Integer>() {
        //    @Override
        //    public Iterator<Integer> iterator() {
        //        return null;
        //   }
        //};
        Iterable<Integer> iter = () ->
                new Iterator<Integer>() {
                    int i = 0;
                    final static int MAX = 10;
                    public boolean hasNext() {
                        return i < MAX;
                    }
                    public Integer next() {
                        return ++i;
                    }
                };
        for(Integer i : iter) {  // for-each
            System.out.println(i);
        }
        for(Iterator<Integer> it = iter.iterator(); it.hasNext();) {
        }
        System.out.println(it.next()); // pull 방식
    }
    */
    static class IntObservable extends Observable implements Runnable {
        @Override
        public void run() {
            for(int i=1; i<=10; i++){
                setChanged();
                notifyObservers(i); // push 방식   <---> int i = it.next();  // pull 방식
            }
        }
    }
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Observer ob = new Observer() {
            @Override
            public void update(Observable o, Object arg) {
                System.out.println(arg);
            }
        };
        IntObservable io = new IntObservable();
        io.addObserver(ob);
        io.run();
    }
}






■ push 방식의 observer pattern
package toby.live;
import javafx.collections.ObservableArrayBase;
import java.util.Iterator;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Ob {
    static class IntObservable extends Observable implements Runnable {
        @Override
        public void run() {
            for(int i=1; i<=10; i++){
                setChanged();
                notifyObservers(i); // push 방식   <---> int i = it.next();  // pull 방식
            }
        }
    }
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Observer ob = new Observer() {
            @Override
            public void update(Observable o, Object arg) {
                System.out.println(Thread.currentThread().getName() + " " + arg);
            }
        };
        IntObservable io = new IntObservable();
        io.addObserver(ob);
        ExecutorService es = Executors.newSingleThreadExecutor();
        es.execute(io);
        System.out.println(Thread.currentThread().getName() + " EXIT");
        es.shutdown();
    }
}

// 결과
/*
main EXIT
pool-1-thread-1 1
pool-1-thread-1 2
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-1 5
pool-1-thread-1 6
pool-1-thread-1 7
pool-1-thread-1 8
pool-1-thread-1 9
pool-1-thread-1 10
*/





■Complete, Error 개념이 추가된 Reactive observer pattern
A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).




import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
public class PubSub {
    // Publisher <- Observable
    // Subscriber <- Observer
    public static void main(String[] args) throws InterruptedException {
        Iterable<Integer> itr = Arrays.asList(1,2,3,4,5);
        ExecutorService es = Executors.newCachedThreadPool();
        Publisher p = new Publisher() {
            @Override
            public void subscribe(Subscriber subscriber) {
                    Iterator<Integer> it = itr.iterator();
                    subscriber.onSubscribe(new Subscription() {
                        @Override
                        public void request(long n) {
                            es.execute(()->{
                                int i = 0;
                                try {
                                    while (i++ < n) {
                                        if (it.hasNext()) {
                                            subscriber.onNext(it.next());
                                        } else {
                                            subscriber.onComplete();
                                            break;
                                        }
                                    }
                                }catch(RuntimeException e) {
                                    subscriber.onError(e);
                                }
                            });
                        }
                        @Override
                        public void cancel() {
                        }
                    });
            }
        };
        Subscriber<Integer> s = new Subscriber<Integer>() {
            Subscription subscription;
            @Override
            public void onSubscribe(Subscription subscription) {
                 System.out.println(Thread.currentThread().getName() + " onSubscribe");
                 this.subscription = subscription;
                 this.subscription.request(1);
            }
            @Override
            public void onNext(Integer item) {
                System.out.println(Thread.currentThread().getName() + " on Next " +  item);
                this.subscription.request(1);
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError : " + throwable.getMessage());
            }
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
        p.subscribe(s);
        es.awaitTermination(10, TimeUnit.HOURS);
        es.shutdown();
    }
}
//결과
/*
main onSubscribe
pool-1-thread-1 on Next 1
pool-1-thread-2 on Next 2
pool-1-thread-3 on Next 3
pool-1-thread-2 on Next 4
pool-1-thread-3 on Next 5
onComplete
*/


- GRPC => https://grpc.io


+ Recent posts