개발 기록장

01. [Kafka]Producer.py 와 Consumer.py 본문

Kafka와 Kafka Streams

01. [Kafka]Producer.py 와 Consumer.py

jxwxnk 2024. 7. 27. 16:57
반응형

상황: 실시간 지하철 데이터를 Kafka를 이용하여 처리해아함
먼저 강의에서 다루었던 데이터 전달 방식인 Producer.py와 Consumer.py를 작성하여 Kafka 환경을 Test했다.

Producer.py: Topic을 생성하고 데이터를 Topic으로 전송한다.

  • 노선별 지하철 정보를 받기위해 subway 리스트를 만들어 반복문을 돌려 API 요청을 넣었음
  • Consumer.py에서 노선별 지하철 정보 저장을 위해, 실시간 지하철 데이터와 함께 지하철 노선명도 함께 Topic을 통해 전달함
    • 생성된 Topic 이름: subway_realtime
  • 10초마다 API에 요청을 넣어 새로운 데이터를 받아옴
import requests
from kafka import KafkaProducer
import json
import time
import requests

# Kafka 설정
KAFKA_TOPIC = 'subway_realtime'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'

# 기본 API URL (동적 부분을 포함)
BASE_API_URL = 'http://swopenapi.seoul.go.kr/api/subway/발급받은 인증키/xml/realtimePosition/0/100/{sub}'

# Kafka 프로듀서 생성
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def fetch_subway_data(sub):
    api_url = BASE_API_URL.format(sub=sub)
    response = requests.get(api_url)
    if response.status_code == 200:
        return response.text  # JSON이 아닌 XML 포맷이므로 text로 가져옴
    else:
        print(f"Failed to fetch data: {response.status_code}")
        return None

subway = [
    '1호선', '2호선', '3호선', '4호선', '5호선', '6호선', '7호선', '8호선', '9호선', 
    '경의중앙선', '공항철도', '경춘선', '수인분당선', '신분당선', '우이신설선', 'GTX-A',
    '서해선', '경강선'
]

while True:
    for sub in subway:
        data = fetch_subway_data(sub)
        if data:
            message = {
                "sub": sub,
                "data": data
            }
            producer.send(KAFKA_TOPIC, message)
            print(f"Sent data to Kafka for {sub}: {message}")

    time.sleep(10)  # 10초마다 모든 노선의 데이터 요청

Consumer.py: Topic으로부터 데이터를 받아오고 파일로 저장한다.

  • 공공데이터는 XML 형식의 데이터 -> Json 형식으로 저장하기 위해 포맷 변환
  • 로컬 파일에 지하철 노선 명으로 노선별 실시간 지하철 정보 데이터 저장
    • ex) subway1호선_realTimedata.json
from kafka import KafkaConsumer
import json
import os
import datetime
import xmltodict

# Kafka 설정
KAFKA_TOPIC = 'subway_realtime'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'

# 로컬 저장 경로 설정
LOCAL_SAVE_PATH = 'subway_data/'

# 저장 경로가 존재하지 않으면 생성
if not os.path.exists(LOCAL_SAVE_PATH):
    os.makedirs(LOCAL_SAVE_PATH)

# Kafka 소비자 생성
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='subway-group',
    value_deserializer=lambda x: x.decode('utf-8')  # XML 데이터는 JSON 디코더가 필요 없음
)


def save_to_local_file(data, filename):
    with open(os.path.join(LOCAL_SAVE_PATH, filename), 'w') as f:
        f.write(data)


for message in consumer:
    try:
        # 메시지 값을 JSON으로 변환
        message_value = json.loads(message.value)

        sub = message_value["sub"]
        savedata = message_value["data"]

        json_data = xmltodict.parse(savedata)
        # JSON 데이터를 문자열로 변환
        json_string = json.dumps(json_data, ensure_ascii=False, indent=4)
        # 파일 이름 설정
        filename = f"subway{sub}_realTimedata.json"

        # 로컬 파일에 저장
        save_to_local_file(json_string, filename)
        print("저장완료")
    except json.JSONDecodeError as e:
        print(f"Failed to decode JSON: {e}")
    except TypeError as e:
        print(f"Type error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

저장된 데이터 결과

  • 노선별로 Json 형식의 데이터가 저장되는 것을 확인할 수 있음
{
    "realtimePosition": {
        "RESULT": {
            "code": "INFO-000",
            "developerMessage": null,
            "link": null,
            "message": "정상 처리되었습니다.",
            "status": "200",
            "total": "76"
        },
        "row": [
            {
                "rowNum": "1",
                "selectedCount": "76",
                "totalCount": "76",
                "subwayId": "1001",
                "subwayNm": "1호선",
                "statnId": "1001000125",
                "statnNm": "제기동",
                "trainNo": "0113",
                "lastRecptnDt": "20240717",
                "recptnDt": "2024-07-17 14:20:57",
                "updnLine": "1",
                "statnTid": "1001000161",
                "statnTnm": "인천",
                "trainSttus": "1",
                "directAt": "0",
                "lstcarAt": "0"
            },
            {
                "rowNum": "2",
                "selectedCount": "76",
                "totalCount": "76",
                "subwayId": "1001",
                "subwayNm": "1호선",
                "statnId": "1001000126",
                "statnNm": "신설동",
                "trainNo": "0115",
                "lastRecptnDt": "20240717",
                "recptnDt": "2024-07-17 14:20:18",
                "updnLine": "1",
                "statnTid": "1001000161",
                "statnTnm": "인천",
                "trainSttus": "1",
                "directAt": "0",
                "lstcarAt": "0"
            },
            .
            .
            .
반응형