큐를 이용한 생산자 소비자 구조의 스레드
import concurrent.futures
import logging
import queue
import random
import threading
import time
# 생산자
def producer(queue, event):
'''
네트워크 대기 상태라 가정(서버)
'''
while not event.is_set():
message = random.randint(1,101)
logging.info('Producer got message: {}'.format(message))
queue.put(message)
logging.info('Producer received event Exiting')
# 소비자
def consumer(queue, event):
'''
응답 받고 소비하는 것으로 가정 or DB 저장
'''
while not event.is_set() or not queue.empty():
message = queue.get()
logging.info(
'Consumer storing message: {}, (size={})'.format( message, queue.qsize())
)
logging.info('Consumer received enent Exiting')
if __name__=='__main__':
# Logging format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
# 사이즈 중요
pipeline = queue.Queue(maxsize=100)
# 이벤트 플래그 초기 값 0
event = threading.Event()
# With context 시작
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# 기존에는 메소드1개, 각 스레드에서 메소드를 코드 영역에서 공유함.
# 지금은 스레드(생산자), 스레드(소비자), 각각 실행, 통신은 큐를 이용해서 통신함.
# (self, fn, *args, **kwargs): 함수, 데이터공유(큐), 이벤트
executor.submit(producer, pipeline, event)
# (self, fn, *args, **kwargs): 함수, 데이터공유(큐), 이벤트
executor.submit(consumer, pipeline, event)
# 실행 시간 조정
time.sleep(0.1)
logging.info('Main : about to set event')
#프로그램 종료
event.set()
'Python > Advanced' 카테고리의 다른 글
[Python] Multiprocessing(2) - Naming, Parallel processing (0) | 2021.05.18 |
---|---|
[Python] Multiprocessing(1) - Join, is_alive (0) | 2021.05.17 |
[Python] Thread(4) - Lock, Unlock (0) | 2021.05.17 |
[Python] Thread(3) - ThreadPoolExecutor (0) | 2021.05.14 |
[Python] Thread(2) - DeamonThread (0) | 2021.05.14 |