일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- Hive
- truncate
- AWS
- 데이터레이크
- Django
- 알고리즘
- redshift
- dag 작성
- docker-compose
- 웹 스크래핑
- 데이터파이프라인
- 웹 크롤링
- airflow.cfg
- yarn
- 데이터 웨어하우스
- SQL
- ELT
- snowflake
- airflow
- spark
- 컨테이너 삭제
- dag
- selenium
- Serializer
- 데이터마트
- Django Rest Framework(DRF)
- docker hub
- Kafka
- ETL
- docker
Archives
- Today
- Total
개발 기록장
03. [Kafka Streams] 실시간 데이터에 ELT 적용과 MongoDB 커넥터 본문
반응형
AWS S3 스토리지로 데이터를 적재할 때의 문제점
- S3는 서비스 DB로 적합하지 않음
- 지하철 데이터는 15초의 주기로 생성되고, 이를 반영해 대시보드에서도 실시간 성을 유지해야 한다. 그러나 S3에 적재하게 되면, Kafka Topic으로부터 데이터를 저장하는 것뿐만 아니라 데이터를 대시보드에서 출력하는 데에도 시간이 오래 걸린다. 또 실시간 지하철 정보 데이터는 축적되어 저장할 필요가 없기 때문에 용량이 커다란 스토리지가 필요하지 않다.
- ELT의 필요성
- API로부터 받아온 데이터에는 대시보드 시각화에 필요 없는 정보와 보기 편하도록 변환해야 하는 값들도 존재한다. 기존에는 이 값들을 태블로 대시보드 시각화 과정에서 정리하려 했으나, 태블로에서 이 값들을 처리하는 과정에서도 시간이 오래 걸린다.(실시간 성을 해친다고 판단!)
해결책
Kafka Streams를 도입하자!
- Kafka와의 좋은 호환성
- 기존의 파이프라인 구조를 확장하여 사용할 수 있음
- 별도의 클러스터가 필요 없고, 기존 Kafka 클러스터에 통합되어 운영
- 실시간 스트림 처리 가능
- 낮은 지연 시간을 보장하며, 실시간 스트림 처리를 지원함
- 지연 없이 실시간으로 데이터를 변환할 수 있음
- 좋은 유연성
- 다른 데이터베이스와의 호환성이 좋음
- 단순한 개발 및 유지보수 가능
- 자바 코드로 쉽게 개발 가능
- 변경사항이 생길 때마다 자바 코드 수정해서 이미지 빌드하면 됨
MongoDB: 실시간 지하철 데이터의 서비스 DB이자 Data Mart
- 기존의 DB인 AWS S3 대체
- 스키마리스 데이터베이스
- 지하철 정보 데이터는 JSON 형식이므로 스키마의 유연성이 필요했음
- 뛰어난 입출력 성능
- 실시간 성을 유지해야하므로 데이터의 입출력이 빠르게 이루어져야함
- Kafka에 MongoDB Sink Connector 존재
- 설정 및 유지보수의 편의성
- Kafka 클러스터와 호환성 용이
프로젝트에서의 적용
목표: HttpSource Connector를 통해 받아온 데이터를 원하는 형태로 변환하고 MongoDB로 저장해야 함
- 서울 열린데이터 광장 실시간 지하철 정보(일괄): https://data.seoul.go.kr/dataList/OA-15799/A/1/datasetView.do
- 데이터 수집(ETL): HTTP Source Connector
- 데이터 처리(ELT): Kafka Stremas App
- 데이터 저장: MongoDB Sink Connector
- 서비스 DB이자 Data Mart: MongoDB
Kafka Stremas APP 작성 및 Docker 이미지 빌드
Java 프로젝트 생성(개인마다 설치되어 있는 환경에 따라 달라지겠지만 내 설정은 다음과 같다)
- 이름 작성: Kafka Streams APP
- Language: Java
- Build System: Maven
- JDK: 11.0.11 version
Pom.xml 작성
- Kafka Streams 관련 의존성 삽입: 기존에 구축되어 있는 카프카 버전과의 호환성 확인 필요!!
- MVN Repository
- Kafka Streams: https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
- Kafka Clients: https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
- 데이터 변환 코드 작성에 필요한 의존성 삽입
- 전체 코드: https://github.com/kjw4420/Kafka_Streams_App/blob/main/pom.xml
Kafka Stremas 앱 생성 및 데이터 변환 코드: myStreamsApp.java
- 기존의 Kafka 클러스터와 연결하기
- 데이터 변환 코드 추가
- ex) 데이터 매핑, 시간값 변환, 필요 없는 칼럼 삭제 등
- 변환된 데이터 output 토픽으로 내보내기
- 전체 코드: https://github.com/kjw4420/Kafka_Streams_App/blob/main/src/main/java/org/example/myStreamsApp.java
package org.example;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
public class myStreamsApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app"); // Kafka Streams 애플리케이션 이름
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:19092"); // Kafka Broker 서버 포트
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152); // 2 MB
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("subway"); // Input 토픽
// 데이터 변환 코드
// 데이터 변환 후 Output 토픽으로 내보내기
// transformedStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
Dockerfile 작성
- 자신의 환경에 따른 openJDK 사용하는 베이스 이미지 설정
- 컨테이너 작업 WorkDIR 설정
- JAR file 컨테이너로 복사
- 애플리케이션 실행 명령어 지정
# Use a specific OpenJDK version base image
FROM openjdk:11.0.11-jre-slim
# Set the working directory in the container
WORKDIR /app
# Copy the JAR file into the container
COPY target/kafka_streams_app-1.0-SNAPSHOT.jar /app/kafka-streams-app.jar
# Specify the command to run the application
CMD ["java", "-jar", "/app/kafka-streams-app.jar"]
MVN 빌드 && Docker image 빌드
다음의 명령어 차례로 실행
- MVN 빌드: mvn clean install
- Docker image 빌드: docker build -t kafka-streams-app:latest .
Kafka 클러스터에 Kafka Streams App 추가하기
- Kafka의 docker-compose.yml 파일에 추가
kafka-streams-app:
image: kafka-streams-app:latest
container_name: kafka-streams-app
depends_on:
- kafka1
- zoo1
environment:
- KAFKA_BOOTSTRAP_SERVERS=kafka1:19092 # Kafka Broker 서버 포트
- APPLICATION_ID=kafka-streams-app # Kafka Streams 애플리케이션 이름
- 컨테이너 생성 명령어 실행: docker-compose up
MongoDB 구축
Kafka 클러스터에 MongoDB 추가하기
우리 프로젝트에서는 데이터의 축적 형태가 아닌 실시간으로 덮어쓰는 형식으로 MongoDB를 이용할 것이므로 Docker Compose를 이용해 Kafka 클러스터에 추가하는 것이지만, 만약 MongoDB에 데이터를 누적하고 데이터가 사라지면 안되는 것이라면 MongoDB를 Kafka 클러스터에 추가하는 방식이 아닌 다른 방식을 고려해 보아야 함!
- 요약
- 우리 서비스는 데이터 누적할 필요가 없으므로 즉, Kafka 클러스터가 종료되면서 DB 데이터가 사라져도 상관 없으므로 docker compose안에 MongoDB 서비스 정의하는 것임
- 만약 데이터가 사라지는 것이 걱정된다면 Kafka 클러스터와 MongoDB 별개로 환경 구축하는 것이 좋음
- Kafka 클러스터 Docker-compose.yml에 MongoDB 추가
mongodb:
image: mongo:latest
hostname: mongodb
container_name: mongodb
ports:
- "27017:27017"
- 컨테이너 생성 명령어 실행: docker-compose up
MongoDB 설정
- mongoDB 컨테이너 bash로 접속
- docker exec -it mongodb bash
- mongodDB 접속
- mongosh mongodb://localhost:27017
- 데이터베이스 보기
- show databases
- 사용할 데이터베이스 이름 설정 및 접속
- use [데이터베이스 이름]
- 컬렉션 생성
- db.createCollection("컬렉션 이름")
번외) 데이터 확인 명령어
- 데이터베이스 접속
- use subway_db
- 컬렉션 보기
- show collections
- 컬렉션에 저장된 데이터 보기
- db.컬렉션이름.find().pretty()
- 컬렉션에 저장된 데이터 개수 보기
- db.컬렉션이름.countDocuments()
MongoDB Sink Connector 구축
- output topic의 데이터 MongoDB로 저장
- 이전의 데이터 누적 형식 X -> 새로운 데이터로 덮어쓰는 형식으로 설정: 실시간 데이터만 필요하므로...
- Connector에 대한 자세한 설명은 앞선 블로그 글 참조: https://jiwon-www.tistory.com/67
MongoDB Sink Connector 설치
- Kafka docker-compose.yml에서 Connector 설치: Command 아래에 아래 명령어 붙이기
- confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:latest
- kafka 클러스터 재빌드: docker-compose up
세부 설정
- 주요 설정사항
- collection: MongoDB에서 데이터 저장할 컬렉션 이름
- connection.uri: MongoDB 연결 URI
- database: MongoDB 데이터베이스 이름
- topics: 데이터 가져올 Kafka 토픽
- id 값 기준으로 데이터 덮어쓰기 설정
- Kafka Connector 설정 POST 명령어
curl -X POST -H "Content-Type: application/json" --data '{
"name": "mongo-sink-connector",
"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
"collection": "subway_collection",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
"connection.uri": "mongodb://mongodb:27017",
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"document.id.strategy.overwrite.existing": "true",
"value.projection.type": "AllowList",
"key.projection.list": "_id",
"value.projection.list": "_id",
"database": "subway_db",
"topics": "output-topic",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"key.projection.type": "AllowList"
}' http://localhost:8083/connectors
정상 작동
- localhost:8080 kafka console에서 확인
- output topic이 정상적으로 생성된 것을 확인할 수 있음
- 두 커넥터 모두 State: Running인 것을 확인할 수 있음
- Http Source Connector
- MongoDB Sink Connector
반응형
'Kafka와 Kafka Streams' 카테고리의 다른 글
02. [Kafka] Kafka Connector의 적용 (0) | 2024.08.03 |
---|---|
01. [Kafka]Producer.py 와 Consumer.py (0) | 2024.07.27 |