Python/Advanced
[Python] Thread(5) - Producer and Consumer Using Queue
unsungIT
2021. 5. 17. 20:02
큐를 이용한 생산자 소비자 구조의 스레드
import concurrent.futures
import logging
import queue
import random
import threading
import time
# 생산자
def producer(queue, event):
'''
네트워크 대기 상태라 가정(서버)
'''
while not event.is_set():
message = random.randint(1,101)
logging.info('Producer got message: {}'.format(message))
queue.put(message)
logging.info('Producer received event Exiting')
# 소비자
def consumer(queue, event):
'''
응답 받고 소비하는 것으로 가정 or DB 저장
'''
while not event.is_set() or not queue.empty():
message = queue.get()
logging.info(
'Consumer storing message: {}, (size={})'.format( message, queue.qsize())
)
logging.info('Consumer received enent Exiting')
if __name__=='__main__':
# Logging format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
# 사이즈 중요
pipeline = queue.Queue(maxsize=100)
# 이벤트 플래그 초기 값 0
event = threading.Event()
# With context 시작
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# 기존에는 메소드1개, 각 스레드에서 메소드를 코드 영역에서 공유함.
# 지금은 스레드(생산자), 스레드(소비자), 각각 실행, 통신은 큐를 이용해서 통신함.
# (self, fn, *args, **kwargs): 함수, 데이터공유(큐), 이벤트
executor.submit(producer, pipeline, event)
# (self, fn, *args, **kwargs): 함수, 데이터공유(큐), 이벤트
executor.submit(consumer, pipeline, event)
# 실행 시간 조정
time.sleep(0.1)
logging.info('Main : about to set event')
#프로그램 종료
event.set()