토비의 봄 TV 6회 스프링 리액티브 프로그래밍 (2) - Reactive Streams - OperatorsIT/Spring Framework2018. 2. 3. 23:32
Table of Contents
(시청일 : 20171105)
■ Reactive Streams - Operators
<Operator의 역할>
- 데이터 제어(변환/조작 등)
- 스케쥴링
- 퍼블리싱 제어 ex) take
Publisher -> [Data1] -> op1 -> [Data2] -> op2 -> [Data3] -> Subscriber
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/*
Publisher -> [Data1] -> mapPub -> [Data2] -> logSub
<- Subscribe(logSub)
-> on Subscribe(s)
-> onNext
-> onNext
-> onComplete
1. map (d1 -> f -> d2)
*/
@Slf4j
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a->a+1).limit(10).collect(Collectors.toList()));
Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
//Publisher<Integer> sumPub = sumPub(pub);
//Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer,Integer,Integer>)(a, b)->a+b);
mapPub.subscribe(logSub());
}
private static Publisher<Integer> reducePub(Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> bf) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub) {
int result = init;
@Override
public void onNext(Integer i) {
result = bf.apply(result,i);
}
@Override
public void onComplete() {
sub.onNext(result);
sub.onComplete();
}
});
}
};
}
private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub) {
int sum = 0 ;
@Override
public void onNext(Integer i) {
sum += i;
}
@Override
public void onComplete() {
sub.onNext(sum);
sub.onComplete();
}
}
);
}
};
}
private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub) {
@Override
public void onNext(Integer i) {
sub.onNext(f.apply(i));
}
});
}
};
}
private static Subscriber<Integer> logSub() {
return new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer i) {
log.debug("onNext:{}", i);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
};
}
private static Publisher<Integer> iterPub(List<Integer> iter) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iter.forEach(s -> sub.onNext(s));
sub.onComplete();
}catch(Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
}
});
}
};
}
}
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/*
Publisher -> [Data1] -> mapPub -> [Data2] -> logSub
<- Subscribe(logSub)
-> on Subscribe(s)
-> onNext
-> onNext
-> onComplete
1. map (d1 -> f -> d2)
*/
@Slf4j
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a->a+1).limit(10).collect(Collectors.toList()));
//Publisher<String> mapPub = mapPub(pub, s -> "["+s+"]");
//Publisher<Integer> sumPub = sumPub(pub);
Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(), (a,b)->a.append(b+","));
reducePub.subscribe(logSub());
}
private static <T,R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> sub) {
pub.subscribe(new DelegateSub<T, R>(sub) {
R result = init;
@Override
public void onNext(T i) {
result = bf.apply(result,i);
}
@Override
public void onComplete() {
sub.onNext(result);
sub.onComplete();
}
});
}
};
}
//
// private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
// return new Publisher<Integer>() {
// @Override
// public void subscribe(Subscriber<? super Integer> sub) {
// pub.subscribe(new DelegateSub(sub) {
// int sum = 0 ;
//
// @Override
// public void onNext(Integer i) {
// sum += i;
// }
//
// @Override
// public void onComplete() {
// sub.onNext(sum);
// sub.onComplete();
// }
// }
//
// );
// }
// };
// }
// T -> R
private static <T,R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> sub) {
pub.subscribe(new DelegateSub<T,R>(sub) {
@Override
public void onNext(T i) {
sub.onNext(f.apply(i));
}
});
}
};
}
private static <T> Subscriber<T> logSub() {
return new Subscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
log.debug("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(T i) {
log.debug("onNext:{}", i);
}
@Override
public void onError(Throwable t) {
log.debug("onError:{}", t);
}
@Override
public void onComplete() {
log.debug("onComplete");
}
};
}
private static Publisher<Integer> iterPub(List<Integer> iter) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iter.forEach(s -> sub.onNext(s));
sub.onComplete();
}catch(Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
}
});
}
};
}
}
■ Reactor
import reactor.core.publisher.Flux;
public class ReactorEx {
public static void main(String[] args) {
Flux.<Integer>create(e->{
e.next(1);
e.next(2);
e.next(3);
e.complete();
})
.log()
.map(s->s*10)
.reduce(0, (a,b)->a+b)
.log()
.subscribe(System.out::println);
}
}
package toby;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class Tobytv007Application {
@RestController
public static class Controller {
@RequestMapping("/hello")
public Publisher<String> hello(String name) {
return new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> s) {
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s.onNext("Hello " + name);
s.onComplete();
}
@Override
public void cancel() {
}
});
}
};
}
}
public static void main(String[] args) {
SpringApplication.run(Tobytv007Application.class, args);
}
}
'IT > Spring Framework' 카테고리의 다른 글
토비의 봄 TV 8회 스프링 리액티브 프로그래밍 (4) 자바와 스프링의 비동기 기술 (0) | 2018.02.03 |
---|---|
토비의 봄 TV 7회 스프링 리액티브 프로그래밍 (3) - Reactive Streams - Schedulers (0) | 2018.02.03 |
토비의 봄 TV 5회 스프링 리액티브 프로그래밍 (1) - Reactive Streams (0) | 2018.02.03 |
토비의 봄 TV 4회 (2) Generics에서 와일드카드 활용법, 람다와 인터섹션 타입을 이용한 동적인 기능확장법 (0) | 2018.02.03 |
토비의 봄 TV 4회 (1) 자바 Generics (0) | 2018.02.03 |
@DEAN :: Dean Story
포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!