본문 바로가기

Python/Intermediate

[Python] 병행성(Concurrency) - Futures(2)

  • 2가지 패턴 실습

concurrent.futures - wait, as_completed

 

import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

# Upper bounds to sum up to, largest job first: 10^8, 10^7, 10^6, 10^5.
WORK_LIST = [10 ** exponent for exponent in (8, 7, 6, 5)]

# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제너레이터)
def sum_generator(n):
    """Return the sum of the integers from 1 through ``n``.

    The numbers are fed to ``sum`` through a generator expression, so no
    intermediate list is built. For ``n`` below 1 the range is empty and
    the result is 0.
    """
    return sum(value for value in range(1, n + 1))

# wait - 모든 작업이 끝날 때까지(또는 timeout이 지날 때까지) 기다림
# as_completed - 먼저 끝난 작업부터 차례로 결과를 반환함
def main():
    """Run every WORK_LIST job on a thread pool and report them with ``wait``.

    Each item of WORK_LIST is submitted as ``sum_generator(item)``. The
    futures are then passed to ``wait`` with a 7 second timeout, and the
    completed set, the still-pending set, and the results of the completed
    futures are printed, followed by the total elapsed time.

    Note that ``result.done`` is a set, so the printed results are not
    guaranteed to follow the order of WORK_LIST.
    """
    # Worker count: one per job, capped at 10. At least 1 is required
    # because ThreadPoolExecutor rejects max_workers <= 0.
    worker = max(1, min(10, len(WORK_LIST)))

    # Start time
    start_tm = time.time()
    # Futures for every submitted job
    futures_list = []

    # The original note lists ProcessPoolExecutor as the alternative executor.
    with ThreadPoolExecutor(max_workers=worker) as excutor:
        for work in WORK_LIST:
            # submit() schedules the call and returns a Future
            future = excutor.submit(sum_generator, work)
            futures_list.append(future)
            # Show the future's state right after scheduling
            print('**Scheduled for {} : {}'.format(work, future))

        # Guarded so that an empty WORK_LIST does not raise NameError
        if futures_list:
            print('type(future): ', type(futures_list[-1]))

        # wait: block until all futures finish or the timeout expires ------
        result = wait(futures_list, timeout=7)
        # Futures that finished within the timeout
        print('CCC Completed Tasks : ' + str(result.done))
        # Futures still running when the timeout expired
        print('PPP Pending ones after waiting for 7seconds : ' + str(result.not_done))
        # Results of the finished futures
        print([future.result() for future in result.done])
        # ------------------------------------------------------------------

    # Elapsed time (leaving the with-block waits for any pending futures)
    end_tm = time.time() - start_tm
    # Output format
    msg = '\n Time : {:.2f}s'
    # Final report
    print(msg.format(end_tm))



# Entry point: call main() only when this file is run as a script
if __name__ == '__main__':
    main()

--------------------------------------------[result]

**Scheduled for 100000000 : <Future at 0x209534e8390 state=running>
**Scheduled for 10000000 : <Future at 0x2095376b908 state=running>
**Scheduled for 1000000 : <Future at 0x20954f64d68 state=running>
**Scheduled for 100000 : <Future at 0x20954f715f8 state=finished returned int>
type(future):  < class 'concurrent.futures._base.Future' >
CCC Completed Tasks : {<Future at 0x2095376b908 state=finished returned int>, <Future at 0x209534e8390 state=finished returned int>, <Future at 0x20954f64d68 state=finished returned int>, <Future at 0x20954f715f8 state=finished returned int>}
PPP Pending ones after waiting for 7seconds : set()
[50000005000000, 5000000050000000, 500000500000, 5000050000]

 Time : 6.87s

import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

# Upper bounds to sum up to, largest job first: 10^8, 10^7, 10^6, 10^5.
# Alternative ordering to experiment with (smallest job first):
# [100000, 1000000, 10000000, 100000000]
WORK_LIST = [10 ** exponent for exponent in range(8, 4, -1)]

# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제너레이터)
def sum_generator(n):
    """Add up the integers 1..n and return the total.

    A generator expression streams the numbers into ``sum``; when ``n`` is
    less than 1 there is nothing to add and the total is 0.
    """
    numbers = range(1, n + 1)
    total = sum(number for number in numbers)
    return total

# wait - 모든 작업이 끝날 때까지(또는 timeout이 지날 때까지) 기다림
# as_completed - 먼저 끝난 작업부터 차례로 결과를 반환함
def main():
    """Run every WORK_LIST job on a thread pool and report them with ``as_completed``.

    Each item of WORK_LIST is submitted as ``sum_generator(item)``. The
    futures are then consumed through ``as_completed``, which yields each
    future as soon as it finishes, so results are printed in completion
    order rather than submission order. The total elapsed time is printed
    at the end.
    """
    # Worker count: one per job, capped at 10. At least 1 is required
    # because ThreadPoolExecutor rejects max_workers <= 0.
    worker = max(1, min(10, len(WORK_LIST)))

    # Start time
    start_tm = time.time()
    # Futures for every submitted job
    futures_list = []

    # The original note lists ProcessPoolExecutor as the alternative executor.
    with ThreadPoolExecutor(max_workers=worker) as excutor:
        for work in WORK_LIST:
            # submit() schedules the call and returns a Future
            future = excutor.submit(sum_generator, work)
            futures_list.append(future)
            # Show the future's state right after scheduling
            print('**Scheduled for {} : {}'.format(work, future))

        # Guarded so that an empty WORK_LIST does not raise NameError
        if futures_list:
            print('type(future): ', type(futures_list[-1]))

        # as_completed: handle each future as soon as it finishes ----------
        for future in as_completed(futures_list):
            result = future.result()
            done = future.done()
            # cancelled is a method; it must be called to get the bool
            # (without the call the bound method object itself is printed).
            cancelled = future.cancelled()

            # Report the state of the finished future
            print('Future Result : {}, Done : {}'.format(result, done))
            print('Future Cancelled : {}'.format(cancelled))
        # ------------------------------------------------------------------

    # Elapsed time
    end_tm = time.time() - start_tm
    # Output format
    msg = '\n Time : {:.2f}s'
    # Final report
    print(msg.format(end_tm))



# Entry point: call main() only when this file is run as a script
if __name__ == '__main__':
    main()

--------------------------------------------[result]

**Scheduled for 100000000 : <Future at 0x269a63b82e8 state=running>
**Scheduled for 10000000 : <Future at 0x269a662b8d0 state=running>
**Scheduled for 1000000 : <Future at 0x269a7e24cf8 state=running>
**Scheduled for 100000 : <Future at 0x269a7e24dd8 state=finished returned int>
type(future):  < class 'concurrent.futures._base.Future' >
Future Result : 500000500000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x269a7e24cf8 state=finished returned int>>
Future Result : 5000050000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x269a7e24dd8 state=finished returned int>>
Future Result : 50000005000000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x269a662b8d0 state=finished returned int>>
Future Result : 5000000050000000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x269a63b82e8 state=finished returned int>>

 Time : 7.02s