본문 바로가기
Programing/Java & Spring

WebFlux, WebClient 사용하여 API 호출 - 3편

by 슈퍼와이비 2023. 10. 5.
반응형

📌 subscribe() 를 사용하여 비동기적 방식으로 처리하기

앞서 봤던 block() 메소드는 Mono 또는 Flux 스트림을 동기적으로 블로킹하고 결과를 기다리는 데 사용됩니다. 리액티브 프로그래밍의 핵심 아이디어는 비동기적인 방식으로 작업을 수행하고 결과를 기다리지 않고 다른 작업을 수행하는 동안 리소스를 효율적으로 활용하는 것입니다.

✏️ block() 을 사용하지 않고 대신 subscribe() 를 사용하여 비동기적인 방식으로 접근해보도록 하겠습니다.

System.out.println("#1");
Mono<String> mono = 
        webClient.mutate()
            .build().method(HttpMethod.GET)
            .uri("http://ip.jsontest.com")
            .headers(httpHeaders -> { httpHeaders.set("Content-Type", "application/json");})
            .retrieve()
            .bodyToMono(String.class);

mono.subscribe(response -> {
	System.out.println("#2 print response : " + response);
}, error -> {
	System.err.println("오류 발생:");
	error.printStackTrace();
});

System.out.println("#3");

try {
    Thread.sleep(3000); // 3초 대기
} catch (InterruptedException e) {
    e.printStackTrace();
}

 

🔍 해당 소스 코드에 설명해 드리겠습니다.

  1. mono.subscribe(...): Mono를 구독합니다. 이 부분은 비동기적으로 요청 결과를 처리하는 부분입니다.
  2. subscribe() 메소드는 응답 데이터 또는 오류 처리를 위한 콜백 함수를 등록합니다.
  3. response -> {...}: 성공적인 응답을 처리하는 콜백 함수입니다. 이 경우 응답 데이터를 출력합니다.
  4. error -> {...}: 요청 또는 응답 중에 발생한 오류를 처리하는 콜백 함수입니다. 오류가 발생하면 오류 메시지와 스택 트레이스를 출력합니다.
  5. Thread.sleep(3000): 현재 스레드를 3초 동안 대기시킵니다. 이 부분은 비동기 코드가 완료될 때까지 대기하기 위한 것입니다. 이 부분이 없으면 메인 스레드가 종료되면서 프로그램이 종료될 수 있습니다.

 

🔍 실행 결과 확인

첫번째로 #1이 출력이 되고, WebClient는  API를 호출합니다. 비동기 방식이기 때문에 응답을 기다리지 않고, 바로 다음 라인으로 넘어갑니다. #3가 출력되고 나서 응답이 발생하면 subscribe 를 통해 #2를 출력합니다.

#1
#3
#2 print response : {"ip": "000.000.000.00"}

 

📌 (추가) JSON을 통해 String이 아닌 Vo로 데이터를 가져오려면?

위에 코드에서는 .bodyToMono(String.class) 를 통해 응답값을 String 객체에 맵핑했습니다. 만약에 Vo에 맵핑하라면 어떻게 하는지 보여드리겠습니다.

✏️ IpVo 라는 Class를 생성합니다. JSON의 Key값과 맵핑하기 위해 Ip라는 변수를 생성하고 Setter와 Getter를 만들었습니다. 

class IpVo {

	IpVo() {}
	
	String ip = "";

	public void setIp(String ip) {
		this.ip = ip;
	}

	public String getIp() {
		return this.ip;
	}
}

 

✏️ .bodyToMono(String.class) 부분을 .bodyToMono(IpVo.class) 와 Mono 의 제네릭타입을 변경합니다. 

System.out.println("#1");
Mono<IpVo> mono = 
        webClient.mutate()
            .build().method(HttpMethod.GET)
            .uri("http://ip.jsontest.com")
            .headers(httpHeaders -> { httpHeaders.set("Content-Type", "application/json");})
            .retrieve()
            .bodyToMono(IpVo.class);

mono.subscribe(response -> {
    System.out.println("#2 print response : " + response.getIp());
}, error -> {
    System.err.println("오류 발생:");
    error.printStackTrace();
});

System.out.println("#3");

try {
    Thread.sleep(3000); // 3초 대기
} catch (InterruptedException e) {
    e.printStackTrace();
}

 

🔍 실행 결과 확인

subscribe() 에 보면 getIp()를 통해 데이터를 출력했습니다. Mono와 Flux는 원하는 결과물에 따라 Vo를 생성하고 데이터에 접근할 수 있습니다. 

#1
#3
#2 print response : 000.000.000.00

 

📌 1~3편까지의 전체 소스 코드입니다.

subscribe()를 사용하여 비동기적으로 결과를 처리하므로 다른 작업을 수행하면서도 응답을 기다릴 수 있습니다. 이러한 접근 방식은 리액티브 프로그래밍의 특징을 활용하여 비동기적인 작업을 보다 효율적으로 처리하는 데 도움이 됩니다. 전체 코드를 보시고 돌려보시면서 이해의 폭을 넓혀가시기 바랍니다.  

import java.time.Duration;

import javax.net.ssl.SSLException;

import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

public class ApiserviceApplication {

	public static void main(String[] args) throws SSLException {
		
		WebClient webClient = getWebClient();
		
		String res = 
				webClient.mutate()
					.build().method(HttpMethod.GET)
					.uri("http://ip.jsontest.com")
					.headers(httpHeaders -> { httpHeaders.set("Content-Type", "application/json");})
					.retrieve()
					.bodyToMono(String.class)
					.block();
		
		System.out.println(res);
		
		IpVo ipVo = webClient.mutate()
				.build().method(HttpMethod.GET)
				.uri("http://ip.jsontest.com")
				.headers(httpHeaders -> { httpHeaders.set("Content-Type", "application/json");})
				.retrieve()
				.bodyToMono(IpVo.class)
				.block();
		
		System.out.println(ipVo.getIp());
		
		System.out.println("#1");
		Mono<IpVo> mono = 
				webClient.mutate()
					.build().method(HttpMethod.GET)
					.uri("http://ip.jsontest.com")
					.headers(httpHeaders -> { httpHeaders.set("Content-Type", "application/json");})
					.retrieve()
					.bodyToMono(IpVo.class);
		
		mono.subscribe(response -> {
			System.out.println("#2 print response : " + response.getIp());
		}, error -> {
			System.err.println("오류 발생:");
			error.printStackTrace();
		});
		
		System.out.println("#3");
		
		try {
			Thread.sleep(3000); // 3초 대기
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
	}

	private static WebClient getWebClient() throws SSLException {
		ConnectionProvider provider = ConnectionProvider.builder("custom-provider")
				.maxConnections(20)
				.maxIdleTime(Duration.ofSeconds(58))
				.maxLifeTime(Duration.ofSeconds(58))
				.pendingAcquireTimeout(Duration.ofSeconds(60))
				.pendingAcquireMaxCount(-1)
				.evictInBackground(Duration.ofSeconds(30))
				.lifo()
				.build();
		
		SslContext sslContext = SslContextBuilder
				.forClient()
				.trustManager(InsecureTrustManagerFactory.INSTANCE)
				.build();
		
		WebClient client = WebClient.builder()
				.clientConnector(
					new ReactorClientHttpConnector(
							HttpClient.create(provider)
							.secure(t -> t.sslContext(sslContext))
							.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 20000)
							.doOnConnected(
									conn -> conn.addHandlerLast(new ReadTimeoutHandler(30))  //sec
									.addHandlerLast(new WriteTimeoutHandler(60)) //sec
									)
							.responseTimeout(Duration.ofSeconds(60))
							))
				.build();
		return client;
	}
	
}

class IpVo {

	IpVo() {}
	
	String ip = "";

	public void setIp(String ip) {
		this.ip = ip;
	}

	public String getIp() {
		return this.ip;
	}
}

 

WebClient는 범용적으로 사용할 수 있도록 만들어야 합니다. 스프링@Bean을 사용해서 객체를 재사용할 수 있도록 추가적인 작업이 필요합니다. @Bean 으로 만드는 작업은 다음편에 이어서 진행하도록 하겠습니다.

반응형