(시청일 : 20171105)




■ Reactive Streams - Schedulers

package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class ScheduleEx {
    public static void main(String[] args) {
        //pub
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    sub.onNext(1);
                    sub.onNext(2);
                    sub.onNext(3);
                    sub.onNext(4);
                    sub.onNext(5);
                    sub.onComplete();
                }
                @Override
                public void cancel() {
                }
            });
        };
        /* SubscribeOn */
//        Publisher<Integer> subOnPub = sub -> {
//            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
//                    @Override
//                    public String getThreadNamePrefix() {
//                        return "pubOn-";
//                    }
//            });
//            es.execute(()->pub.subscribe(sub));
//        };
        /* PublishOn */
        Publisher<Integer> pubOnSub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                    @Override
                    public String getThreadNamePrefix() {
                        return "subOn-";
                    }
                });
                @Override
                public void onSubscribe(Subscription s) {
                    sub.onSubscribe(s);
                }
                @Override
                public void onNext(Integer integer) {
                    sub.onNext(integer);
                }
                @Override
                public void onError(Throwable t) {
                     es.execute(()->sub.onError(t));
                     es.shutdown();
                }
                @Override
                public void onComplete() {
                    es.execute(()->sub.onComplete());
                    es.shutdown();
                }
            });
        };
        //sub
        pubOnSub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer integer) {
                log.debug("onNext:{}", integer);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}",t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });
        System.out.println("exit");
    }
}


package toby.live;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class FluxScEx {
    public static void main(String[] args) {
        Flux.range(1,10)
                .publishOn(Schedulers.newSingle("pub"))
                .log()
                //.subscribeOn(Schedulers.newSingle("sub"))
                .subscribe(System.out::println);
        System.out.println("exit");
    }
}



<쓰레드>
- user : user 쓰레드가 하나라도 남아있으면, JVM은 user 쓰레드를 종료시키지 않음
- daemon :  user 쓰레드가 하나도 남지않고 daemon 쓰레드만 남아 있을 경우, JVM은 daemon 쓰레드를 모두 강제 종료 시킴. user 쓰레드가 하나라도 남아 있으면,  JVM은 daemon 쓰레드를 종료시키지 않음.


■ interval 예제
package toby.live;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class FluxScEx {
    public static void main(String[] args) throws InterruptedException {
//        Flux.interval(Duration.ofMillis(500))
//                .subscribe(s->log.debug("onNext:{}", s));
//        log.debug("exit");
//        TimeUnit.SECONDS.sleep(5);
//        Executors.newSingleThreadExecutor().execute(() -> {
//            try {
//                TimeUnit.SECONDS.sleep(2);
//            }catch (InterruptedException e) {}
//            System.out.println("Hello");
//        });
//        System.out.println("exit");
        Flux.interval(Duration.ofMillis(200))
                .take(10)
                .subscribe(s->log.debug("onNext:{}",s));
        TimeUnit.SECONDS.sleep(10);
    }
}


package toby.live;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class IntervalEx {
    public static void main(String[] args) {
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                int no = 0;
                volatile boolean cancelled = false;
                @Override
                public void request(long n) {
                    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
                    exec.scheduleAtFixedRate(() -> {
                        if(cancelled) {
                            exec.shutdown();
                            return;
                        }
                        sub.onNext(no++);
                    }, 0, 500, TimeUnit.NANOSECONDS);
                }
                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        };
        Publisher<Integer> takePub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                int count = 0;
                Subscription subsc;
                @Override
                public void onSubscribe(Subscription s) {
                    subsc = s;
                    sub.onSubscribe(s);
                }
                @Override
                public void onNext(Integer integer) {
                    sub.onNext(integer);
                    if(++count >= 5) {
                        subsc.cancel();
                    }
                }
                @Override
                public void onError(Throwable t) {
                    sub.onError(t);
                }
                @Override
                public void onComplete() {
                    sub.onComplete();
                }
            });
        };
        pub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer integer) {
                log.debug("onNext:{}", integer);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}",t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });
    }
}




+ Recent posts