일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- Hive
- redshift
- 데이터파이프라인
- spark
- dag 작성
- Kafka
- docker
- SQL
- Django
- 웹 크롤링
- 알고리즘
- docker hub
- snowflake
- airflow.cfg
- truncate
- airflow
- 웹 스크래핑
- ELT
- 컨테이너 삭제
- 데이터 웨어하우스
- Serializer
- Django Rest Framework(DRF)
- 데이터레이크
- AWS
- docker-compose
- 데이터마트
- selenium
- ETL
- yarn
- dag
- Today
- Total
개발 기록장
02. [Kafka] Kafka Connector의 적용 본문
기존에는 Kafka에서 Producer.py와 Consumer.py를 이용하여 데이터를 처리했다. 이 방식은 간단한 데이터를 처리할 경우에는 직관적으로 빠르게 코드를 작성하여 처리할 수 있다는 장점이 있지만, 확장성/ 모니터링의 측면에서는 적합하지 않다는 단점이 있었다.
그래서 우리는 Kafka의 Connector의 사용을 고려하기 시작했다. 우리가 프로젝트에서 받아와야할 데이터는 서울 열린데이터 광장에서 지하철 데이터를 실시간으로 받아와야하고, API의 호출 횟수 제한이 있으므로 모니터링이 굉장히 중요한 부분이었다. 또 커넥터를 사용한다면 변경 사항이 있을 때, 따로 코드의 작성 없이 Kafka 상에서 커넥터 설정만을 수정해 사용할 수 있으므로 간편하다고 생각했다.
Kafka Connector
Kafka Connector은 크게 Source Connector와 Sink Connector로 나누어진다. Source Connector은 Topic에 데이터를 input하는 Connetor로 데이터 소스(DB, API)로부터 데이터를 가져온다. Sink Connector은 Topic의 output 데이터를 목적지(DB)에 저장해주는 역할을 한다. 데이터를 가져올 곳과 저장할 곳의 종류/목적에 따라 사용해야하는 Connector의 종류가 달라진다.
Kafka Source Connector
일반적으로 많이 사용되는 Source Connetor를 몇가지 나열하면 다음과 같다.
- File Source Connector: 파일 시스템에 저장된 데이터를 읽어옴
- ex) 로그 파일 수집, CSV 파일 처리
- JDBC Source Connector: 관계형 데이터베이스에서 데이터를 읽어옴
- ex) MySQL, PostgreSQL 등
- MongoDB Source Connector: MongoDB에서 데이터를 읽어옴
- ex) NoSQL 데이터베이스에서 데이터 수집
- HTTP Source Connector: HTTP 엔드포인트에서 데이터를 읽어옴
- ex) REST API를 통해 데이터 수집
Kafka Sink Connector
일반적으로 많이 사용되는 Sink Connecotr를 몇가지 나열하면 다음과 같다.
- JDBC Sink Connector: Kafka 토픽의 데이터를 관계형 데이터베이스에 저장
- ex) MySQL, PostgreSQL 등
- Elasticsearch Sink Connector: Kafka 토픽의 데이터를 Elasticsearch에 저장
- ex) 실시간 로그 분석, 검색 및 인덱싱
- HDFS Sink Connector: Kafka 토픽의 데이터를 Hadoop HDFS에 저장
- ex) 대규모 데이터 저장 및 분석
- S3 Sink Connector: Kafka 토픽의 데이터를 Amazon S3dp 저장
- ex) 장기 데이터 저장, 데이터 백업
- MongoDB Sink Connector: Kafka 토픽의 데이터를 MongoDB에 저장
- ex) NoSQL 데이터베이스에 실시간 데이터 저장
프로젝트에서의 적용
목표: Rest API로부터 실시간으로 데이터를 받아와 우리 프로젝트의 서비스 DB로 저장해야 함
- 서울 열린데이터 광장 실시간 지하철 정보(일괄): https://data.seoul.go.kr/dataList/OA-15799/A/1/datasetView.do
- 데이터 수집: HTTP Source Connector
- Rest API로부터 데이터를 가져옴
- 데이터 저장: S3 Sink Connector
- Amazon S3에 저장
Connector 설정 방법
Kafka yml 파일 Connector 부분에 사용할 Connector 설정 추가(사용하는 Kafka 버전 고려해서 설치하기!)
- 참고: https://docs.confluent.io/kafka-connectors/http-source/current/overview.html?utm_content=plugin_deployment_options&utm_source=confluent_hub&utm_term=kafka-connect-http-source
- Connector 설치: Command 아래에 아래 명령어 붙이기
- confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest
- confluent-hub install --no-prompt confluentinc/kafka-connect-http-source:latest
- Amazon S3에 데이터를 저장해야 하므로 권한 설정 필요: environment 아래에 추가
- AWS_ACCESS_KEY_ID: 실제 자신의 ID
- AWS_SECRET_ACCESS_KEY: 실제 값
- AWS_DEFAULT_REGION: 실제 값
- 해당 yml 파일 빌드 및 실행!
- docker compose up
kafka-connect:
image: confluentinc/cp-kafka-connect:7.3.2
hostname: kafka-connect
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://kafka-schema-registry:8081"
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://kafka-schema-registry:8081"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components"
# 여기 추가
AWS_ACCESS_KEY_ID: ******
AWS_SECRET_ACCESS_KEY: *******
AWS_DEFAULT_REGION: ********
volumes:
- ./connectors:/etc/kafka-connect/jars/
depends_on:
- zoo1
- kafka1
- kafka-schema-registry
- kafka-rest-proxy
command:
- bash
- -c
- |
confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
confluent-hub install --no-prompt jcustenborder/kafka-connect-http:0.0.0.1
# 여기 추가!!
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest
confluent-hub install --no-prompt confluentinc/kafka-connect-http-source:latest
/etc/confluent/docker/run
설치 완료 되었으면 아래 명령어로 커넥터 설정: Kafka Connect REST API에 POST 요청 CURL 명령어
- Kafka Connect: http://localhost:8083/connectors
- HttpSourceConnector 이름으로 Http Souce Connector 생성 및 관련 설정
- Subway라는 이름의 Topic을 생성하고 데이터 전달
- url로 부터 JSON 데이터를 받아오고 처리
- kafka broker server: kafka1: 19092
curl -X POST -H "Content-Type: application/json" --data '{
"name": "HttpSourceConnector",
"config": {
"poll.interval.ms": "15000",
"url": "http://swopenAPI.seoul.go.kr/api/subway/***API KEY***/json/realtimeStationArrival/ALL",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topic.name.pattern": "subway",
"connector.class": "io.confluent.connect.http.HttpSourceConnector",
"confluent.license": "",
"tasks.max": "1",
"http.initial.offset": "0",
"max.request.size": "20971520",
"http.offset.mode": "SIMPLE_INCREMENTING",
"confluent.topic.bootstrap.servers": "kafka1:19092",
"confluent.topic.replication.factor": "1",
"value.converter.schemas.enable": "false"
}
}' http://localhost:8083/connectors
- confluent-s3-sink 이름으로 S3 Source Connector 생성 및 관련 설정
- Subway라는 이름의 Topic으로부터 데이터 받아서 json 파일로 S3에 저장
- 파일명 yyyyMMddHH로 현재 시간으로 데이터 S3에 저장
- aws bucket, access key, secret access key 설정
- "rotate.interval.ms": "10000" -> 10초마다 저장
curl -X POST -H "Content-Type: application/json" --data '{
"name": "confluent-s3-sink",
"config": {
"behavior.on.null.values": "ignore",
"topics": "subway",
"path.format": "YYYY-MM-dd",
"s3.region": "********",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.compatibility": "NONE",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.bucket.name": "S3 버킷이름",
"filename.timestamp.pattern": "yyyyMMddHH",
"rotate.schedule.interval.ms": "10000",
"partition.field.name": "timestamp",
"aws.secret.access.key": "***********",
"partition.duration.ms": "3600000",
"s3.part.size": "5242880",
"aws.access.key.id": "*********",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"timezone": "Asia/Seoul",
"tasks.max": "1",
"flush.size": "1",
"locale": "ko_KR",
"rotate.interval.ms": "10000",
"timestamp.extractor": "Record"
}
}' http://localhost:8083/connectors
정상 작동
- localhost:8080 kafka console에서 확인
- subway topic이 정상적으로 생성된 것을 확인할 수 있음
- 두 커넥터 모두 State: Running인 것을 확인할 수 있음
오류 상황: Source connector에서 들어오는 데이터의 크기가 너무 커서 발생하는 오류
2024-07-19 17:33:54 [2024-07-19 08:33:54,342] ERROR WorkerSourceTask{id=HttpSourceConnector-0} failed to send record to subway: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
2024-07-19 17:33:54 org.apache.kafka.common.errors.RecordTooLargeException: The message is 1873216 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
- Kafka broker max message size 설정 늘리기: kafka1(브로커) environment 아래에 추가
- KAFKA_MAX_MESSAGE_BYTES: 20971520 # 20MB
- KAFKA_MESSAGE_MAX_BYTES: 20971520 # 20MB
- KAFKA_SOCKET_REQUEST_MAX_BYTES: 20971520 # 20MB
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9001
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
# 추가
KAFKA_MAX_MESSAGE_BYTES: 20971520 # 20MB
KAFKA_MESSAGE_MAX_BYTES: 20971520 # 20MB
KAFKA_SOCKET_REQUEST_MAX_BYTES: 20971520 # 20MB
- Kafka connector Max Size 설정 늘리기: connector environment 아래에 추가
- CONNECT_PRODUCER_MAX_REQUEST_SIZE: 20971520 # 20MB
- CONNECT_MAX_REQUEST_SIZE: 20971520 # 20MB
kafka-connect:
image: confluentinc/cp-kafka-connect:7.3.2
hostname: kafka-connect
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://kafka-schema-registry:8081"
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://kafka-schema-registry:8081"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components"
AWS_ACCESS_KEY_ID: ********
AWS_SECRET_ACCESS_KEY: *********
AWS_DEFAULT_REGION: *********
# 추가
CONNECT_PRODUCER_MAX_REQUEST_SIZE: 20971520 # 20MB
CONNECT_MAX_REQUEST_SIZE: 20971520 # 20MB
- docker 파일 재빌드 및 실행
'Kafka와 Kafka Streams' 카테고리의 다른 글
03. [Kafka Streams] 실시간 데이터에 ELT 적용과 MongoDB 커넥터 (0) | 2024.08.06 |
---|---|
01. [Kafka]Producer.py 와 Consumer.py (0) | 2024.07.27 |