📌 subscribe() 를 사용하여 비동기적 방식으로 처리하기
앞서 봤던 block() 메소드는 Mono 또는 Flux 스트림을 동기적으로 블로킹하고 결과를 기다리는 데 사용됩니다. 리액티브 프로그래밍의 핵심 아이디어는 비동기적인 방식으로 작업을 수행하고 결과를 기다리지 않고 다른 작업을 수행하는 동안 리소스를 효율적으로 활용하는 것입니다.
✏️ block() 을 사용하지 않고 대신 subscribe() 를 사용하여 비동기적인 방식으로 접근해보도록 하겠습니다.
System.out.println("#1");
Mono<String> mono =
webClient.mutate()
.build().method(HttpMethod.GET)
.uri("http://ip.jsontest.com")
.headers(httpHeaders -> { httpHeaders.set("Content-Type", "application/json");})
.retrieve()
.bodyToMono(String.class);
mono.subscribe(response -> {
System.out.println("#2 print response : " + response);
}, error -> {
System.err.println("오류 발생:");
error.printStackTrace();
});
System.out.println("#3");
try {
Thread.sleep(3000); // 3초 대기
} catch (InterruptedException e) {
e.printStackTrace();
}
🔍 해당 소스 코드에 설명해 드리겠습니다.
- mono.subscribe(...): Mono를 구독합니다. 이 부분은 비동기적으로 요청 결과를 처리하는 부분입니다.
- subscribe() 메소드는 응답 데이터 또는 오류 처리를 위한 콜백 함수를 등록합니다.
- response -> {...}: 성공적인 응답을 처리하는 콜백 함수입니다. 이 경우 응답 데이터를 출력합니다.
- error -> {...}: 요청 또는 응답 중에 발생한 오류를 처리하는 콜백 함수입니다. 오류가 발생하면 오류 메시지와 스택 트레이스를 출력합니다.
- Thread.sleep(3000): 현재 스레드를 3초 동안 대기시킵니다. 이 부분은 비동기 코드가 완료될 때까지 대기하기 위한 것입니다. 이 부분이 없으면 메인 스레드가 종료되면서 프로그램이 종료될 수 있습니다.
🔍 실행 결과 확인
첫번째로 #1이 출력이 되고, WebClient는 API를 호출합니다. 비동기 방식이기 때문에 응답을 기다리지 않고, 바로 다음 라인으로 넘어갑니다. #3가 출력되고 나서 응답이 발생하면 subscribe 를 통해 #2를 출력합니다.
#1
#3
#2 print response : {"ip": "000.000.000.00"}
📌 (추가) JSON을 통해 String이 아닌 Vo로 데이터를 가져오려면?
위에 코드에서는 .bodyToMono(String.class) 를 통해 응답값을 String 객체에 맵핑했습니다. 만약에 Vo에 맵핑하라면 어떻게 하는지 보여드리겠습니다.
✏️ IpVo 라는 Class를 생성합니다. JSON의 Key값과 맵핑하기 위해 Ip라는 변수를 생성하고 Setter와 Getter를 만들었습니다.
class IpVo {
IpVo() {}
String ip = "";
public void setIp(String ip) {
this.ip = ip;
}
public String getIp() {
return this.ip;
}
}
✏️ .bodyToMono(String.class) 부분을 .bodyToMono(IpVo.class) 와 Mono 의 제네릭타입을 변경합니다.
System.out.println("#1");
Mono<IpVo> mono =
webClient.mutate()
.build().method(HttpMethod.GET)
.uri("http://ip.jsontest.com")
.headers(httpHeaders -> { httpHeaders.set("Content-Type", "application/json");})
.retrieve()
.bodyToMono(IpVo.class);
mono.subscribe(response -> {
System.out.println("#2 print response : " + response.getIp());
}, error -> {
System.err.println("오류 발생:");
error.printStackTrace();
});
System.out.println("#3");
try {
Thread.sleep(3000); // 3초 대기
} catch (InterruptedException e) {
e.printStackTrace();
}
🔍 실행 결과 확인
subscribe() 에 보면 getIp()를 통해 데이터를 출력했습니다. Mono와 Flux는 원하는 결과물에 따라 Vo를 생성하고 데이터에 접근할 수 있습니다.
#1
#3
#2 print response : 000.000.000.00
📌 1~3편까지의 전체 소스 코드입니다.
subscribe()를 사용하여 비동기적으로 결과를 처리하므로 다른 작업을 수행하면서도 응답을 기다릴 수 있습니다. 이러한 접근 방식은 리액티브 프로그래밍의 특징을 활용하여 비동기적인 작업을 보다 효율적으로 처리하는 데 도움이 됩니다. 전체 코드를 보시고 돌려보시면서 이해의 폭을 넓혀가시기 바랍니다.
import java.time.Duration;
import javax.net.ssl.SSLException;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
public class ApiserviceApplication {
public static void main(String[] args) throws SSLException {
WebClient webClient = getWebClient();
String res =
webClient.mutate()
.build().method(HttpMethod.GET)
.uri("http://ip.jsontest.com")
.headers(httpHeaders -> { httpHeaders.set("Content-Type", "application/json");})
.retrieve()
.bodyToMono(String.class)
.block();
System.out.println(res);
IpVo ipVo = webClient.mutate()
.build().method(HttpMethod.GET)
.uri("http://ip.jsontest.com")
.headers(httpHeaders -> { httpHeaders.set("Content-Type", "application/json");})
.retrieve()
.bodyToMono(IpVo.class)
.block();
System.out.println(ipVo.getIp());
System.out.println("#1");
Mono<IpVo> mono =
webClient.mutate()
.build().method(HttpMethod.GET)
.uri("http://ip.jsontest.com")
.headers(httpHeaders -> { httpHeaders.set("Content-Type", "application/json");})
.retrieve()
.bodyToMono(IpVo.class);
mono.subscribe(response -> {
System.out.println("#2 print response : " + response.getIp());
}, error -> {
System.err.println("오류 발생:");
error.printStackTrace();
});
System.out.println("#3");
try {
Thread.sleep(3000); // 3초 대기
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static WebClient getWebClient() throws SSLException {
ConnectionProvider provider = ConnectionProvider.builder("custom-provider")
.maxConnections(20)
.maxIdleTime(Duration.ofSeconds(58))
.maxLifeTime(Duration.ofSeconds(58))
.pendingAcquireTimeout(Duration.ofSeconds(60))
.pendingAcquireMaxCount(-1)
.evictInBackground(Duration.ofSeconds(30))
.lifo()
.build();
SslContext sslContext = SslContextBuilder
.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
WebClient client = WebClient.builder()
.clientConnector(
new ReactorClientHttpConnector(
HttpClient.create(provider)
.secure(t -> t.sslContext(sslContext))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 20000)
.doOnConnected(
conn -> conn.addHandlerLast(new ReadTimeoutHandler(30)) //sec
.addHandlerLast(new WriteTimeoutHandler(60)) //sec
)
.responseTimeout(Duration.ofSeconds(60))
))
.build();
return client;
}
}
class IpVo {
IpVo() {}
String ip = "";
public void setIp(String ip) {
this.ip = ip;
}
public String getIp() {
return this.ip;
}
}
WebClient는 범용적으로 사용할 수 있도록 만들어야 합니다. 스프링@Bean을 사용해서 객체를 재사용할 수 있도록 추가적인 작업이 필요합니다. @Bean 으로 만드는 작업은 다음편에 이어서 진행하도록 하겠습니다.
'Programing > Java & Spring' 카테고리의 다른 글
WebFlux, WebClient 사용하여 API 호출 - 2편 (0) | 2023.09.26 |
---|---|
WebFlux, WebClient 사용하여 API 호출 - 1편 (0) | 2023.09.26 |
함수형 인터페이스(Funcational Interface) (0) | 2023.09.22 |
스프링(Spring) 에서 RequestURI, RequestURL 차이 (0) | 2023.09.08 |
Java Default Method (디펄트 메소드) (0) | 2023.09.08 |
스프링(Spring)에서 RestTemplate, Https 통신 (0) | 2023.09.08 |
AOP(Aspect-Oriented Programming) 개념 및 예시 (0) | 2023.09.08 |
[JAVA] content-disposition (0) | 2013.12.31 |
[SPRING3.0] 메세지 처리 (0) | 2013.09.12 |
[SPRING3.0] 페이스북 공유하기① (0) | 2013.09.11 |