# Introduction
Most Python applications spend significant time waiting on APIs, databases, file systems, and network services. Async programming allows a program to pause while waiting for I/O operations and continue executing other tasks instead of blocking.
In this tutorial, you will learn the fundamentals of async programming in Python using clear code examples. We will compare synchronous and asynchronous execution, explain how the event loop works, and apply async patterns to real-world scenarios such as concurrent API requests and background tasks.
By the end of this guide, you will understand when async programming is useful, how to use async and await correctly, and how to write scalable and reliable async Python code.
# Defining Async Programming in Python
Async programming allows a program to pause execution while waiting for an operation to complete and continue executing other tasks in the meantime.
Core building blocks include:
- `async def` for defining coroutines
- `await` for non-blocking waits
- The event loop for task scheduling
Note: Async programming improves throughput, not raw computation speed.
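As a minimal sketch of how these three building blocks fit together, the snippet below defines one coroutine, awaits it, and lets `asyncio.run` manage the event loop. The `greet` coroutine and its arguments are hypothetical names chosen only for illustration.

```python
import asyncio

# `greet` is a hypothetical coroutine used only for illustration.
async def greet(name, delay):
    # `await` hands control back to the event loop while the sleep is pending.
    await asyncio.sleep(delay)
    return f"Hello, {name}"

async def main():
    # Calling greet(...) only creates a coroutine object; awaiting it runs it.
    return await greet("async", 0.1)

# asyncio.run starts an event loop, runs main() to completion, then closes the loop.
message = asyncio.run(main())
print(message)
```

Run as a plain script, this should print the greeting after a short pause.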
# Understanding the Async Event Loop in Python
The event loop is responsible for managing and executing asynchronous tasks.
Key responsibilities include:
- Tracking paused and ready tasks
- Switching execution when tasks await I/O
- Coordinating concurrency without threads
Python uses the asyncio library as its standard async runtime.
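The switching behavior described above can be observed with a small experiment. This is only a sketch: `worker` and the shared `events` list are hypothetical names, and `asyncio.sleep(0)` is used as a way to yield to the event loop without adding a real delay.

```python
import asyncio

# `worker` is a hypothetical coroutine that records each step it takes.
async def worker(name, steps, events):
    for step in range(steps):
        events.append(f"{name}:{step}")
        # sleep(0) yields control to the event loop without a real delay,
        # giving the other task a chance to run.
        await asyncio.sleep(0)

async def main():
    events = []
    # Both workers run on the same thread; the loop alternates between them.
    await asyncio.gather(worker("A", 2, events), worker("B", 2, events))
    return events

events = asyncio.run(main())
print(events)
```

The recorded steps should interleave (`A:0`, `B:0`, `A:1`, `B:1`), which shows a single thread alternating between two tasks at each `await`.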
# Comparing Sequential vs. Async Execution in Python
This section demonstrates how blocking sequential code compares to asynchronous concurrent execution and how async reduces total waiting time for I/O-bound tasks.
// Examining a Sequential Blocking Example
Sequential execution runs tasks one after another. If a task performs a blocking operation, the entire program waits until that operation completes. This approach is simple but inefficient for I/O-bound workloads where waiting dominates execution time.
This function simulates a blocking task. The call to time.sleep pauses the entire program for the specified number of seconds.
```python
import time

def download_file(name, seconds):
    print(f"Starting {name}")
    time.sleep(seconds)
    print(f"Finished {name}")
```
The timer starts before the function calls and stops after all three calls complete. Each function runs only after the previous one finishes.
```python
start = time.perf_counter()

download_file("file-1", 2)
download_file("file-2", 2)
download_file("file-3", 2)

end = time.perf_counter()
print(f"[TOTAL SYNC] took {end - start:.4f} seconds")
```
Output:
- `file-1` starts and blocks the program for two seconds
- `file-2` starts only after `file-1` finishes
- `file-3` starts only after `file-2` finishes
Total runtime is the sum of all delays, approximately six seconds.
```
Starting file-1
Finished file-1
Starting file-2
Finished file-2
Starting file-3
Finished file-3
[TOTAL SYNC] took 6.0009 seconds
```
// Examining an Asynchronous Concurrent Example
Asynchronous execution allows tasks to run concurrently. When a task reaches an awaited I/O operation, it pauses and allows other tasks to continue. This overlapping of waiting time significantly improves throughput.
This async function defines a coroutine. The await asyncio.sleep call pauses only the current task, not the entire program.
```python
import asyncio
import time

async def download_file(name, seconds):
    print(f"Starting {name}")
    await asyncio.sleep(seconds)
    print(f"Finished {name}")
```
asyncio.gather schedules all three coroutines to run concurrently on the event loop.
```python
async def main():
    start = time.perf_counter()

    await asyncio.gather(
        download_file("file-1", 2),
        download_file("file-2", 2),
        download_file("file-3", 2),
    )

    end = time.perf_counter()
    print(f"[TOTAL ASYNC] took {end - start:.4f} seconds")
```
Calling `asyncio.run(main())` starts the event loop and executes the async program.
Output:
- All three tasks start almost at the same time
- Each task waits independently for two seconds
- While one task is waiting, others continue executing
- Total runtime is close to the longest single delay, approximately two seconds
```
Starting file-1
Starting file-2
Starting file-3
Finished file-1
Finished file-2
Finished file-3
[TOTAL ASYNC] took 2.0005 seconds
```
# Exploring How Await Works in Python Async Code
The await keyword tells Python that a coroutine may pause and allow other tasks to run.
Incorrect usage:
```python
async def task():
    asyncio.sleep(1)
```
Correct usage:
```python
async def task():
    await asyncio.sleep(1)
```
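Without await, calling asyncio.sleep(1) only creates a coroutine object that never runs, so the task does not actually pause and no concurrency takes place. Python typically reports this with a RuntimeWarning such as "coroutine 'sleep' was never awaited".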
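The difference can be measured directly. The sketch below times both variants. The names `forgot_await`, `with_await`, and `timed` are hypothetical helpers, and the delay is shortened to 0.2 seconds to keep the run quick.

```python
import asyncio
import time

async def forgot_await():
    # Creates a coroutine object and discards it; the sleep never runs.
    # Python usually emits a RuntimeWarning for this line on stderr.
    asyncio.sleep(0.2)

async def with_await():
    # The task actually pauses here for about 0.2 seconds.
    await asyncio.sleep(0.2)

async def timed(coro_func):
    # Hypothetical helper: returns how long one call to coro_func takes.
    start = time.perf_counter()
    await coro_func()
    return time.perf_counter() - start

skipped = asyncio.run(timed(forgot_await))
waited = asyncio.run(timed(with_await))
print(f"without await: {skipped:.3f} s, with await: {waited:.3f} s")
```

The version without `await` should return almost immediately, while the awaited version takes roughly the full delay.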
# Running Multiple Async Tasks Using asyncio.gather
asyncio.gather allows multiple coroutines to run concurrently and collects their results once all tasks have completed. It is commonly used when several independent async operations can overlap their waiting time.
The job coroutine simulates an asynchronous task. It prints a start message, waits for one second using a non-blocking sleep, then prints a finish message and returns a result.
```python
import asyncio
import time

async def job(job_id, delay=1):
    print(f"Job {job_id} started")
    await asyncio.sleep(delay)
    print(f"Job {job_id} finished")
    return f"Completed job {job_id}"
```
asyncio.gather schedules all three jobs to run concurrently on the event loop. Each job begins execution immediately until it reaches an awaited operation.
```python
async def main():
    start = time.perf_counter()

    results = await asyncio.gather(
        job(1),
        job(2),
        job(3),
    )

    end = time.perf_counter()
    print("\nResults:", results)
    print(f"[TOTAL WALL TIME] {end - start:.4f} seconds")

asyncio.run(main())
```
Output:
- All three jobs start almost at the same time
- Each job waits independently for one second
- While one job is waiting, others continue running
- The results are returned in the same order the tasks were passed to `asyncio.gather`
- Total execution time is close to one second, not three
```
Job 1 started
Job 2 started
Job 3 started
Job 1 finished
Job 2 finished
Job 3 finished

Results: ['Completed job 1', 'Completed job 2', 'Completed job 3']
[TOTAL WALL TIME] 1.0013 seconds
```
This pattern is foundational for concurrent network requests, database queries, and other I/O-bound operations.
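The ordering guarantee is easier to see when the jobs finish at different times. The following sketch is a variation on the example above, not the original code: the delays are shortened and deliberately unequal, and a hypothetical `finish_order` list records when each job actually completes.

```python
import asyncio

async def job(job_id, delay, finish_order):
    await asyncio.sleep(delay)
    # Record the real completion order, which depends on the delays.
    finish_order.append(job_id)
    return job_id

async def main():
    finish_order = []
    # Job 1 is the slowest and job 2 the fastest.
    results = await asyncio.gather(
        job(1, 0.3, finish_order),
        job(2, 0.1, finish_order),
        job(3, 0.2, finish_order),
    )
    return results, finish_order

results, finish_order = asyncio.run(main())
print("gather results:", results)
print("finish order:  ", finish_order)
```

Even though job 2 finishes first, the list returned by `asyncio.gather` should still follow the order in which the coroutines were passed in.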
# Making Concurrent HTTP Requests
Async HTTP requests are a common real-world use case where async programming provides immediate benefits. When multiple APIs are called sequentially, total execution time becomes the sum of all response delays. Async allows these requests to run concurrently.
This list contains three URLs that intentionally delay their responses by one, two, and three seconds.
```python
import asyncio
import time
import urllib.request
import json

URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/3",
]
```
This function performs a blocking HTTP request using the standard library. It cannot be awaited directly.
```python
def fetch_sync(url):
    """Blocking HTTP request using standard library"""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode())
```
The fetch coroutine measures execution time and logs when a request starts. The blocking HTTP request is offloaded to a background thread using asyncio.to_thread. This prevents the event loop from blocking.
```python
async def fetch(url):
    start = time.perf_counter()
    print(f"Fetching {url}")

    # Run blocking IO in a thread
    data = await asyncio.to_thread(fetch_sync, url)

    elapsed = time.perf_counter() - start
    print(f"Finished {url} in {elapsed:.2f} seconds")
    return data
```
All requests are scheduled concurrently using asyncio.gather.
```python
async def main():
    start = time.perf_counter()

    results = await asyncio.gather(
        *(fetch(url) for url in URLS)
    )

    total = time.perf_counter() - start
    print(f"\nFetched {len(results)} responses")
    print(f"[TOTAL WALL TIME] {total:.2f} seconds")

asyncio.run(main())
```
Output:
- All three HTTP requests start almost immediately
- Each request completes after its own delay
- The longest request determines the total wall time
- Total runtime is approximately three and a half seconds, not the sum of all delays
```
Fetching https://httpbin.org/delay/1
Fetching https://httpbin.org/delay/2
Fetching https://httpbin.org/delay/3
Finished https://httpbin.org/delay/1 in 1.26 seconds
Finished https://httpbin.org/delay/2 in 2.20 seconds
Finished https://httpbin.org/delay/3 in 3.52 seconds

Fetched 3 responses
[TOTAL WALL TIME] 3.52 seconds
```
This approach significantly improves performance when calling multiple APIs and is a common pattern in modern async Python services.
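To try the same offloading pattern without network access, the blocking call can be simulated. In this sketch, `slow_call` is a hypothetical stand-in for the blocking HTTP request, and the delays are shortened. It assumes Python 3.9 or later, where `asyncio.to_thread` is available.

```python
import asyncio
import time

def slow_call(seconds):
    # Hypothetical stand-in for a blocking HTTP request.
    time.sleep(seconds)
    return seconds

async def main():
    start = time.perf_counter()
    # Each blocking call runs in its own worker thread, so the waits overlap.
    results = await asyncio.gather(
        *(asyncio.to_thread(slow_call, delay) for delay in (0.1, 0.2, 0.3))
    )
    total = time.perf_counter() - start
    return results, total

results, total = asyncio.run(main())
print(f"results={results}, wall time={total:.2f} s")
```

The wall time should land near the longest delay rather than the 0.6-second sum, mirroring the behavior of the HTTP example.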
# Implementing Error Handling Patterns in Async Python Applications
Robust async applications must handle failures gracefully. In concurrent systems, a single failing task should not cause the entire workflow to fail. Proper error handling ensures that successful tasks complete while failures are reported cleanly.
This list includes two successful endpoints and one endpoint that returns an HTTP 404 error.
```python
import asyncio
import urllib.request
import urllib.error  # imported explicitly because HTTPError is referenced below
import json
import socket

URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/status/404",
]
```
This function performs a blocking HTTP request with a timeout. It may raise exceptions such as timeouts or HTTP errors.
```python
def fetch_sync(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode())
```
This function wraps a blocking HTTP request in a safe asynchronous interface. The blocking operation is executed in a background thread using asyncio.to_thread, which prevents the event loop from stalling while the request is in progress.
Common failure cases such as timeouts and HTTP errors are caught and converted into structured responses. This ensures that errors are handled predictably and that a single failing request does not interrupt the execution of other concurrent tasks.
```python
async def safe_fetch(url, timeout=5):
    try:
        return await asyncio.to_thread(fetch_sync, url, timeout)
    except socket.timeout:
        # socket.timeout is an alias of TimeoutError on Python 3.10+
        return {"url": url, "error": "timeout"}
    except urllib.error.HTTPError as e:
        return {"url": url, "error": "http_error", "status": e.code}
    except Exception as e:
        # Catch-all so one unexpected failure cannot break the whole batch
        return {"url": url, "error": "unexpected_error", "message": str(e)}
```
All requests are executed concurrently using asyncio.gather.
```python
async def main():
    results = await asyncio.gather(
        *(safe_fetch(url) for url in URLS)
    )

    for result in results:
        print(result)

asyncio.run(main())
```
Output:
- The first two requests complete successfully and return parsed JSON data
- The third request returns a structured error instead of raising an exception
- All results are returned together without interrupting the workflow
```
{'args': {}, 'data': '', 'files': {}, 'form': {}, 'headers': {'Accept-Encoding': 'identity', 'Host': 'httpbin.org', 'User-Agent': 'Python-urllib/3.11', 'X-Amzn-Trace-Id': 'Root=1-6966269f-1cd7fc7821bc6bc469e9ba64'}, 'origin': '3.85.143.193', 'url': 'https://httpbin.org/delay/1'}
{'args': {}, 'data': '', 'files': {}, 'form': {}, 'headers': {'Accept-Encoding': 'identity', 'Host': 'httpbin.org', 'User-Agent': 'Python-urllib/3.11', 'X-Amzn-Trace-Id': 'Root=1-6966269f-5f59c151487be7094b2b0b3c'}, 'origin': '3.85.143.193', 'url': 'https://httpbin.org/delay/2'}
{'url': 'https://httpbin.org/status/404', 'error': 'http_error', 'status': 404}
```
This pattern ensures that a single failing request does not break the entire async operation and is essential for production-ready async applications.
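A related option, shown here as an alternative sketch rather than the approach used above, is to pass `return_exceptions=True` to `asyncio.gather`. Raised exceptions are then returned as items in the result list instead of propagating. The `flaky` coroutine is a hypothetical task that fails for one input and needs no network access.

```python
import asyncio

# `flaky` is a hypothetical task: it fails for task_id 2 and succeeds otherwise.
async def flaky(task_id):
    await asyncio.sleep(0.05)
    if task_id == 2:
        raise ValueError(f"task {task_id} failed")
    return f"task {task_id} ok"

async def main():
    # With return_exceptions=True, exceptions are collected as results
    # in the same positions as the coroutines that raised them.
    return await asyncio.gather(
        *(flaky(task_id) for task_id in (1, 2, 3)),
        return_exceptions=True,
    )

results = asyncio.run(main())
for result in results:
    if isinstance(result, Exception):
        print("error:", result)
    else:
        print(result)
```

Wrapping each task in its own try/except, as `safe_fetch` does, gives more control over the error format, while `return_exceptions=True` is a lighter option when the raw exception objects are enough.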
# Using Async Programming in Jupyter Notebooks
Jupyter notebooks already run an active event loop. Because of this, asyncio.run cannot be used inside a notebook cell, as it attempts to start a new event loop while one is already running.
This async function simulates a simple non-blocking task using asyncio.sleep.
```python
import asyncio

async def main():
    await asyncio.sleep(1)
    print("Async task completed")
```
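Incorrect usage in notebooks: calling `asyncio.run(main())` in a cell raises `RuntimeError: asyncio.run() cannot be called from a running event loop`.
Correct usage in notebooks: write `await main()` directly in the cell, since the notebook's already-running event loop supports top-level await.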
Understanding this distinction ensures async code runs correctly in Jupyter notebooks and prevents common runtime errors when experimenting with asynchronous Python.
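One way to check which situation applies is to ask asyncio whether a loop is already running. The sketch below assumes it is run as a plain script, and `loop_is_running` is a hypothetical helper name. It relies on `asyncio.get_running_loop`, which raises `RuntimeError` when no loop is active in the current thread.

```python
import asyncio

def loop_is_running():
    # Hypothetical helper: True when called from inside a running event loop.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True

async def check_inside():
    # Called from a coroutine, so a loop is guaranteed to be running here.
    return loop_is_running()

outside = loop_is_running()            # evaluated at script top level
inside = asyncio.run(check_inside())   # evaluated inside a coroutine
print(f"outside a coroutine: {outside}, inside a coroutine: {inside}")
```

In a notebook cell, the same helper would be expected to report a running loop at the top level, which is why `await main()` is the appropriate form there.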
# Controlling Concurrency with Async Semaphores
External APIs and services often enforce rate limits, which makes it unsafe to run too many requests at the same time. Async semaphores allow you to control how many tasks execute concurrently while still benefiting from asynchronous execution.
The semaphore is initialized with a limit of two, meaning only two tasks can enter the protected section at the same time.
```python
import asyncio
import time

semaphore = asyncio.Semaphore(2)  # allow only 2 tasks at a time
```
The task function represents an asynchronous unit of work. Each task must acquire the semaphore before executing, and if the limit has been reached, it waits until a slot becomes available.
Once inside the semaphore, the task records its start time, prints a start message, and awaits a two-second non-blocking sleep to simulate an I/O-bound operation. After the sleep completes, the task calculates its execution time, prints a completion message, and releases the semaphore.
```python
async def task(task_id):
    async with semaphore:
        start = time.perf_counter()
        print(f"Task {task_id} started")

        await asyncio.sleep(2)

        elapsed = time.perf_counter() - start
        print(f"Task {task_id} finished in {elapsed:.2f} seconds")
```
The main function schedules four tasks to run concurrently using asyncio.gather, but the semaphore ensures that they execute in two waves of two tasks.
Finally, asyncio.run starts the event loop and runs the program, resulting in a total execution time of approximately four seconds.
```python
async def main():
    start = time.perf_counter()

    await asyncio.gather(
        task(1),
        task(2),
        task(3),
        task(4),
    )

    total = time.perf_counter() - start
    print(f"\n[TOTAL WALL TIME] {total:.2f} seconds")

asyncio.run(main())
```
Output:
- Tasks 1 and 2 start first due to the semaphore limit
- Tasks 3 and 4 wait until a slot becomes available
- Tasks execute in two waves, each lasting two seconds
- Total wall time is approximately four seconds
```
Task 1 started
Task 2 started
Task 1 finished in 2.00 seconds
Task 2 finished in 2.00 seconds
Task 3 started
Task 4 started
Task 3 finished in 2.00 seconds
Task 4 finished in 2.00 seconds

[TOTAL WALL TIME] 4.00 seconds
```
Semaphores provide an effective way to enforce concurrency limits and protect system stability in production async applications.
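To confirm that the limit really holds, the number of tasks inside the protected section can be counted. This is a sketch with hypothetical names (`limited`, `state`) and shortened delays. The semaphore is created inside `main` rather than at module level, which may also avoid event-loop binding problems on Python versions older than 3.10.

```python
import asyncio

async def limited(task_id, semaphore, state):
    async with semaphore:
        # Count how many tasks are inside the protected section right now.
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.1)
        state["active"] -= 1

async def main(limit=2, count=4):
    # Created inside the running loop (an assumption made for portability).
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(
        *(limited(task_id, semaphore, state) for task_id in range(count))
    )
    return state["peak"]

peak = asyncio.run(main())
print(f"peak concurrency: {peak}")
```

With a limit of two and four tasks, the recorded peak should never exceed two.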
# Concluding Remarks
Async programming is not a universal solution. It is not suitable for CPU-intensive workloads such as machine learning training, image processing, or numerical simulations. Its strength lies in handling I/O-bound operations where waiting time dominates execution.
When used correctly, async programming improves throughput by allowing tasks to make progress while others are waiting. Proper use of await is essential for concurrency, and async patterns are especially effective in API-driven and service-based systems.
In production environments, controlling concurrency and handling failures explicitly are critical to building reliable and scalable async Python applications.
Abid Ali Awan (@1abidaliawan) is a certified data scientist professional who loves building machine learning models. Currently, he is focusing on content creation and writing technical blogs on machine learning and data science technologies. Abid holds a Master’s degree in technology management and a bachelor’s degree in telecommunication engineering. His vision is to build an AI product using a graph neural network for students struggling with mental illness.
