이전에 배웠던 Kafka를 이용해 각각의 서비스의 데이터 변동에 의한 다른 서비스의 데이터베이스가 같이 변경되는 것을 확인해보자
🧶 Order ▶ Catalog
- orderservice에서 요청된 주문의 수량 정보를 Catalog service에 반영이 된다.
- orderservice에서 kafka topic으로 메세지를 전송하고 catalogservice에서 kafka topic으로 전송 된 메세지를 받는다
- 즉 order service 는 producer, catalog service는 consumer 가 되는 것이다.
- orderservice 가 자신의 테이블에 DB를 변동시키면 kafka에 topic을 통해 변동된 내용을 전달한다.
- 그럼 해당 topic을 구독하고 있는 catalog service또한 topic의 데이터를 읽고 자신의 서비스로 가져와 자신의 테이블의 데이터를 변동시킬 수 있다.
♪ Catalog service 변경
- 라이브러리
- Kafka 추가
- 팩토리 추가 및 코드 추가
- messagequeue.KafkaConsumerConfig : 이름 그대로 컨슈머의 역할을 하게 되는 설정 코드이다.
- messagequeue.KafkaConsumer : Config를 통해 컨슈머의 데이터가 들어올 것이다. 그럼 topics의 이름을 따서 Listener를 하나 제작하여 해당 메세지를 가져와 DB처리를 하는 로직을 하나 만들어준다.
- 두 코드의 차이점
- Config
- Consumer의 설정 클래스이다. 해당 클래스는 consumer의 factory와 Listener Container factory를 설정한다. 이것은 Kafka Consumer의 생성과 실행을 담당한다.
- 또한 동작 방식과 속성을 정의하고 연결 정보, 그룹 ID, 직렬화와 역직렬화를 설정한다
- Service
- 메세지를 받아 데이터를 처리하는 역할을 하게 된다. @KafkaListener 어노테이션을 사용해서 특정 토픽을 구독하고 해당 토픽에서 발생한 메세지를 처리한다
♪ Order service 변경
- 라이브러리 추가
- Kafka 라이브러리 추가
- 팩토리 추가 및 클래스 추가
- messagequeue.KafkaProducerConfig : 이번에는 producer로써 전달하기 때문에 설정이 조금 다르다.
- messagequeue.KafkaProducer : 전달하는 클래스이기 때문에 send라는 메소드의 이름을 지어주었다. 인자를 받아 Dto를 통해 데이터를 조작하고 JSON타입으로 전달해주는 것을 알 수 있다.
- OrderServiceController.createOrder(POST) : 해당 데이터 변경은 주문을 하였을시에 변경이 되는 로직이라고 이해를 하면 된다 때문에 우리는 Kafka 를 통해 전달하게 되고 service에서 전달하는 로직을 작성하였다. 이제 우리는 그것을 controller단에서 전송을 한다고 보면 된다.
- messagequeue.KafkaProducerConfig : Consumer에서 Producer로 변경되고 따로 그룹ID가 없다는 것을 알았다.어차피 전송을 하는 곳이기 떄문에 컨슈머 그룹을 다뤄야하는 컨슈머쪽과는 조금 다르다. 또한 데이터를 전송하는 것이기 떄문에 Listener contatiner에 등록하지 않고 그저 보내는 Template만 제작해준다. 물론 Kafka전용 template이다
- messagequeue.KafkaProducer : 제작되는 방식은 매우 비슷하지만 Listener가 따로 없기에 어노테이션은 없고 send라는 메소드만 존재한다. 해당 메소드는 받은 데이터를 JSON화하여 전달하는 것을 볼 수 있다.
- Controller : 빨간 밑줄 친 곳을 보면 kafkaProducer를 사용하는 것을 볼 수 있다. 이렇게 우리가 사용하려는 곳 밑에 작성해주면 된다.
♪ TEST
- 테스트에 앞서 5개의 서버가 작동이 되어야 한다.
- Zookeeper 서버
- Kafka 서버
- Eureka 서버
- Config 서버
- apigateway 서버
- 이후 catalog와 order 서비스를 실행하면 된다. 그리고 eureka 대시보드를 통해 h2-console로 가서 데이터베이스의 변화를 보자
- 이제 PostMan을 통해 데이터를 넣어보자
- 전달이 정확히 되었는지는 각 서비스의 로그와 각 서비스의 console을 통한 DB를 확인해보면 되는 것이다.
- 메세지가 작아서 잘 보이지는 않지만 producer인 order는 정상적으로 변동된 데이터를 전달하고 consumer인 catalog는 데이터를 받고 save 를 통해 해당 데이터를 update까지 해주는 모습을 보여주고 있다.
- 결과적으로 성공적으로 해당 데이터가 변경된 것을 볼 수 있다.
♪ 결과
- Order에 주문이 들어왔고 사용자가 주문이 들어왔기 때문에 해당 갯수가 Catalog의 데이터에서 빠져야 한다. Kafka를 통해 각각의 서버에 전달하고 각 DB가 변동되는 것을 확인 할 수 있었다
🧶 멀티 Order Service에서 데이터 동기화
- 우리는 서비스를 사용할때 한개의 서비스를 여러개 포트를 열어 사용할 수 있다. 때문에 따로 열린 서비스의 DB는 다르다. 하지만 근본적으로 한개의 서비스이기 때문에 해당 서비스의 데이터는 같아야한다. 즉, 이번에 해야할것은 같은 이름의 서비스지만 다른 포트 설정으로 인해 DB가 달라 동기화 문제가 발생할 수 있는 문제를 Kafka로 해결해보자
♪ 테스트를 위한 준비
- orderService를 터미널을 사용하여 한개 더 사용한다
- Userservice 를 기동하여 사용자의 정보를 저장하기 위해 회원가입과 로그인을 진행한다.
- 우선 진행하는 과정은 해당 블로그에 있는 데이터 동기화 문제를 다룰떄랑 똑같다
👨👧👦 Microservice간의 통신 (RestTemplate & FeignClient)
🍃 Communication Types Synchronous HTTP communication ( 동기 ) 요청이 들어오면 해당 요청을 처리 할 떄 다른 서비스의 내용이 필요하면 내용을 가져다가 요청을 처리하는 과정을 한 프로세스로 보고 진행 A
latewalk.tistory.com
- 해당 블로그에서 다룬 동기화 문제에 이어져서 실제로 카프카를 이용해서 DB를 설정하게 되는데 이는 각각의 서비스가 아닌 MariaDB라는 한개의 DB에 데이터를 모두 넣어주는 방법을 사용하게 될 것이다.
♪ 추가 해주어야 하는 과정 1 ( OrderService의 DB 수정 )
- 한가지의 서비스는 한개의 DB를 사용해야 데이터 동기화에 좋다고 했다. 그렇다면 우리가 이번에 하는 orderservice에 데이터를 넣어주면 이제 모두 자신의 DB가 아닌 Kafka로 전달해야 한다.
- Kafka topic 에 메세지를 전달해주는 것이 Source Connect 이다
- Topic에서 데이터를 가져가서 사용하는 것이 Sink Connect이다.
- MariaDB에 데이터 추가
- OrderService 에 MariaDB에 관한 내용을 추가
- 라이브러리 추가 확인 : 과정을 그대로 따라왔다면 있겠지만 없을 수도 있으니 체크
- yml파일에서 h2가 아닌 mariaDB로 변경
- 다시 orderService 를 기동하고 데이터를 넣어보면 정상적으로 MariaDB로 들어간것을 확인 해 볼 수 있다.
♪ 추가 해주어야 하는 과정 2 ( OrderService의 Kafka 추가 )
- 우리는 이제 해당 서비스가 본인의 DB가 아닌 외부의 DB와 연결이 되기 때문에 데이터가 저장된다는 것을 알았고 그것을 저장하기 위해 Kafka 를 사용한다는 사실도 깨닳았다.때문에 우리는 변경해주어야 하는 정보가 있다. 바로 데이터에 저장하는 부분을 변경 해주어야 한다.
- 기존에 우리는 JPA를 통해 우리의 정보를 저장하고 save까지 해주었지만 이제 들어온 데이터를 메세지화 하여 producer의 역할을 하는 것으로 변경을 하게 된다.
----♣ DTO 제작----
- Kafka의 Connect producer를 사용하기 위해 객체를 만들어주어야 한다.
- KafkaOrderDto
- 우린 결국 Dto를 사용해서 데이터를 전송하게 될 것이다. 데이터는 크게 Schema와 Payload로 이루어져 있으며 그 각각의 데이터를 구성해야 한다.
- Schema
- 테이블의 구조를 뜻한다. 실제로 연결될 DB의 테이블의 상황을 작성하여 전달하면 그 내용에 맞춰 데이터를 입력한다. 여기서 List형태로 Field를 받는데 상세한 테이블의 값의 형태를 집어 넣는것이라고 생각하면 편하다
- Payload
- 실제로 전송될 데이터를 뜻한다. 우리가 기존에 DB에 저장하던 Entity구성을 똑같이 작성하면 된다.
- 완성된 메세지를 보면 좀 더 확실히 이해가 갈것이다.
----♣ Producer 제작----
- 우리가 Kafka Producer를 만든것처럼 OrderDB에 전달할 Producer도 만들어주어야 한다.
@Service
@Slf4j
public class OrderProducer {
private KafkaTemplate<String, String> kafkaTemplate;
List<Field> field = Arrays.asList(new Field("String", true, "order_id"),
new Field("String", true, "user_id"),
new Field("String", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price")
);
Schema schema = Schema.builder()
.type("struct")
.fields(field)
.optional(false)
.name("orders")
.build();
@Autowired
public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public OrderDto send(String topic, OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer send data from the Order microservice : " + kafkaOrderDto);
return orderDto;
}
}
- Field를 설정해주고 리스트에 넣어서 스키마에 넣어준다 이떄 우리는 Builder를 사용하기로 했기 때문에 사용하는 모습을 보여줄수 있다.
- send에 대한 부분도 변경이 있는 것을 확인할 수 있다.
- payload를 builder를 통해 설정해주었다.
- KafkaOrderDto를 통해 설정한 schema와 payload를 넣어준다.
- 마지막으로 KafkaOrderDto를 JSON형태로 전송하는 것을 볼 수 있다.
----♣ Controller 제작----
- Connect 전용의 kafka send를 또 불러줘야 하기 때문에 controller에서 OrderProducer의 의존성을 새롭게 주입하고 전송하는 것을 알 수 있다.
----♣ SinkConnector 추가----
- 카프카 커넥트를 연결해주어야 한다.
confluent 폴더 경로 ( window 버전 )
./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties
- 싱크 커넥터 추가를 postman을 통해 요청으로 진행하는 것을 확인할 수 있다. sink를 추가하는 내용은 크게 달라지지 않았지만 name과 topics의 이름이 변경되었다. 그리고 GET을 통해 정상적으로 돌아가는 것을 확인하면 테스트를 진행하면 된다.
♪ TEST 시작
- 이제 지금까지 했던 모든 것을 한번에 동작을 하면서 확인을 해보자 단, 시작하기 전에 어떤 과정으로 테스트가 진행되는지 한번 알아볼 필요가 있다
- 두개의 오더서비스가 만들어져있고 그곳에 사용자가 주문을 하면 주문에 대한 내용이 라운드로빈 방식으로 나눠져 들어간다.
- 그렇다면 Catalog에 데이터가 전달되고 Catalog 서버는 데이터를 수정하게 된다
- 그리고 Order서비스는 kafka를 통해 MariaDB인 곳에 데이터를 처리하게 되면서 데이터가 한곳에서 관리된다
- 그러므로 사용자가 자신의 데이터를 가져올때 요청마다 다른 것이 아닌 일정하게 같은 갯수를 Response하는 것을 알 수 있다.
- 우선 두개의 요청이 서로 다른 서비스 포트 번호로 들어가는 것이 확인이 된다.
- 그러면 싱크 커넥터를 등록 해놓았기 때문에 싱크커넥터가 컨슈머에 등록된 JSON방식의 데이터를 확인하고 DB에 등록하기 때문에 잘 들어갔는지 DB를 확인해보면된다.
- 이런식으로 데이터가 잘 들어오는 것을 확인할 수있다.
요청를 통해 들어온 주문을 받아 OrderService는 Producer의 역할을 하여 해당 데이터를 JSON형태로 변환해서 직렬화를 통해 Kafka Topic으로 전달한다. 그럼 SinkConnector는 Topic에 들어온 데이터를 받아 DB에 저장하는 것을 볼 수 있다.
'MSA > MSA 강좌 - 이도원 강사님' 카테고리의 다른 글
👨👧👦 분산 추척, Zipkin (1) | 2024.04.12 |
---|---|
👨👧👦12. CircuitBreaker 와 Resilience4J의 사용 (0) | 2024.04.11 |
👨👧👦10. 데이터 동기화를 위한 Apache Kafka활용하기 (1) (0) | 2024.04.09 |
👨👧👦9. Microservice간의 통신 (RestTemplate & FeignClient) (0) | 2024.04.04 |
👨👧👦8. 설정 정보와 암호화 처리 (Encryption 과 Decryption) (1) | 2024.04.03 |