(시청일 : 20171105)




■ Reactive Streams - Operators
<Operator의 역할>
  1. 데이터 제어(변환/조작 등)
  2. 스케줄링
  3. 퍼블리싱 제어 ex) take

Publisher -> [Data1] -> op1 -> [Data2] -> op2 -> [Data3] -> Subscriber 

import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/*
Publisher -> [Data1] -> mapPub -> [Data2] -> logSub
                               <- Subscribe(logSub)
                                -> onSubscribe(s)
                                -> onNext
                                -> onNext
                                -> onComplete
1. map (d1 -> f -> d2)
*/
@Slf4j
public class PubSub {
    public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a->a+1).limit(10).collect(Collectors.toList()));
        Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
        //Publisher<Integer> sumPub = sumPub(pub);
        //Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer,Integer,Integer>)(a, b)->a+b);
        mapPub.subscribe(logSub());
    }
    private static Publisher<Integer> reducePub(Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new DelegateSub(sub) {
                    int result = init;
                    @Override
                    public void onNext(Integer i) {
                        result = bf.apply(result,i);
                    }
                    @Override
                    public void onComplete() {
                        sub.onNext(result);
                        sub.onComplete();
                    }
                });
            }
        };
    }
    private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new DelegateSub(sub) {
                                int sum = 0 ;
                                  @Override
                                  public void onNext(Integer i) {
                                      sum += i;
                                  }
                                  @Override
                                  public void onComplete() {
                                      sub.onNext(sum);
                                      sub.onComplete();
                                  }
                                }
                );
            }
        };
    }
    private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new DelegateSub(sub) {
                    @Override
                    public void onNext(Integer i) {
                        sub.onNext(f.apply(i));
                    }
                });
            }
        };
    }
    private static Subscriber<Integer> logSub() {
        return new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer i) {
                log.debug("onNext:{}", i);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}", t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        };
    }
    private static Publisher<Integer> iterPub(List<Integer> iter) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(s -> sub.onNext(s));
                            sub.onComplete();
                        }catch(Throwable t) {
                            sub.onError(t);
                        }
                    }
                    @Override
                    public void cancel() {
                    }
                });
            }
        };
    }
}

      



import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/*
Publisher -> [Data1] -> mapPub -> [Data2] -> logSub
                               <- Subscribe(logSub)
                                -> onSubscribe(s)
                                -> onNext
                                -> onNext
                                -> onComplete
1. map (d1 -> f -> d2)
*/
@Slf4j
public class PubSub {
    public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a->a+1).limit(10).collect(Collectors.toList()));
        //Publisher<String> mapPub = mapPub(pub, s -> "["+s+"]");
        //Publisher<Integer> sumPub = sumPub(pub);
        Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(), (a,b)->a.append(b+","));
        reducePub.subscribe(logSub());
    }
    private static <T,R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
        return new Publisher<R>() {
            @Override
            public void subscribe(Subscriber<? super R> sub) {
                pub.subscribe(new DelegateSub<T, R>(sub) {
                    R result = init;
                    @Override
                    public void onNext(T i) {
                        result = bf.apply(result,i);
                    }
                    @Override
                    public void onComplete() {
                        sub.onNext(result);
                        sub.onComplete();
                    }
                });
            }
        };
    }
//
//    private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
//        return new Publisher<Integer>() {
//            @Override
//            public void subscribe(Subscriber<? super Integer> sub) {
//                pub.subscribe(new DelegateSub(sub) {
//                                int sum = 0 ;
//
//                                  @Override
//                                  public void onNext(Integer i) {
//                                      sum += i;
//                                  }
//
//                                  @Override
//                                  public void onComplete() {
//                                      sub.onNext(sum);
//                                      sub.onComplete();
//                                  }
//                                }
//
//                );
//            }
//        };
//    }
    // T -> R
    private static <T,R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) {
        return new Publisher<R>() {
            @Override
            public void subscribe(Subscriber<? super R> sub) {
                pub.subscribe(new DelegateSub<T,R>(sub) {
                    @Override
                    public void onNext(T i) {
                        sub.onNext(f.apply(i));
                    }
                });
            }
        };
    }
    private static <T> Subscriber<T> logSub() {
        return new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(T i) {
                log.debug("onNext:{}", i);
            }
            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}", t);
            }
            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        };
    }
    private static Publisher<Integer> iterPub(List<Integer> iter) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(s -> sub.onNext(s));
                            sub.onComplete();
                        }catch(Throwable t) {
                            sub.onError(t);
                        }
                    }
                    @Override
                    public void cancel() {
                    }
                });
            }
        };
    }
}




■ Reactor
import reactor.core.publisher.Flux;
public class ReactorEx {
    public static void main(String[] args) {
        Flux.<Integer>create(e->{
            e.next(1);
            e.next(2);
            e.next(3);
            e.complete();
        })
                .log()
                .map(s->s*10)
                .reduce(0, (a,b)->a+b)
                .log()
                .subscribe(System.out::println);
    }
}


package toby;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class Tobytv007Application {
    @RestController
    public static class Controller {
        @RequestMapping("/hello")
        public Publisher<String> hello(String name) {
            return new Publisher<String>() {
                @Override
                public void subscribe(Subscriber<? super String> s) {
                    s.onSubscribe(new Subscription() {
                        @Override
                        public void request(long n) {
                            s.onNext("Hello " + name);
                            s.onComplete();
                        }
                        @Override
                        public void cancel() {
                        }
                    });
                }
            };
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(Tobytv007Application.class, args);
    }
}


+ Recent posts