티스토리 뷰
개요
async/await 문법을 사용하면 비동기 태스크가 이벤트 루프(event loop)에 등록되어 동작한다는 것은 알고 있지만, 정작 이벤트 루프 내부가 어떻게 동작하는지는 제대로 파악하지 못했습니다. Python의 이벤트 루프를 막연히 사용만 하고 있었는데, 이를 좀 더 깊이 파악해보고 싶어 코드를 직접 살펴보며 정리해보려고 합니다.
특히 다음과 같은 점들이 궁금했습니다.
- 이벤트 루프의 정의는 정확히 무엇일까?
- 이벤트 루프에서 등록된 태스크들은 어떻게 관리될까?
- 이벤트 루프는 어떻게 생성되고 관리되는 걸까?
- 네트워크 I/O 연산은 어떻게 처리되고, 처리 완료 후 이벤트 루프에서 어떻게 콜백이 실행될까?
위 궁금증에 답하기 위해, Python 내부 코드를 직접 따라가며 이벤트 루프가 어떻게 구현되어 있는지 살펴볼 예정입니다.
EventLoop란?
우선 ChatGPT에게 이벤트 루프의 개념을 물어보면, 대략 다음과 같은 설명이 나옵니다.
이 설명 자체는 맞지만, 실제 Python에서 이벤트 루프가 어떤 식으로 동작하고 있는지, 구체적으로 어떤 코드로 구현되어 있는지가 궁금해졌습니다. Python 공식 문서를 보면 이벤트 루프에 대해 다음과 같은 설명이 있습니다:
https://docs.python.org/ko/3/library/asyncio-eventloop.html
이벤트 루프는 비동기 태스크 및 콜백을 실행하고 네트워크 IO 연산을 수행하며 자식 프로세스를 실행합니다.
설명만 봐서는 이벤트 루프가 여러 기능을 제공하지만, 어떻게 구현되어 있는지에 대한 자세한 정보가 부족합니다. 그래서 코드를 보기 전에 설계 문서를 먼저 찾아보았고, 이 과정에서 PEP 3156을 발견했습니다. 이 문서를 읽으면 asyncio가 어떤 배경에서 탄생했고 어떤 식으로 설계가 진행되었는지를 파악하는 데 도움이 됩니다.
PEP 3156을 보면, Python asyncio 모듈이 크게 두 가지 역할을 한다고 정리해놓았습니다.
- 다양한 이벤트 루프 구현체들을 위한 표준 인터페이스 제공
- 그 표준 인터페이스를 만족하는 기본 이벤트 루프 구현체 제공
이 글에서는 asyncio 내부 코드를 간단히 살펴보면서, 실제 이벤트 루프가 어떤 식으로 동작하는지 알아보겠습니다.
asyncio/events.py
asyncio/events.py에서 눈에 띄는 주요 클래스는 다음과 같습니다.
- Handle
- TimerHandle
- AbstractEventLoop
- AbstractEventLoopPolicy
- BaseDefaultEventLoopPolicy
- AbstractServer
Handle과 TimerHandle
이벤트 루프에 콜백이 등록되면 반환되는 객체 중 하나가 Handle인데, 이는 콜백과 그 인자를 담는 래퍼 역할을 합니다. TimerHandle은 Handle을 상속받아, 예약된 시간(_when)에 실행될 콜백을 관리합니다.
class Handle:
def __init__(self, callback, args, loop, context=None):
if context is None:
context = contextvars.copy_context()
self._context = context
self._loop = loop
self._callback = callback
self._args = args
self._cancelled = False
...
Handle 클래스의 생성자를 보면, 어느 이벤트 루프에 등록되었는지(_loop), 콜백 함수와 인자(_callback, _args), 그리고 콜백이 취소되었는지 여부(_cancelled) 등을 저장하고 있습니다.
class TimerHandle(Handle):
def __init__(self, when, callback, args, loop, context=None):
super().__init__(callback, args, loop, context)
self._when = when
self._scheduled = False
...
TimerHandle은 실행 시간(_when)과 예약 여부(_scheduled)를 추가로 가지고 있습니다. 비교 연산(__eq__, __lt__ 등) 또한 _when을 기준으로 하도록 구현되어 있는데, 이는 이벤트 루프에서 TimerHandle을 힙(우선순위 큐)으로 관리하기 위해서입니다.
추상 이벤트 루프 인터페이스
이벤트 루프와 관련된 표준을 정의하는 중요한 인터페이스 클래스가 다음 세 가지입니다.
- AbstractEventLoop
- AbstractEventLoopPolicy
- AbstractServer
그리고 event loop와 관련된 표준을 정의하는 총 3개의 interface를 정의하고 있다.
AbstractEventLoop는 Python 이벤트 루프가 가져야 하는 메서드를 정의한 추상 클래스입니다. 주석을 살펴보면 이벤트 루프의 주요 기능인
- 이벤트 루프 실행 / 정지
- 콜백 스케줄링 및 실행
- 코루틴 스케줄링 (태스크 생성)
- 스레드와 상호 작용
- 네트워크 I/O 처리
- 파이프/서브프로세스 처리
- Signal 처리
- 에러 처리 및 디버그 기능
등이 여기에 추상 메서드 형태로 정의되어 있습니다.
AbstractEventLoopPolicy
AbstractEventLoopPolicy는 이벤트 루프를 생성하고 조회, 설정하는 방법을 정의하는 추상 클래스입니다. 자식 프로세스와의 상호작용 정책도 여기에서 다뤄집니다. 이를 상속받은 BaseDefaultEventLoopPolicy에서는 이벤트 루프를 만들기 위한 팩터리를 받아서, 메인 스레드용 이벤트 루프를 생성하는 기본 정책을 제공합니다. 대다수의 asyncio 이벤트 루프 정책(EventLoopPolicy)은 이 BaseDefaultEventLoopPolicy를 상속하고, 필요한 부분만 오버라이딩하여 사용합니다.
AbstractServer
AbstractServer는 AbstractEventLoop.create_server()가 반환하는 서버 객체를 위한 추상 인터페이스입니다. 서버 열기/닫기 등의 기능을 정의해두고 있으며, 이를 통해 이벤트 루프가 다양한 종류의 연결(커넥션)을 확장할 수 있습니다.
class diagram
asyncio/base_events.py
문서에서 두 번째로 볼 만한 곳은 base_events.py입니다. 모듈의 docstring을 보면, 이벤트 루프가 멀티플렉서(I/O 이벤트를 통지해주는 부분)와 이벤트 루프 자체(콜백 스케줄링 등)로 구분되며, 이 중 멀티플렉서를 감싼 이벤트 루프 기능이 base_events.py에 정의되어 있다고 합니다.
클래스 구성
- Server: AbstractServer를 상속받아 기본 서버 기능 구현
- BaseEventLoop: AbstractEventLoop를 상속받아 기본 이벤트 루프 동작을 구현
이벤트 루프가 태스크를 처리하는 방식
개인적으로 가장 궁금했던 점 중 하나는 이벤트 루프에 등록된 코루틴(태스크)들이 어떻게 스케줄링되어 처리되는지입니다. 보통 이벤트 루프 밖에서는 run_until_complete() 메서드를 통해 특정 Future가 완료될 때까지 이벤트 루프를 돌립니다.
이벤트 루프가 콜백을 처리하는 방식
개인적으로 가장 궁금했던 점 중 하나는 이벤트 루프에 등록된 코루틴(태스크)들이 어떻게 스케줄링되어 처리되는지입니다. 보통 이벤트 루프 밖에서는 run_until_complete() 메서드를 통해 특정 Future가 완료될 때까지 이벤트 루프를 돌립니다.
def run_until_complete(self, future):
...
try:
self.run_forever()
except:
...
raise
finally:
future.remove_done_callback(_run_until_complete_cb)
...
return future.result()
이때 내부적으로는 run_forever()가 호출되는데, 여기서 self._run_once()를 계속 반복 호출하며 이벤트 루프가 돌아갑니다.
def run_forever(self):
"""Run until stop() is called."""
try:
self._run_forever_setup()
while True:
self._run_once()
if self._stopping:
break
finally:
self._run_forever_cleanup()
결국 이벤트 루프의 한 사이클은 _run_once()에 정의되어 있으며, 이를 다음과 같은 5단계로 나눠서 살펴볼 수 있습니다.
- 취소된 작업 정리
- 다음 I/O 이벤트까지 얼마나 대기할지 결정
- I/O 이벤트 대기 및 처리
- 실행 시기가 된 예약 작업들을 _ready 큐로 이동
- _ready에 쌓인 콜백 실행
1. 취소된 작업 정리
이벤트 루프 내에는 현재 실행 가능한 콜백을 담는 _ready와 아직 실행 시점이 오지 않은 콜백을 힙 구조로 담는 _scheduled가 있습니다. 첫 단계에서는 _scheduled에서 취소된 콜백들을 제거합니다.
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
_scheduled 내부는 heap으로 저장하고 있어 size에 따라서 다르게 작동하고 있는데, 왜 나눠져있는지 궁금해서 분석해본 내용은 다음과 같습니다. 코드를 읽고 한번 고민해본 다음에 접은글을 읽으시는걸 추천드립니다.
scheduled task가 N, 연속적으로 취소된 task가 M, 취소된 task가 C 라고 하고 설명해보자면. 첫번째 로직 전체 순회(O(N))하여 새로운 scheduled 객체를 만들고 heap을 만들어(O(N)) 저장 두번째 로직 실행시간 기준 취소된 task인지 확인하면서 heappop을 하게 됩니다.(MO(logN)). 비교 O(n)과 mO(logn)의 비교인데. sched task의 수가 적은 경우에는 n과 logn의 차이가 적어서 첫번째 로직을 타는게 효율적이고, 연속적으로 취소된 task의 갯수가 적은 경우에는 두번째 로직을 타는게 효율적으로 보입니다. 다만 데이터 구조상 전체 취소된 숫자만 들고 있고, 연속적으로 취소된 숫자를 들고 있지 않으므로 FRACTION으로 비교하는 Heuristic으로 체크하는 것으로 보입니다.
2. 대기 시간 계산
실행해야 할 콜백이 없다면, 이벤트 루프는 쓸데없이 CPU를 점유하지 않도록 일정 시간 대기합니다. _scheduled에서 가장 이른 실행 시간이 있는 콜백을 확인해, 그 콜백까지 얼마나 기다려야 할지(timeout)를 계산해둡니다.
timeout = None
if self._ready or self._stopping: # 현재 실행가능한 콜백이 있거나, event loop가 취소되는 경우
timeout = 0
elif self._scheduled: # schedulng된 콜백이 있는 경우
# Compute the desired timeout.
timeout = self._scheduled[0]._when - self.time()
if timeout > MAXIMUM_SELECT_TIMEOUT:
timeout = MAXIMUM_SELECT_TIMEOUT
elif timeout < 0:
timeout = 0
3. I/O 이벤트 수신 및 처리
계산된 timeout을 사용해 I/O 멀티플렉서(Selector 등)에서 이벤트가 발생할 때까지 대기하고, 이벤트가 도착하면 _process_events() 메서드를 통해 처리합니다. base_events.py에서는 _process_events()가 추상화되어 있고, 실제 구현은 다른 파일에서 볼 수 있습니다.
event_list = self._selector.select(timeout)
self._process_events(event_list)
4. 실행 시기가 된 예약 콜백을 _ready로 이동
예약된 콜백 목록(_scheduled)에서 현재 시각이 된(혹은 지났지만 아직 실행이 안 된) 콜백들을 꺼내 _ready에 등록합니다.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
5. _ready에 있는 콜백 실행
마지막으로 _ready에 쌓여 있는 콜백들을 FIFO 순서로 실행합니다. 이때 각 콜백 객체인 Handle 내부 handle._run()이 호출되어 실제 콜백 함수가 실행됩니다.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()
handle = None # Needed to break cycles when an exception occurs.
Class Diagrams
asyncio/selector_events.py
이제 base_events.py에서 빠져 있던 _process_events() 구현을 찾아보면 selector_events.py를 만날 수 있습니다. 이 파일은 docstring에서 “Selector를 사용하는 이벤트 루프와 관련 클래스들”이라고 소개되고 있습니다. 여기에는 BaseSelectorEventLoop가 정의되어 있는데, 이 클래스가 BaseEventLoop를 상속받아 Selector를 통해 I/O 멀티플렉싱을 수행하도록 구현되어 있습니다.
python
코드 복사
def _process_events(self, event_list):
for key, mask in event_list:
fileobj, (reader, writer) = key.fileobj, key.data
if mask & selectors.EVENT_READ and reader is not None:
self._add_callback(reader)
if mask & selectors.EVENT_WRITE and writer is not None:
self._add_callback(writer)
_process_events()는 Selector가 반환한 이벤트를 보고, 읽기/쓰기 가능한 상태라면 해당 콜백(reader, writer)을 _ready에 등록(_add_callback())하는 방식으로 동작합니다.
asyncio/unix_events.py
유닉스 계열 시스템에서 사용되는 기본 이벤트 루프와 이벤트 루프 정책(_UnixDefaultEventLoopPolicy)이 정의되어 있습니다. 이때 이벤트 루프 클래스는 _UnixSelectorEventLoop이며, 유닉스의 signal 처리 로직 등을 포함합니다. 마지막에 다음과 같은 별칭이 지정되어 있어, unix_events를 다른 곳에서 불러올 때
python
코드 복사
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
EventLoop = SelectorEventLoop
와 같은 식으로 동일하게 사용할 수 있게 됩니다.
asyncio/__init__.py에서는 플랫폼별로 다음과 같이 구분해 임포트하고 있음을 알 수 있습니다.
python
코드 복사
if sys.platform == 'win32': # pragma: no cover
from .windows_events import *
else:
from .unix_events import *
그렇다면 Windows는?
Windows 환경에서는 unix_events.py 대신 windows_events.py가 사용됩니다. 내부적으로 base_events.py는 동일하게 공유하지만, I/O 멀티플렉싱 구현체로는 selector_events.py가 아닌 Proactor 기반의 proactor_events.py를 활용합니다.
Class Diagram
asyncio/runners.py
마지막으로 살펴볼 부분은 이벤트 루프와 실제 사용자 코드(async def 함수)를 연결해주는 시작점에 해당하는 runners.py입니다. 일반적으로 async 함수를 실행할 때 아래와 같이 작성합니다:
python
코드 복사
import asyncio
async def main():
...
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())의 내부 구현을 보면, 다음과 같이 Runner라는 컨텍스트 매니저를 사용해 이벤트 루프를 생성하고 실행합니다.
python
코드 복사
def run(main, *, debug=None, loop_factory=None):
if events._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
with Runner(debug=debug, loop_factory=loop_factory) as runner:
return runner.run(main)
자문 자답
개요에서 남겼던 의문에 직접 답변하면서 이번글을 마무리하려고 합니다.
- 이벤트 루프의 정의가 뭔지?Python 공식 문서를 보면 이러한 설명이 다소 추상적으로 느껴지지만, 실제 코드를 보면 이벤트 루프가 콜백을 관리할 자료구조(_ready, _scheduled)와 I/O 멀티플렉서를 제어하는 로직을 내부에 포함하고 있다는 점을 이해할 수 있습니다.
- 이벤트 루프는 비동기 태스크 및 콜백을 실행하고, 네트워크 I/O 연산을 수행하며, 자식 프로세스를 실행하는 핵심 구조입니다.
- 이벤트 루프는 누가 만들고, 어떻게 실행되는가?
- asyncio.run() 같은 함수를 호출하면, 내부적으로 Runner가 EventLoopPolicy를 통해 이벤트 루프를 생성하고 실행합니다.
- 비동기 태스크 및 콜백은 어떻게 이벤트 루프에 등록되는가?더 자세히 알고 싶다면 PEP 492, PEP 3156 등을 참고하는 것을 권장합니다.
- 이 부분은 아직 자세히 다루지 않았습니다. asyncio.create_task(coro), loop.call_soon(callback) 등 다양한 방법이 있고, Task나 Future 등과 밀접하게 연결됩니다.
- 네트워크 I/O 연산은 어떻게 수행되고, 처리 완료 후 이벤트 루프가 이를 어떻게 받는가?
- 커널에 I/O 요청을 등록해두고, 이벤트 루프가 selector(또는 Proactor)를 통해 “이벤트가 준비되면 알림”을 받습니다. 이 알림을 _process_events()가 처리하여 등록해둔 콜백 또는 Future가 완료될 수 있도록 콜백을 _ready에 넣어 실행하게 됩니다.
'개발' 카테고리의 다른 글
머신러닝을 현실에 적용하는 법: MLSE 커리어 이야기 (0) | 2024.11.24 |
---|---|
나만의 python 패키지 만들기 - (1) 우리는 python에서 어떻게 다른 사람의 코드를 사용할 수 있을까 (0) | 2023.09.20 |
- Total
- Today
- Yesterday
- word ladder 2
- n queens 2
- text justification
- Python
- leetcode 매일 풀기
- maximum rectangle
- wildcard matching
- mlse
- scramble string
- best time to buy and sell stock 3
- hard mode challenge
- sudoku solver
- 가상면접 사례로 배우는 대규모 시스템 설계
- binary tree maximum path sum
- palindrome partitioning 2
- first missing positive
- valid number
- permutation sequence
- 알고리즘
- 글또 10기
- 회고
- substring with concatenation of all words
- slay the spire에 진심인편
- otel
- 개발자 글쓰기
- leetcode 매일풀기
- datalakehouse
- leetcode
- longest valid parentheses
- distinct subsequences
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 |