일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- Kafka
- truncate
- docker-compose
- selenium
- docker
- Django
- dag 작성
- yarn
- redshift
- 웹 크롤링
- dag
- Serializer
- Django Rest Framework(DRF)
- ETL
- 웹 스크래핑
- 컨테이너 삭제
- SQL
- 데이터파이프라인
- AWS
- spark
- 알고리즘
- 데이터마트
- snowflake
- 데이터 웨어하우스
- airflow.cfg
- docker hub
- 데이터레이크
- ELT
- airflow
- Hive
Archives
- Today
- Total
개발 기록장
01. [Kafka]Producer.py 와 Consumer.py 본문
반응형
상황: 실시간 지하철 데이터를 Kafka를 이용하여 처리해아함
먼저 강의에서 다루었던 데이터 전달 방식인 Producer.py와 Consumer.py를 작성하여 Kafka 환경을 Test했다.
- 데이터는 서울 열린 데이터 광장의 노선별 실시간 지하철 데이터를 이용: https://data.seoul.go.kr/dataList/OA-12601/A/1/datasetView.do
Producer.py: Topic을 생성하고 데이터를 Topic으로 전송한다.
- 노선별 지하철 정보를 받기위해 subway 리스트를 만들어 반복문을 돌려 API 요청을 넣었음
- Consumer.py에서 노선별 지하철 정보 저장을 위해, 실시간 지하철 데이터와 함께 지하철 노선명도 함께 Topic을 통해 전달함
- 생성된 Topic 이름: subway_realtime
- 10초마다 API에 요청을 넣어 새로운 데이터를 받아옴
import requests
from kafka import KafkaProducer
import json
import time
import requests
# Kafka 설정
KAFKA_TOPIC = 'subway_realtime'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
# 기본 API URL (동적 부분을 포함)
BASE_API_URL = 'http://swopenapi.seoul.go.kr/api/subway/발급받은 인증키/xml/realtimePosition/0/100/{sub}'
# Kafka 프로듀서 생성
producer = KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def fetch_subway_data(sub):
api_url = BASE_API_URL.format(sub=sub)
response = requests.get(api_url)
if response.status_code == 200:
return response.text # JSON이 아닌 XML 포맷이므로 text로 가져옴
else:
print(f"Failed to fetch data: {response.status_code}")
return None
subway = [
'1호선', '2호선', '3호선', '4호선', '5호선', '6호선', '7호선', '8호선', '9호선',
'경의중앙선', '공항철도', '경춘선', '수인분당선', '신분당선', '우이신설선', 'GTX-A',
'서해선', '경강선'
]
while True:
for sub in subway:
data = fetch_subway_data(sub)
if data:
message = {
"sub": sub,
"data": data
}
producer.send(KAFKA_TOPIC, message)
print(f"Sent data to Kafka for {sub}: {message}")
time.sleep(10) # 10초마다 모든 노선의 데이터 요청
Consumer.py: Topic으로부터 데이터를 받아오고 파일로 저장한다.
- 공공데이터는 XML 형식의 데이터 -> Json 형식으로 저장하기 위해 포맷 변환
- 로컬 파일에 지하철 노선 명으로 노선별 실시간 지하철 정보 데이터 저장
- ex) subway1호선_realTimedata.json
from kafka import KafkaConsumer
import json
import os
import datetime
import xmltodict
# Kafka 설정
KAFKA_TOPIC = 'subway_realtime'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
# 로컬 저장 경로 설정
LOCAL_SAVE_PATH = 'subway_data/'
# 저장 경로가 존재하지 않으면 생성
if not os.path.exists(LOCAL_SAVE_PATH):
os.makedirs(LOCAL_SAVE_PATH)
# Kafka 소비자 생성
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='subway-group',
value_deserializer=lambda x: x.decode('utf-8') # XML 데이터는 JSON 디코더가 필요 없음
)
def save_to_local_file(data, filename):
with open(os.path.join(LOCAL_SAVE_PATH, filename), 'w') as f:
f.write(data)
for message in consumer:
try:
# 메시지 값을 JSON으로 변환
message_value = json.loads(message.value)
sub = message_value["sub"]
savedata = message_value["data"]
json_data = xmltodict.parse(savedata)
# JSON 데이터를 문자열로 변환
json_string = json.dumps(json_data, ensure_ascii=False, indent=4)
# 파일 이름 설정
filename = f"subway{sub}_realTimedata.json"
# 로컬 파일에 저장
save_to_local_file(json_string, filename)
print("저장완료")
except json.JSONDecodeError as e:
print(f"Failed to decode JSON: {e}")
except TypeError as e:
print(f"Type error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
저장된 데이터 결과
- 노선별로 Json 형식의 데이터가 저장되는 것을 확인할 수 있음
{
"realtimePosition": {
"RESULT": {
"code": "INFO-000",
"developerMessage": null,
"link": null,
"message": "정상 처리되었습니다.",
"status": "200",
"total": "76"
},
"row": [
{
"rowNum": "1",
"selectedCount": "76",
"totalCount": "76",
"subwayId": "1001",
"subwayNm": "1호선",
"statnId": "1001000125",
"statnNm": "제기동",
"trainNo": "0113",
"lastRecptnDt": "20240717",
"recptnDt": "2024-07-17 14:20:57",
"updnLine": "1",
"statnTid": "1001000161",
"statnTnm": "인천",
"trainSttus": "1",
"directAt": "0",
"lstcarAt": "0"
},
{
"rowNum": "2",
"selectedCount": "76",
"totalCount": "76",
"subwayId": "1001",
"subwayNm": "1호선",
"statnId": "1001000126",
"statnNm": "신설동",
"trainNo": "0115",
"lastRecptnDt": "20240717",
"recptnDt": "2024-07-17 14:20:18",
"updnLine": "1",
"statnTid": "1001000161",
"statnTnm": "인천",
"trainSttus": "1",
"directAt": "0",
"lstcarAt": "0"
},
.
.
.
반응형
'Kafka와 Kafka Streams' 카테고리의 다른 글
03. [Kafka Streams] 실시간 데이터에 ELT 적용과 MongoDB 커넥터 (0) | 2024.08.06 |
---|---|
02. [Kafka] Kafka Connector의 적용 (0) | 2024.08.03 |