IT/Spring Framework

토비의 봄 TV 5회 스프링 리액티브 프로그래밍 (1) - Reactive Streams

DEAN 2018. 2. 3. 23:31
(시청일 : 20171029)

- Reactive Streams => http://www.reactive-streams.org


■ Reactive : 외부 이벤트나 데이터 등이 발생할 때, 이에 대응하여 동작하는 프로그래밍 방식

<관련 용어>
- FRP(Functional Reactive Programming)
- RFP(Reactive Functional Programming)
- RX(Reactive Extension)


<다룰 내용>
- Duality
- Observer pattern
- Reactive Streams - 표준 - Java9 API


■ for-each문에다 Iterable을 쓸 수 있다. => collection이 아니어도 순회할 수 있는 object는 for-each문에 쓸 수 있다.
import java.util.Arrays;

public class Ob {
    public static void main(String[] args) {
        Iterable<Integer> iter = Arrays.asList(1,2,3,4,5);
        for(Integer i : iter) {  // for-each
            System.out.println(i);
        }
    }
}




■ Iterable과 Observable의 쌍대성(duality) : 서로 기능은 같지만 서로 반대 방향으로 표현. 
- DATA method(void) <---> void method(DATA)


import javafx.collections.ObservableArrayBase;
import java.util.Iterator;
import java.util.Observable;
import java.util.Observer;
public class Ob {
    // Iterable과 Observable의 쌍대성(duality) : 서로 기능은 같지만 서로 반대 방향으로 표현
    // Iterable은 Pull 방식. Observable은 Push 방식
    // Iterable은 Pull 방식. Observable은 Push 방식.
    // Observable은 Event/Data를 Observer에 전달하는 Source
    /*
    public static void main(String[] args) {
        //Iterable<Integer> iter = new Iterable<Integer>() {
        //    @Override
        //    public Iterator<Integer> iterator() {
        //        return null;
        //   }
        //};
        Iterable<Integer> iter = () ->
                new Iterator<Integer>() {
                    int i = 0;
                    final static int MAX = 10;
                    public boolean hasNext() {
                        return i < MAX;
                    }
                    public Integer next() {
                        return ++i;
                    }
                };
        for(Integer i : iter) {  // for-each
            System.out.println(i);
        }
        for(Iterator<Integer> it = iter.iterator(); it.hasNext();) {
        }
        System.out.println(it.next()); // pull 방식
    }
    */
    static class IntObservable extends Observable implements Runnable {
        @Override
        public void run() {
            for(int i=1; i<=10; i++){
                setChanged();
                notifyObservers(i); // push 방식   <---> int i = it.next();  // pull 방식
            }
        }
    }
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Observer ob = new Observer() {
            @Override
            public void update(Observable o, Object arg) {
                System.out.println(arg);
            }
        };
        IntObservable io = new IntObservable();
        io.addObserver(ob);
        io.run();
    }
}






■ push 방식의 observer pattern
package toby.live;
import javafx.collections.ObservableArrayBase;
import java.util.Iterator;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Ob {
    static class IntObservable extends Observable implements Runnable {
        @Override
        public void run() {
            for(int i=1; i<=10; i++){
                setChanged();
                notifyObservers(i); // push 방식   <---> int i = it.next();  // pull 방식
            }
        }
    }
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Observer ob = new Observer() {
            @Override
            public void update(Observable o, Object arg) {
                System.out.println(Thread.currentThread().getName() + " " + arg);
            }
        };
        IntObservable io = new IntObservable();
        io.addObserver(ob);
        ExecutorService es = Executors.newSingleThreadExecutor();
        es.execute(io);
        System.out.println(Thread.currentThread().getName() + " EXIT");
        es.shutdown();
    }
}

// 결과
/*
main EXIT
pool-1-thread-1 1
pool-1-thread-1 2
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-1 5
pool-1-thread-1 6
pool-1-thread-1 7
pool-1-thread-1 8
pool-1-thread-1 9
pool-1-thread-1 10
*/





■Complete, Error 개념이 추가된 Reactive observer pattern
A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).




import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
public class PubSub {
    // Publisher <- Observable
    // Subscriber <- Observer
    public static void main(String[] args) throws InterruptedException {
        Iterable<Integer> itr = Arrays.asList(1,2,3,4,5);
        ExecutorService es = Executors.newCachedThreadPool();
        Publisher p = new Publisher() {
            @Override
            public void subscribe(Subscriber subscriber) {
                    Iterator<Integer> it = itr.iterator();
                    subscriber.onSubscribe(new Subscription() {
                        @Override
                        public void request(long n) {
                            es.execute(()->{
                                int i = 0;
                                try {
                                    while (i++ < n) {
                                        if (it.hasNext()) {
                                            subscriber.onNext(it.next());
                                        } else {
                                            subscriber.onComplete();
                                            break;
                                        }
                                    }
                                }catch(RuntimeException e) {
                                    subscriber.onError(e);
                                }
                            });
                        }
                        @Override
                        public void cancel() {
                        }
                    });
            }
        };
        Subscriber<Integer> s = new Subscriber<Integer>() {
            Subscription subscription;
            @Override
            public void onSubscribe(Subscription subscription) {
                 System.out.println(Thread.currentThread().getName() + " onSubscribe");
                 this.subscription = subscription;
                 this.subscription.request(1);
            }
            @Override
            public void onNext(Integer item) {
                System.out.println(Thread.currentThread().getName() + " on Next " +  item);
                this.subscription.request(1);
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError : " + throwable.getMessage());
            }
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
        p.subscribe(s);
        es.awaitTermination(10, TimeUnit.HOURS);
        es.shutdown();
    }
}
//결과
/*
main onSubscribe
pool-1-thread-1 on Next 1
pool-1-thread-2 on Next 2
pool-1-thread-3 on Next 3
pool-1-thread-2 on Next 4
pool-1-thread-3 on Next 5
onComplete
*/


- GRPC => https://grpc.io