토비의 봄 TV 5회 스프링 리액티브 프로그래밍 (1) - Reactive StreamsIT/Spring Framework2018. 2. 3. 23:31
Table of Contents
(시청일 : 20171029)
- Reactive Streams => http://www.reactive-streams.org
- Reactive Streams Specification => https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.1/README.md#specification
■ Reactive : 외부 이벤트나 데이터 등이 발생할 때, 이에 대응하여 동작하는 프로그래밍 방식
<관련 용어>
- FRP(Functional Reactive Programming)
- RFP(Reactive Functional Programming)
- RX(Reactive Extension)
<다룰 내용>
- Duality
- Observer pattern
- Reactive Streams - 표준 - Java9 API
■ for-each문에다 Iterable을 쓸 수 있다. => collection이 아니어도 순회할 수 있는 object는 for-each문에 쓸 수 있다.
import java.util.Arrays;
public class Ob {
public static void main(String[] args) {
Iterable<Integer> iter = Arrays.asList(1,2,3,4,5);
for(Integer i : iter) { // for-each
System.out.println(i);
}
}
}
■ Iterable과 Observable의 쌍대성(duality) : 서로 기능은 같지만 서로 반대 방향으로 표현.
- DATA method(void) <---> void method(DATA)
import javafx.collections.ObservableArrayBase;
import java.util.Iterator;
import java.util.Observable;
import java.util.Observer;
public class Ob {
// Iterable과 Observable의 쌍대성(duality) : 서로 기능은 같지만 서로 반대 방향으로 표현
// Iterable은 Pull 방식. Observable은 Push 방식
// Iterable은 Pull 방식. Observable은 Push 방식.
// Observable은 Event/Data를 Observer에 전달하는 Source
/*
public static void main(String[] args) {
//Iterable<Integer> iter = new Iterable<Integer>() {
// @Override
// public Iterator<Integer> iterator() {
// return null;
// }
//};
Iterable<Integer> iter = () ->
new Iterator<Integer>() {
int i = 0;
final static int MAX = 10;
public boolean hasNext() {
return i < MAX;
}
public Integer next() {
return ++i;
}
};
for(Integer i : iter) { // for-each
System.out.println(i);
}
for(Iterator<Integer> it = iter.iterator(); it.hasNext();) {
}
System.out.println(it.next()); // pull 방식
}
*/
static class IntObservable extends Observable implements Runnable {
@Override
public void run() {
for(int i=1; i<=10; i++){
setChanged();
notifyObservers(i); // push 방식 <---> int i = it.next(); // pull 방식
}
}
}
@SuppressWarnings("deprecation")
public static void main(String[] args) {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println(arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob);
io.run();
}
}
■ push 방식의 observer pattern
package toby.live;
import javafx.collections.ObservableArrayBase;
import java.util.Iterator;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Ob {
static class IntObservable extends Observable implements Runnable {
@Override
public void run() {
for(int i=1; i<=10; i++){
setChanged();
notifyObservers(i); // push 방식 <---> int i = it.next(); // pull 방식
}
}
}
@SuppressWarnings("deprecation")
public static void main(String[] args) {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println(Thread.currentThread().getName() + " " + arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob);
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(io);
System.out.println(Thread.currentThread().getName() + " EXIT");
es.shutdown();
}
}
// 결과
/*
main EXIT
pool-1-thread-1 1
pool-1-thread-1 2
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-1 5
pool-1-thread-1 6
pool-1-thread-1 7
pool-1-thread-1 8
pool-1-thread-1 9
pool-1-thread-1 10
*/
■Complete, Error 개념이 추가된 Reactive observer pattern
- A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
public class PubSub {
// Publisher <- Observable
// Subscriber <- Observer
public static void main(String[] args) throws InterruptedException {
Iterable<Integer> itr = Arrays.asList(1,2,3,4,5);
ExecutorService es = Executors.newCachedThreadPool();
Publisher p = new Publisher() {
@Override
public void subscribe(Subscriber subscriber) {
Iterator<Integer> it = itr.iterator();
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
es.execute(()->{
int i = 0;
try {
while (i++ < n) {
if (it.hasNext()) {
subscriber.onNext(it.next());
} else {
subscriber.onComplete();
break;
}
}
}catch(RuntimeException e) {
subscriber.onError(e);
}
});
}
@Override
public void cancel() {
}
});
}
};
Subscriber<Integer> s = new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println(Thread.currentThread().getName() + " onSubscribe");
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() + " on Next " + item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError : " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
p.subscribe(s);
es.awaitTermination(10, TimeUnit.HOURS);
es.shutdown();
}
}
//결과
/*
main onSubscribe
pool-1-thread-1 on Next 1
pool-1-thread-2 on Next 2
pool-1-thread-3 on Next 3
pool-1-thread-2 on Next 4
pool-1-thread-3 on Next 5
onComplete
*/
- GRPC => https://grpc.io
'IT > Spring Framework' 카테고리의 다른 글
토비의 봄 TV 7회 스프링 리액티브 프로그래밍 (3) - Reactive Streams - Schedulers (0) | 2018.02.03 |
---|---|
토비의 봄 TV 6회 스프링 리액티브 프로그래밍 (2) - Reactive Streams - Operators (0) | 2018.02.03 |
토비의 봄 TV 4회 (2) Generics에서 와일드카드 활용법, 람다와 인터섹션 타입을 이용한 동적인 기능확장법 (0) | 2018.02.03 |
토비의 봄 TV 4회 (1) 자바 Generics (0) | 2018.02.03 |
토비의 봄 TV 2.5회 - 수퍼 타입 토큰(2), 스프링 ResolvableType (0) | 2018.02.03 |
@DEAN :: Dean Story
포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!