Toby's Spring TV Episode 7: Spring Reactive Programming (3) - Reactive Streams - Schedulers
IT/Spring Framework | 2018. 2. 3. 23:34
(Watched: 2017-11-05)
■ Reactive Streams - Schedulers
package toby.live;

import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class ScheduleEx {
    public static void main(String[] args) {
        // pub: emits 1..5 synchronously on whichever thread calls request(), then completes
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    sub.onNext(1);
                    sub.onNext(2);
                    sub.onNext(3);
                    sub.onNext(4);
                    sub.onNext(5);
                    sub.onComplete();
                }

                @Override
                public void cancel() {
                }
            });
        };

        /* SubscribeOn: run subscribe(), and with it the whole emission, on a separate thread */
//        Publisher<Integer> subOnPub = sub -> {
//            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
//                @Override
//                public String getThreadNamePrefix() {
//                    return "subOn-";
//                }
//            });
//            es.execute(()->pub.subscribe(sub));
//        };

        /* PublishOn: the upstream still emits on the caller's thread, but every signal
           is handed to a separate thread before it reaches the downstream subscriber */
        Publisher<Integer> pubOnSub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(){
                    @Override
                    public String getThreadNamePrefix() {
                        return "pubOn-";
                    }
                });

                @Override
                public void onSubscribe(Subscription s) {
                    sub.onSubscribe(s);
                }

                @Override
                public void onNext(Integer integer) {
                    // deliver the item on the executor thread, not on the emitting thread
                    es.execute(()->sub.onNext(integer));
                }

                @Override
                public void onError(Throwable t) {
                    es.execute(()->sub.onError(t));
                    es.shutdown(); // queued signals still run before the thread stops
                }

                @Override
                public void onComplete() {
                    es.execute(()->sub.onComplete());
                    es.shutdown();
                }
            });
        };

        // sub: logs every signal so the thread name shows up in the log output
        pubOnSub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                log.debug("onNext:{}", integer);
            }

            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}",t);
            }

            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });

        System.out.println("exit");
    }
}
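To compare the two operators above without Lombok or Spring on the classpath, here is a minimal sketch that swaps `org.reactivestreams` for the JDK's own `java.util.concurrent.Flow` interfaces (Java 9+). It is not the code from the episode: the class name `SchedulerSketch`, the helpers `source`, `subscribeOn`, `publishOn`, `onNextThreads`, and the `emitThread` field are all made up for illustration. It records which thread the source emits on and which thread the final subscriber receives `onNext` on.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

public class SchedulerSketch {
    // Thread on which the source last emitted (hypothetical helper for this demo).
    static volatile String emitThread;

    /** Synchronous source: emits 1..3 on whichever thread calls request(). */
    static Flow.Publisher<Integer> source() {
        return sub -> sub.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) {
                emitThread = Thread.currentThread().getName();
                for (int i = 1; i <= 3; i++) sub.onNext(i);
                sub.onComplete();
            }
            @Override public void cancel() { }
        });
    }

    /** subscribeOn: moves the subscribe() call, and so the whole emission, to another thread. */
    static Flow.Publisher<Integer> subscribeOn(Flow.Publisher<Integer> up, String threadName) {
        return sub -> {
            ExecutorService es = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
            es.execute(() -> up.subscribe(sub));
            es.shutdown(); // the task already queued still runs
        };
    }

    /** publishOn: upstream stays on the caller's thread; signals are re-dispatched downstream. */
    static Flow.Publisher<Integer> publishOn(Flow.Publisher<Integer> up, String threadName) {
        return sub -> up.subscribe(new Flow.Subscriber<Integer>() {
            final ExecutorService es = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
            @Override public void onSubscribe(Flow.Subscription s) { sub.onSubscribe(s); }
            @Override public void onNext(Integer item) { es.execute(() -> sub.onNext(item)); }
            @Override public void onError(Throwable t) { es.execute(() -> sub.onError(t)); es.shutdown(); }
            @Override public void onComplete() { es.execute(sub::onComplete); es.shutdown(); }
        });
    }

    /** Subscribes and returns the name of the thread that delivered each onNext. */
    static List<String> onNextThreads(Flow.Publisher<Integer> pub) throws InterruptedException {
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        pub.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { names.add(Thread.currentThread().getName()); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });
        done.await(5, TimeUnit.SECONDS);
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "[subOn-1, subOn-1, subOn-1] emitted on subOn-1"
        System.out.println(onNextThreads(subscribeOn(source(), "subOn-1")) + " emitted on " + emitThread);
        // prints "[pubOn-1, pubOn-1, pubOn-1] emitted on main"
        System.out.println(onNextThreads(publishOn(source(), "pubOn-1")) + " emitted on " + emitThread);
    }
}
```

In both cases the subscriber is called off the main thread. The difference is where the source itself runs: with the `subscribeOn` sketch it moves to the new thread, with the `publishOn` sketch it stays on the caller's thread and only the delivery is handed off.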
package toby.live;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FluxScEx {
    public static void main(String[] args) {
        Flux.range(1,10)
            .publishOn(Schedulers.newSingle("pub"))
            .log()
            //.subscribeOn(Schedulers.newSingle("sub"))
            .subscribe(System.out::println);

        System.out.println("exit");
    }
}
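<Threads>
- user: as long as at least one user thread is still running, the JVM does not exit.
- daemon: once no user threads remain and only daemon threads are left, the JVM exits and forcibly terminates all remaining daemon threads. While at least one user thread is still running, the daemon threads keep running too.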
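The user/daemon rule above can be tried out with plain JDK threads. The following is only a small sketch: the class `DaemonThreadDemo` and the helper `newThread` are names invented for this example.

```java
public class DaemonThreadDemo {
    /** Builds (but does not start) a thread with the requested daemon flag. */
    static Thread newThread(String name, boolean daemon, Runnable task) {
        Thread t = new Thread(task, name);
        t.setDaemon(daemon); // must be set before start()
        return t;
    }

    public static void main(String[] args) {
        Runnable sleepy = () -> {
            try {
                Thread.sleep(2_000);
                System.out.println(Thread.currentThread().getName() + " finished");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Daemon thread: it does not keep the JVM alive, so once main returns
        // the JVM exits and this thread is stopped before it can print.
        newThread("daemon-worker", true, sleepy).start();

        // User thread: uncomment to keep the JVM alive until the task finishes.
        // newThread("user-worker", false, sleepy).start();

        System.out.println("exit");
    }
}
```

With only the daemon worker started, the program ends right after printing `exit`. With the user worker enabled, the JVM waits about two seconds for it, and the daemon worker gets to finish as well because the JVM is still alive.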
■ interval example
package toby.live;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class FluxScEx {
    public static void main(String[] args) throws InterruptedException {
//        Flux.interval(Duration.ofMillis(500))
//                .subscribe(s->log.debug("onNext:{}", s));
//        log.debug("exit");
//        TimeUnit.SECONDS.sleep(5);

//        Executors.newSingleThreadExecutor().execute(() -> {
//            try {
//                TimeUnit.SECONDS.sleep(2);
//            }catch (InterruptedException e) {}
//            System.out.println("Hello");
//        });
//        System.out.println("exit");

        Flux.interval(Duration.ofMillis(200))
            .take(10)
            .subscribe(s->log.debug("onNext:{}",s));

        // Flux.interval emits on a daemon scheduler thread, so keep the main (user)
        // thread alive long enough to see the output
        TimeUnit.SECONDS.sleep(10);
    }
}
package toby.live;

import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j
public class IntervalEx {
    public static void main(String[] args) {
        // pub: emits 0, 1, 2, ... every 500 ms on a scheduler thread until cancelled
        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                int no = 0;
                volatile boolean cancelled = false;

                @Override
                public void request(long n) {
                    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
                    exec.scheduleAtFixedRate(() -> {
                        if(cancelled) {
                            exec.shutdown();
                            return;
                        }
                        sub.onNext(no++);
                    }, 0, 500, TimeUnit.MILLISECONDS);
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        };

        // takePub: passes the first 5 items through, then cancels the upstream
        Publisher<Integer> takePub = sub -> {
            pub.subscribe(new Subscriber<Integer>() {
                int count = 0;
                Subscription subsc;

                @Override
                public void onSubscribe(Subscription s) {
                    subsc = s;
                    sub.onSubscribe(s);
                }

                @Override
                public void onNext(Integer integer) {
                    sub.onNext(integer);
                    if(++count >= 5) {
                        subsc.cancel();
                    }
                }

                @Override
                public void onError(Throwable t) {
                    sub.onError(t);
                }

                @Override
                public void onComplete() {
                    sub.onComplete();
                }
            });
        };

        // subscribe through takePub so that the stream stops after 5 items
        takePub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                log.debug("onNext:{}", integer);
            }

            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}",t);
            }

            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });
    }
}
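The take operator above cancels the upstream after five items but never tells the downstream subscriber that the stream is over. One possible variation, sketched here with the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) instead of `org.reactivestreams`, also sends `onComplete` after cancelling. The names `IntervalTakeSketch`, `interval`, `take`, and `collect` are invented for this example, and like the original it ignores the requested demand `n`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class IntervalTakeSketch {
    /** Emits 0, 1, 2, ... every periodMillis until the subscription is cancelled. */
    static Flow.Publisher<Integer> interval(long periodMillis) {
        return sub -> sub.onSubscribe(new Flow.Subscription() {
            final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
            final AtomicBoolean started = new AtomicBoolean();
            int no = 0; // only touched by the single scheduler thread

            @Override public void request(long n) {
                if (!started.compareAndSet(false, true)) return; // start the timer only once
                exec.scheduleAtFixedRate(() -> sub.onNext(no++), 0, periodMillis, TimeUnit.MILLISECONDS);
            }

            // shutdown() also stops the periodic task (default executor policy)
            @Override public void cancel() { exec.shutdown(); }
        });
    }

    /** Passes the first `limit` items through, then cancels upstream and completes downstream. */
    static Flow.Publisher<Integer> take(Flow.Publisher<Integer> up, int limit) {
        return sub -> up.subscribe(new Flow.Subscriber<Integer>() {
            int count = 0;
            Flow.Subscription subsc;

            @Override public void onSubscribe(Flow.Subscription s) { subsc = s; sub.onSubscribe(s); }
            @Override public void onNext(Integer item) {
                sub.onNext(item);
                if (++count >= limit) {
                    subsc.cancel();   // stop the upstream timer
                    sub.onComplete(); // let the downstream know the stream has ended
                }
            }
            @Override public void onError(Throwable t) { sub.onError(t); }
            @Override public void onComplete() { sub.onComplete(); }
        });
    }

    /** Subscribes, waits for completion (at most 5 seconds), and returns the received items. */
    static List<Integer> collect(Flow.Publisher<Integer> pub) throws InterruptedException {
        List<Integer> items = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        pub.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { items.add(item); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });
        done.await(5, TimeUnit.SECONDS);
        return items;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect(take(interval(50), 5))); // prints [0, 1, 2, 3, 4]
    }
}
```

Because the executor is shut down on cancel, its user thread ends and the JVM can exit on its own once the five items have been delivered.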
@DEAN :: Dean Story