### Pool Initialization and Setup Callbacks

Source: https://context7.com/magicstack/asyncpg/llms.txt

Shows how to configure connection pools with initialization callbacks for connection setup and setup callbacks for connection acquisition. Useful for setting custom type codecs or session parameters.

```python
import asyncio
import asyncpg
import json

async def init_connection(conn):
    """Called once when a connection is created."""
    # Set up custom type codecs
    await conn.set_type_codec(
        'json',
        encoder=json.dumps,
        decoder=json.loads,
        schema='pg_catalog'
    )
    # Set session parameters
    await conn.execute("SET timezone TO 'UTC'")

async def setup_connection(conn):
    """Called every time a connection is acquired."""
    # Set up notifications listener
    await conn.add_listener('my_channel', lambda *args: print(args))

async def main():
    pool = await asyncpg.create_pool(
        user='postgres',
        database='mydb',
        init=init_connection,    # Called on connection creation
        setup=setup_connection,  # Called on each acquire
    )

    async with pool.acquire() as conn:
        # JSON codec is already set up
        data = await conn.fetchval("SELECT $1::json", {'key': 'value'})
        print(data)  # {'key': 'value'}

    await pool.close()

asyncio.run(main())
```

--------------------------------

### Install asyncpg with pip

Source: https://github.com/magicstack/asyncpg/blob/master/docs/installation.md

Use this command for standard installation without GSSAPI/SSPI authentication.

```bash
pip install asyncpg
```

--------------------------------

### Install asyncpg from source

Source: https://github.com/magicstack/asyncpg/blob/master/docs/installation.md

Install asyncpg from a Git checkout. Ensure you have cloned the repository with submodules and have the necessary build tools and CPython header files.

```bash
pip install -e .
```

--------------------------------

### Install asyncpg

Source: https://github.com/magicstack/asyncpg/blob/master/README.rst

Install asyncpg using pip. For GSSAPI/SSPI authentication, use the optional extra.
```bash
pip install asyncpg
```

```bash
pip install 'asyncpg[gssauth]'
```

--------------------------------

### Install asyncpg with GSSAPI/SSPI authentication

Source: https://github.com/magicstack/asyncpg/blob/master/docs/installation.md

Install asyncpg with support for GSSAPI/SSPI authentication. This command installs SSPI on Windows and GSSAPI on other platforms.

```bash
pip install 'asyncpg[gssauth]'
```

--------------------------------

### Run asyncpg tests

Source: https://github.com/magicstack/asyncpg/blob/master/docs/installation.md

Execute the asyncpg test suite. This requires a PostgreSQL installation.

```bash
python setup.py test
```

--------------------------------

### Install asyncpg from source with debug build

Source: https://github.com/magicstack/asyncpg/blob/master/docs/installation.md

Create a debug build of asyncpg from a Git checkout by setting the ASYNCPG_DEBUG environment variable. This build includes more runtime checks.

```bash
env ASYNCPG_DEBUG=1 pip install -e .
```

--------------------------------

### Web Service with Asyncpg Connection Pool

Source: https://github.com/magicstack/asyncpg/blob/master/docs/usage.md

This example shows a complete web service using aiohttp and asyncpg to handle requests, acquire database connections from a pool, and execute queries.

```python
import asyncio
import asyncpg
from aiohttp import web

async def handle(request):
    """Handle incoming requests."""
    pool = request.app['pool']
    power = int(request.match_info.get('power', 10))

    # Take a connection from the pool.
    async with pool.acquire() as connection:
        # Open a transaction.
        async with connection.transaction():
            # Run the query passing the request argument.
            result = await connection.fetchval('select 2 ^ $1', power)
            return web.Response(
                text="2 ^ {} is {}".format(power, result))

async def init_db(app):
    """Initialize a connection pool."""
    app['pool'] = await asyncpg.create_pool(database='postgres',
                                            user='postgres')
    yield
    await app['pool'].close()

def init_app():
    """Initialize the application server."""
    app = web.Application()
    # Create a database context
    app.cleanup_ctx.append(init_db)
    # Configure service routes (raw string keeps the regex escape intact)
    app.router.add_route('GET', r'/{power:\d+}', handle)
    app.router.add_route('GET', '/', handle)
    return app

app = init_app()
web.run_app(app)
```

--------------------------------

### Manual Transaction Control

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

Control transactions manually using the `start()`, `commit()`, and `rollback()` methods. Roll back explicitly when an exception occurs, then re-raise it.

```python
tr = connection.transaction()
await tr.start()
try:
    ...
except:
    await tr.rollback()
    raise
else:
    await tr.commit()
```

--------------------------------

### Use Nested Transactions

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

asyncpg supports nested transactions, which create savepoints. The example demonstrates a nested transaction that is automatically rolled back upon exception.

```python
async with connection.transaction():
    await connection.execute('CREATE TABLE mytab (a int)')

    try:
        # Create a nested transaction:
        async with connection.transaction():
            await connection.execute('INSERT INTO mytab (a) VALUES (1), (2)')
            # This nested transaction will be automatically rolled back:
            raise Exception
    except:
        # Ignore exception
        pass

    # Because the nested transaction was rolled back, there
    # will be nothing in `mytab`.
    assert await connection.fetch('SELECT a FROM mytab') == []
```

--------------------------------

### Configure Custom Type Conversion for Complex Numbers in asyncpg

Source: https://github.com/magicstack/asyncpg/blob/master/docs/usage.md

This example demonstrates how to encode and decode Python `complex` numbers to a custom PostgreSQL composite type named `mycomplex`. It requires creating the composite type in PostgreSQL and then registering a codec for that type name with the `tuple` format.

```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect()

    try:
        await conn.execute(
            '''
            CREATE TYPE mycomplex AS (
                r float,
                i float
            );
            '''
        )
        # The codec name must match the PostgreSQL type created above.
        await conn.set_type_codec(
            'mycomplex',
            encoder=lambda x: (x.real, x.imag),
            decoder=lambda t: complex(t[0], t[1]),
            format='tuple',
        )

        res = await conn.fetchval('SELECT $1::mycomplex', (1+2j))

    finally:
        await conn.close()

asyncio.run(main())
```

--------------------------------

### Using the hstore Extension with asyncpg

Source: https://context7.com/magicstack/asyncpg/llms.txt

Demonstrates how to use PostgreSQL's hstore extension with asyncpg, including enabling the extension, registering the codec, inserting hstore data, and querying it.
```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Enable hstore extension
    await conn.execute('CREATE EXTENSION IF NOT EXISTS hstore')

    # Register the hstore codec
    await conn.set_builtin_type_codec(
        'hstore',
        codec_name='pg_contrib.hstore'
    )

    # Insert hstore data
    await conn.execute('''
        CREATE TABLE IF NOT EXISTS products(
            id serial PRIMARY KEY,
            attributes hstore
        )
    ''')

    await conn.execute(
        'INSERT INTO products(attributes) VALUES($1)',
        {'color': 'red', 'size': 'large', 'weight': '500g'}
    )

    # Query hstore data - returns as Python dict
    result = await conn.fetchval(
        "SELECT attributes FROM products WHERE attributes->'color' = 'red'"
    )
    print(result)  # {'color': 'red', 'size': 'large', 'weight': '500g'}

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Connect and Interact with PostgreSQL using asyncpg

Source: https://github.com/magicstack/asyncpg/blob/master/docs/usage.md

Demonstrates establishing a connection, creating a table, inserting data, and fetching a record. Uses native PostgreSQL syntax for query arguments ($n).

```python
import asyncio
import asyncpg
import datetime

async def main():
    # Establish a connection to an existing database named "test"
    # as a "postgres" user.
    conn = await asyncpg.connect('postgresql://postgres@localhost/test')
    # Execute a statement to create a new table.
    await conn.execute('''
        CREATE TABLE users(
            id serial PRIMARY KEY,
            name text,
            dob date
        )
    ''')

    # Insert a record into the created table.
    await conn.execute('''
        INSERT INTO users(name, dob) VALUES($1, $2)
    ''', 'Bob', datetime.date(1984, 3, 1))

    # Select a row from the table.
    row = await conn.fetchrow(
        'SELECT * FROM users WHERE name = $1', 'Bob')
    # *row* now contains
    # asyncpg.Record(id=1, name='Bob', dob=datetime.date(1984, 3, 1))

    # Close the connection.
    await conn.close()

asyncio.run(main())
```

--------------------------------

### Basic asyncpg Connection and Query

Source: https://github.com/magicstack/asyncpg/blob/master/README.rst

Connect to a PostgreSQL database, execute a query with a parameter, and close the connection. Requires Python 3.9+ and PostgreSQL 9.5+.

```python
import asyncio
import asyncpg

async def run():
    conn = await asyncpg.connect(user='user', password='password',
                                 database='database', host='127.0.0.1')
    values = await conn.fetch(
        'SELECT * FROM mytable WHERE id = $1',
        10,
    )
    await conn.close()

asyncio.run(run())
```

--------------------------------

### Establish SSL/TLS Connections

Source: https://context7.com/magicstack/asyncpg/llms.txt

Connect to PostgreSQL using SSL/TLS with different security configurations. For production, prefer a mode that verifies the server certificate ('verify-ca' or 'verify-full'); 'require' encrypts the connection but by default does not verify the certificate.

```python
import asyncio
import asyncpg
import ssl

async def main():
    # Simple SSL connection using a mode string.
    # 'require' only encrypts; 'verify-ca' and 'verify-full'
    # also verify the server certificate.
    conn = await asyncpg.connect(
        user='postgres',
        database='mydb',
        host='db.example.com',
        ssl='require'  # or 'verify-ca', 'verify-full'
    )
    await conn.close()

    # Custom SSL context for full control
    sslctx = ssl.create_default_context(
        ssl.Purpose.SERVER_AUTH,
        cafile='/path/to/ca-certificate.pem'
    )
    sslctx.check_hostname = True  # verify-full behavior
    sslctx.load_cert_chain(
        '/path/to/client-cert.pem',
        keyfile='/path/to/client-key.pem'
    )

    conn = await asyncpg.connect(
        user='postgres',
        database='mydb',
        host='db.example.com',
        ssl=sslctx
    )
    await conn.close()

    # Skip certificate verification (not recommended for production)
    sslctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    sslctx.check_hostname = False
    sslctx.verify_mode = ssl.CERT_NONE

    conn = await asyncpg.connect(
        user='postgres',
        database='mydb',
        ssl=sslctx
    )
    await conn.close()

asyncio.run(main())
```

--------------------------------

### Access Connection Settings

Source:
https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

Shows how to retrieve connection settings, specifically the client_encoding, from a ConnectionSettings object.

```python
>>> connection.get_settings().client_encoding
'UTF8'
```

--------------------------------

### Use Pool with Async Context Manager

Source: https://context7.com/magicstack/asyncpg/llms.txt

Demonstrates using asyncpg's connection pool with an async context manager for automatic cleanup. Ensures the pool is closed properly after use.

```python
import asyncio
import asyncpg

async def main_with_context():
    async with asyncpg.create_pool(user='postgres', database='mydb') as pool:
        async with pool.acquire() as conn:
            await conn.execute('SELECT 1')
    # Pool closed automatically

asyncio.run(main_with_context())
```

--------------------------------

### Cursor from Prepared Statement

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

Create a cursor from a prepared statement, passing arguments to the statement. This combines the performance benefits of prepared statements with cursor iteration.

```python
from asyncpg import Connection

async def iterate(con: Connection):
    # Create a prepared statement that will accept one argument
    stmt = await con.prepare('SELECT generate_series(0, $1)')
    async with con.transaction():
        # Postgres requires non-scrollable cursors to be created
        # and used in a transaction.

        # Execute the prepared statement passing `10` as the
        # argument -- that will generate a series of records
        # from 0..10. Iterate over all of them and print every
        # record.
        async for record in stmt.cursor(10):
            print(record)
```

--------------------------------

### Establish asyncpg Connection

Source: https://context7.com/magicstack/asyncpg/llms.txt

Connect to a PostgreSQL database using individual parameters or a DSN string. Supports statement caching, timeouts, and SSL/TLS encryption. Always close the connection when done.
```python
import asyncio
import asyncpg

async def main():
    # Connect using individual parameters
    conn = await asyncpg.connect(
        user='postgres',
        password='secret',
        database='mydb',
        host='127.0.0.1',
        port=5432,
        command_timeout=60,
        statement_cache_size=100
    )
    # Close it before reusing the name, so the connection is not leaked
    await conn.close()

    # Or connect using a DSN
    conn = await asyncpg.connect(
        'postgresql://postgres:secret@localhost:5432/mydb'
    )

    # Check server version
    print(conn.get_server_version())
    # ServerVersion(major=15, minor=2, micro=0, releaselevel='final', serial=0)

    # Get server PID
    print(conn.get_server_pid())  # 12345

    # Always close the connection when done
    await conn.close()

asyncio.run(main())
```

--------------------------------

### Fetch Row and Access Record Fields

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

Demonstrates fetching a single row using fetchrow and accessing its fields by name or index. Also shows converting a Record to a dictionary or tuple.

```python
>>> import asyncpg
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> conn = loop.run_until_complete(asyncpg.connect())
>>> r = loop.run_until_complete(conn.fetchrow('''
...     SELECT oid, rolname, rolsuper FROM pg_roles WHERE rolname = user'''))
>>> r
<Record oid=16388 rolname='elvis' rolsuper=True>
>>> r['oid']
16388
>>> r[0]
16388
>>> dict(r)
{'oid': 16388, 'rolname': 'elvis', 'rolsuper': True}
>>> tuple(r)
(16388, 'elvis', True)
```

--------------------------------

### Fetch Multiple Rows with fetch()

Source: https://context7.com/magicstack/asyncpg/llms.txt

Execute a query and retrieve all results as a list of Record objects. Records support both index and key-based access, and can be converted to dictionaries.
```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Fetch all rows matching the query
    rows = await conn.fetch(
        'SELECT id, name, email FROM users WHERE name LIKE $1',
        'A%'
    )

    for row in rows:
        # Access by column name
        print(f"Name: {row['name']}, Email: {row['email']}")
        # Access by index
        print(f"ID: {row[0]}")
        # Convert to dict
        print(dict(row))  # {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'}
        # Get keys and values
        print(list(row.keys()))    # ['id', 'name', 'email']
        print(list(row.values()))  # [1, 'Alice', 'alice@example.com']

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Execute SQL Commands with execute()

Source: https://context7.com/magicstack/asyncpg/llms.txt

Run SQL commands using execute(). Can execute multiple statements or parameterized queries with PostgreSQL's native $n syntax. Supports command timeouts.

```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Execute multiple statements at once (no parameters)
    await conn.execute('''
        CREATE TABLE IF NOT EXISTS users(
            id serial PRIMARY KEY,
            name text NOT NULL,
            email text UNIQUE,
            created_at timestamp DEFAULT now()
        );
        CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
    ''')

    # Execute with parameters using $n syntax
    status = await conn.execute('''
        INSERT INTO users(name, email) VALUES($1, $2)
    ''', 'Alice', 'alice@example.com')
    print(status)  # INSERT 0 1

    # Execute with timeout
    await conn.execute(
        'UPDATE users SET name = $1 WHERE id = $2',
        'Alice Smith', 1,
        timeout=10.0
    )

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Query Logging in asyncpg

Source: https://context7.com/magicstack/asyncpg/llms.txt

Implement query logging in asyncpg for debugging and performance monitoring. Loggers can be added permanently or used temporarily via a context manager.
Log records contain query details, arguments, elapsed time, and exceptions.

```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Simple query logger
    def log_query(record):
        print(f"Query: {record.query}")
        print(f"Args: {record.args}")
        print(f"Elapsed: {record.elapsed:.4f}s")
        if record.exception:
            print(f"Exception: {record.exception}")

    # Add logger permanently
    conn.add_query_logger(log_query)

    await conn.fetch('SELECT * FROM users WHERE id = $1', 1)
    # Output:
    # Query: SELECT * FROM users WHERE id = $1
    # Args: (1,)
    # Elapsed: 0.0023s

    # Remove logger
    conn.remove_query_logger(log_query)

    # Use context manager for temporary logging
    class QueryCollector:
        def __init__(self):
            self.queries = []

        def __call__(self, record):
            self.queries.append({
                'query': record.query,
                'elapsed': record.elapsed
            })

    collector = QueryCollector()
    with conn.query_logger(collector):
        await conn.fetch('SELECT 1')
        await conn.fetch('SELECT 2')

    print(f"Collected {len(collector.queries)} queries")
    for q in collector.queries:
        print(f"  {q['query']} ({q['elapsed']:.4f}s)")

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Execute Prepared Statement

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

Use prepared statements to optimize frequently executed queries. The server parses, analyzes, and compiles the query once, allowing for reuse.

```python
>>> import asyncpg, asyncio
>>> async def run():
...     conn = await asyncpg.connect()
...     stmt = await conn.prepare('''SELECT 2 ^ $1''')
...     print(await stmt.fetchval(10))
...     print(await stmt.fetchval(20))
...
>>> asyncio.run(run())
1024.0
1048576.0
```

--------------------------------

### Listening for PostgreSQL Notifications with asyncpg

Source: https://context7.com/magicstack/asyncpg/llms.txt

Utilize asyncpg to listen for and handle PostgreSQL NOTIFY messages. Callbacks can be regular functions or async coroutines.
Remember to add and remove listeners as needed.

```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Define a notification callback
    def handle_notification(conn, pid, channel, payload):
        print(f"Received on '{channel}' from PID {pid}: {payload}")

    # Async callback also supported
    async def async_handle_notification(conn, pid, channel, payload):
        print(f"Async handler: {payload}")
        await asyncio.sleep(0)  # Can await here

    # Add listeners
    await conn.add_listener('my_channel', handle_notification)
    await conn.add_listener('my_channel', async_handle_notification)

    # Send a notification from another connection
    conn2 = await asyncpg.connect(user='postgres', database='test')
    await conn2.execute("NOTIFY my_channel, 'Hello from conn2!'")
    await conn2.close()

    # Wait a bit for notification to arrive
    await asyncio.sleep(0.1)

    # Remove listener when done
    await conn.remove_listener('my_channel', handle_notification)

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Fetch Results for Multiple Parameter Sets with fetchmany()

Source: https://context7.com/magicstack/asyncpg/llms.txt

Use fetchmany() to execute a query for each sequence of arguments and return all results as a list of Records. This is useful for batch operations that return data.
```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Insert multiple rows and return the inserted data
    args = [('Eve', 'eve@example.com'), ('Frank', 'frank@example.com')]
    rows = await conn.fetchmany('''
        INSERT INTO users(name, email) VALUES($1, $2)
        RETURNING id, name, email
    ''', args)

    for row in rows:
        print(f"Inserted user {row['id']}: {row['name']}")

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Using Prepared Statements in asyncpg

Source: https://context7.com/magicstack/asyncpg/llms.txt

Prepared statements optimize query performance by parsing and compiling queries once. They can be used for fetching single rows, multiple rows, or executing many statements. The execution plan can also be retrieved.

```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Create a prepared statement
    stmt = await conn.prepare('SELECT * FROM users WHERE id = $1')

    # Get statement info
    print(f"Query: {stmt.get_query()}")
    print(f"Parameters: {stmt.get_parameters()}")
    print(f"Attributes: {stmt.get_attributes()}")

    # Execute the prepared statement multiple times
    user1 = await stmt.fetchrow(1)
    user2 = await stmt.fetchrow(2)
    user3 = await stmt.fetchval(3, column=1)  # Get name column

    # Fetch multiple rows
    stmt2 = await conn.prepare('SELECT * FROM users WHERE name LIKE $1')
    users = await stmt2.fetch('A%')

    # Execute many with prepared statement
    stmt3 = await conn.prepare('INSERT INTO logs(user_id, action) VALUES($1, $2)')
    await stmt3.executemany([
        (1, 'login'),
        (1, 'view_page'),
        (1, 'logout'),
    ])

    # Get execution plan
    stmt4 = await conn.prepare('SELECT * FROM users WHERE email = $1')
    plan = await stmt4.explain('test@example.com')
    print(plan)  # JSON execution plan

    # Explain with analyze (actually executes the query in a transaction)
    plan_analyzed = await stmt4.explain('test@example.com', analyze=True)

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Iterate Cursor with async for

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

Use `Connection.cursor()` to iterate over large query results efficiently without fetching all rows at once. Cursors are non-scrollable and require a transaction.

```python
async def iterate(con: Connection):
    async with con.transaction():
        # Postgres requires non-scrollable cursors to be created
        # and used in a transaction.
        async for record in con.cursor('SELECT generate_series(0, 100)'):
            print(record)
```

--------------------------------

### Configure Automatic JSON Conversion in asyncpg

Source: https://github.com/magicstack/asyncpg/blob/master/docs/usage.md

Use this snippet to set up asyncpg for automatic JSON encoding and decoding using Python's `json` module. Ensure the 'json' type codec is registered with the 'pg_catalog' schema.

```python
import asyncio
import asyncpg
import json

async def main():
    conn = await asyncpg.connect()

    try:
        await conn.set_type_codec(
            'json',
            encoder=json.dumps,
            decoder=json.loads,
            schema='pg_catalog'
        )

        data = {'foo': 'bar', 'spam': 1}
        res = await conn.fetchval('SELECT $1::json', data)

    finally:
        await conn.close()

asyncio.run(main())
```

--------------------------------

### Copying Data with COPY in asyncpg

Source: https://context7.com/magicstack/asyncpg/llms.txt

Use asyncpg's COPY protocol methods for high-performance bulk data transfer to and from PostgreSQL tables. Supports various sources like iterables, generators, files, and query results. Ensure proper table creation and column mapping.
```python
import asyncio
import asyncpg
import datetime
import io

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    await conn.execute('''
        CREATE TABLE IF NOT EXISTS measurements(
            id serial PRIMARY KEY,
            sensor_id int,
            value float,
            timestamp timestamp
        )
    ''')

    # Copy from records (binary format, most efficient).
    # Values must be Python objects matching the column types,
    # so timestamps are datetime instances rather than strings.
    records = [
        (1, 23.5, datetime.datetime(2024, 1, 1, 10, 0)),
        (1, 24.1, datetime.datetime(2024, 1, 1, 10, 1)),
        (2, 19.8, datetime.datetime(2024, 1, 1, 10, 0)),
    ]
    result = await conn.copy_records_to_table(
        'measurements',
        records=records,
        columns=['sensor_id', 'value', 'timestamp']
    )
    print(result)  # COPY 3

    # Copy from async generator (for large datasets)
    async def generate_records(count):
        start = datetime.datetime(2024, 1, 2)
        for i in range(count):
            # One reading per hour, starting at `start`
            yield (1, 20.0 + i * 0.1, start + datetime.timedelta(hours=i))

    await conn.copy_records_to_table(
        'measurements',
        records=generate_records(100),
        columns=['sensor_id', 'value', 'timestamp']
    )

    # Copy to file (CSV format); export only the columns re-imported below
    await conn.copy_from_table(
        'measurements',
        columns=['sensor_id', 'value', 'timestamp'],
        output='measurements.csv',
        format='csv',
        header=True
    )

    # Copy from file
    await conn.copy_to_table(
        'measurements',
        source='measurements.csv',
        format='csv',
        header=True,
        columns=['sensor_id', 'value', 'timestamp']
    )

    # Copy query results
    await conn.copy_from_query(
        'SELECT sensor_id, avg(value) FROM measurements GROUP BY sensor_id',
        output='averages.csv',
        format='csv'
    )

    # Copy to in-memory buffer
    buffer = io.BytesIO()
    await conn.copy_from_table('measurements', output=buffer, format='csv')
    print(buffer.getvalue().decode())

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Connection Settings

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

ConnectionSettings provides access to read-only connection parameters.

```APIDOC
### *class* ConnectionSettings

A read-only collection of Connection settings.

### settings.setting_name

Return the value of the “setting_name” setting. Raises an `AttributeError` if the setting is not defined.

Example:

```pycon
>>> connection.get_settings().client_encoding
'UTF8'
```
```

--------------------------------

### Prepared Statements

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

Prepared statements optimize query performance by allowing the server to parse, analyze, and compile queries once, enabling reuse for subsequent executions. asyncpg automatically caches recently executed queries.

```APIDOC
## Prepared Statements

### Description
Prepared statements are a PostgreSQL feature that can be used to optimize the performance of queries that are executed more than once. When a query is *prepared* by a call to `Connection.prepare()`, the server parses, analyzes and compiles the query allowing to reuse that work once there is a need to run the same query again.

### Request Example
```pycon
>>> import asyncpg, asyncio
>>> async def run():
...     conn = await asyncpg.connect()
...     stmt = await conn.prepare('''SELECT 2 ^ $1''')
...     print(await stmt.fetchval(10))
...     print(await stmt.fetchval(20))
...
>>> asyncio.run(run())
1024.0
1048576.0
```

### Notes
- asyncpg automatically maintains a small LRU cache for queries executed during calls to the `fetch()`, `fetchrow()`, or `fetchval()` methods.

### Warnings
- If you are using pgbouncer with `pool_mode` set to `transaction` or `statement`, prepared statements will not work correctly. See [Why am I getting prepared statement errors?](../faq.md#asyncpg-prepared-stmt-errors) for more information.
```

--------------------------------

### Configure PostGIS Geometry Type Conversion

Source: https://github.com/magicstack/asyncpg/blob/master/docs/usage.md

Configure asyncpg to encode and decode PostGIS geometry types using the geo interface specification and WKB format. Requires Shapely or a similar library.
```python
import asyncio
import asyncpg

import shapely.geometry
import shapely.wkb
from shapely.geometry.base import BaseGeometry

async def main():
    conn = await asyncpg.connect()

    try:
        def encode_geometry(geometry):
            if not hasattr(geometry, '__geo_interface__'):
                raise TypeError('{g} does not conform to '
                                'the geo interface'.format(g=geometry))
            shape = shapely.geometry.shape(geometry)
            return shapely.wkb.dumps(shape)

        def decode_geometry(wkb):
            return shapely.wkb.loads(wkb)

        await conn.set_type_codec(
            'geometry',  # also works for 'geography'
            encoder=encode_geometry,
            decoder=decode_geometry,
            format='binary',
        )

        data = shapely.geometry.Point(-73.985661, 40.748447)
        res = await conn.fetchrow(
            '''SELECT 'Empire State Building' AS name,
                      $1::geometry            AS coordinates
            ''',
            data)

        print(res)

    finally:
        await conn.close()

asyncio.run(main())
```

--------------------------------

### Decode hstore Values as Dict

Source: https://github.com/magicstack/asyncpg/blob/master/docs/usage.md

Register a codec to decode and encode hstore values as Python dict objects. Requires the hstore extension to be enabled in the database.

```python
import asyncpg
import asyncio

async def run():
    conn = await asyncpg.connect()
    # Assuming the hstore extension exists in the public schema.
    await conn.set_builtin_type_codec(
        'hstore', codec_name='pg_contrib.hstore')
    result = await conn.fetchval("SELECT 'a=>1,b=>2,c=>NULL'::hstore")
    assert result == {'a': '1', 'b': '2', 'c': None}

asyncio.run(run())
```

--------------------------------

### Fetch a Single Row with fetchrow()

Source: https://context7.com/magicstack/asyncpg/llms.txt

Execute a query and return only the first row as a Record object, or None if no rows match. More efficient than fetch() for single results. Supports timeouts.
```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Fetch a single row
    user = await conn.fetchrow(
        'SELECT id, name, email FROM users WHERE id = $1',
        1
    )

    if user:
        print(f"Found user: {user['name']}")
        # Check if column exists
        print('email' in user)  # True
    else:
        print("User not found")

    # Fetch with timeout
    user = await conn.fetchrow(
        'SELECT * FROM users ORDER BY created_at DESC LIMIT 1',
        timeout=5.0
    )

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Custom Record Class with Dot Notation

Source: https://github.com/magicstack/asyncpg/blob/master/docs/faq.md

Implement a custom Record class to enable dot-notation access to column values. This class inherits from asyncpg.Record and overrides __getattr__ to provide the desired behavior.

```python
class MyRecord(asyncpg.Record):
    def __getattr__(self, name):
        return self[name]
```

--------------------------------

### Use Transaction with async with

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

Manage database transactions using the `async with` statement for automatic commit or rollback.

```python
async with connection.transaction():
    await connection.execute("INSERT INTO mytable VALUES(1, 2, 3)")
```

--------------------------------

### Creating and Using Connection Pools in asyncpg

Source: https://context7.com/magicstack/asyncpg/llms.txt

Connection pools manage reusable database connections for improved performance in high-concurrency applications. They handle connection lifecycle, health checks, and resets. Pools offer convenience methods for common operations and provide status information.
```python
import asyncio
import asyncpg

async def main():
    # Create a connection pool
    pool = await asyncpg.create_pool(
        user='postgres',
        password='secret',
        database='mydb',
        host='localhost',
        min_size=5,                              # Minimum connections to maintain
        max_size=20,                             # Maximum connections allowed
        max_queries=50000,                       # Queries before connection replacement
        max_inactive_connection_lifetime=300.0,  # Seconds before idle cleanup
        command_timeout=60
    )

    # Acquire a connection with an async context manager (recommended)
    async with pool.acquire() as conn:
        result = await conn.fetch('SELECT * FROM users')
        print(f"Found {len(result)} users")
    # Connection automatically released back to pool

    # Pool has convenience methods that acquire/release automatically
    users = await pool.fetch('SELECT * FROM users LIMIT 10')
    count = await pool.fetchval('SELECT count(*) FROM users')
    user = await pool.fetchrow('SELECT * FROM users WHERE id = $1', 1)
    await pool.execute('UPDATE users SET last_login = now() WHERE id = $1', 1)

    # Pool status
    print(f"Pool size: {pool.get_size()}")
    print(f"Idle connections: {pool.get_idle_size()}")
    print(f"Min/Max: {pool.get_min_size()}/{pool.get_max_size()}")

    # Expire all connections (useful after schema changes)
    await pool.expire_connections()

    # Close the pool gracefully
    await pool.close()

asyncio.run(main())
```

--------------------------------

### Decode Numeric Columns as Floats

Source: https://github.com/magicstack/asyncpg/blob/master/docs/usage.md

Instruct asyncpg to decode numeric columns as Python floats instead of the default Decimal instances. Uses 'pg_catalog' schema and 'text' format.
```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect()

    try:
        await conn.set_type_codec(
            'numeric', encoder=str, decoder=float,
            schema='pg_catalog', format='text'
        )

        res = await conn.fetchval("SELECT $1::numeric", 11.123)
        print(res, type(res))

    finally:
        await conn.close()

asyncio.run(main())
```

--------------------------------

### Register Connection and Log Listeners

Source: https://context7.com/magicstack/asyncpg/llms.txt

Register callbacks for server log messages and connection termination events. Ensure listeners are removed when no longer needed to prevent memory leaks.

```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Log listener for server messages (NOTICE, WARNING, etc.)
    def log_handler(conn, message):
        print(f"Server {message.severity}: {message.message}")

    conn.add_log_listener(log_handler)

    # Termination listener
    def on_terminate(conn):
        print("Connection terminated!")

    conn.add_termination_listener(on_terminate)

    # This will trigger a NOTICE
    await conn.execute('''
        DO $$
        BEGIN
            RAISE NOTICE 'This is a notice message';
        END $$
    ''')

    # Remove listeners
    conn.remove_log_listener(log_handler)
    conn.remove_termination_listener(on_terminate)

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Using Cursors for Large Result Sets in asyncpg

Source: https://context7.com/magicstack/asyncpg/llms.txt

Cursors allow efficient traversal of large query results without loading everything into memory. They support async iteration and manual navigation, but must be used within a transaction. Cursors can also be created from prepared statements.
```python
import asyncio
import datetime
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Cursors must be used within a transaction
    async with conn.transaction():
        # Async iteration with automatic prefetching (default 50 rows)
        async for record in conn.cursor('SELECT * FROM users'):
            print(record['name'])

        # Custom prefetch size
        async for record in conn.cursor('SELECT * FROM users', prefetch=100):
            print(record['name'])

        # Manual cursor control
        cur = await conn.cursor('SELECT * FROM users ORDER BY id')

        # Skip forward 10 rows
        await cur.forward(10)

        # Fetch one row
        row = await cur.fetchrow()
        print(f"Row 11: {row}")

        # Fetch next 5 rows
        rows = await cur.fetch(5)
        print(f"Rows 12-16: {len(rows)} rows")

    # Cursor from prepared statement
    async with conn.transaction():
        stmt = await conn.prepare('SELECT * FROM users WHERE created_at > $1')
        # Assuming created_at is a timestamp column: asyncpg expects a
        # datetime object for such parameters, not a string.
        async for record in stmt.cursor(datetime.datetime(2024, 1, 1)):
            print(record['name'])

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Custom Type Codecs for JSON, Numeric, and Composite Types

Source: https://context7.com/magicstack/asyncpg/llms.txt

Illustrates defining custom encoders and decoders for PostgreSQL data types like JSON, JSONB, numeric, and custom composite types. Enables automatic conversion between PostgreSQL and Python objects.
```python
import asyncio
import asyncpg
import json

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # JSON codec - automatically encode/decode JSON
    await conn.set_type_codec(
        'json',
        encoder=json.dumps,
        decoder=json.loads,
        schema='pg_catalog'
    )

    data = {'users': [1, 2, 3], 'active': True}
    result = await conn.fetchval('SELECT $1::json', data)
    print(result)  # {'users': [1, 2, 3], 'active': True}

    # JSONB codec
    await conn.set_type_codec(
        'jsonb',
        encoder=json.dumps,
        decoder=json.loads,
        schema='pg_catalog'
    )

    # Numeric as float (instead of Decimal)
    await conn.set_type_codec(
        'numeric',
        encoder=str,
        decoder=float,
        schema='pg_catalog',
        format='text'
    )

    price = await conn.fetchval('SELECT $1::numeric', 19.99)
    print(type(price), price)  # <class 'float'> 19.99

    # Custom composite type codec
    await conn.execute('''
        CREATE TYPE point3d AS (x float, y float, z float)
    ''')

    await conn.set_type_codec(
        'point3d',
        encoder=lambda p: (p['x'], p['y'], p['z']),
        decoder=lambda t: {'x': t[0], 'y': t[1], 'z': t[2]},
        format='tuple'
    )

    point = await conn.fetchval(
        'SELECT $1::point3d',
        {'x': 1.0, 'y': 2.0, 'z': 3.0}
    )
    print(point)  # {'x': 1.0, 'y': 2.0, 'z': 3.0}

    # Reset codec to default
    await conn.reset_type_codec('numeric', schema='pg_catalog')

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Using Transactions with asyncpg

Source: https://context7.com/magicstack/asyncpg/llms.txt

Transactions ensure atomic database operations. asyncpg supports transactions via the transaction() method, usable as an async context manager or manually. The async with statement commits on normal exit and rolls back if an exception is raised.
```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Transaction using async with (recommended)
    async with conn.transaction():
        await conn.execute(
            'INSERT INTO users(name, email) VALUES($1, $2)',
            'Grace', 'grace@example.com'
        )
        await conn.execute(
            'UPDATE users SET name = $1 WHERE email = $2',
            'Grace Hopper', 'grace@example.com'
        )
    # Transaction commits automatically on exit

    # Transaction with isolation level
    async with conn.transaction(isolation='serializable', readonly=False):
        await conn.execute('UPDATE accounts SET balance = balance - 100 WHERE id = 1')
        await conn.execute('UPDATE accounts SET balance = balance + 100 WHERE id = 2')

    # Nested transactions create savepoints
    async with conn.transaction():
        await conn.execute("INSERT INTO users(name, email) VALUES('Test1', 'test1@example.com')")

        try:
            async with conn.transaction():  # Creates a savepoint
                await conn.execute("INSERT INTO users(name, email) VALUES('Test2', 'test2@example.com')")
                raise Exception("Rollback inner transaction")
        except Exception:
            pass  # Inner transaction rolled back, outer continues
    # Test1 is still inserted, Test2 is rolled back

    # Manual transaction control
    tr = conn.transaction()
    await tr.start()
    try:
        await conn.execute("INSERT INTO users(name, email) VALUES('Manual', 'manual@example.com')")
        await tr.commit()
    except Exception:
        await tr.rollback()
        raise

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Manual Cursor Iteration

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

Manually iterate over a cursor by fetching rows or chunks. This method allows for more control over fetching but does not prefetch rows by default.

```python
from asyncpg import Connection

async def iterate(con: Connection):
    async with con.transaction():
        # Postgres requires non-scrollable cursors to be created
        # and used in a transaction.
        # Create a Cursor object
        cur = await con.cursor('SELECT generate_series(0, 100)')

        # Move the cursor 10 rows forward
        await cur.forward(10)

        # Fetch one row and print it
        print(await cur.fetchrow())

        # Fetch a list of 5 rows and print it
        print(await cur.fetch(5))
```

--------------------------------

### Execute Multiple Parameter Sets with executemany()

Source: https://context7.com/magicstack/asyncpg/llms.txt

Use executemany() to execute a command for each sequence of arguments in an iterable. This operation is atomic. You can also specify a timeout.

```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Prepare data to insert
    users_data = [
        ('Bob', 'bob@example.com'),
        ('Charlie', 'charlie@example.com'),
        ('Diana', 'diana@example.com'),
    ]

    # Insert multiple rows atomically
    await conn.executemany('''
        INSERT INTO users(name, email) VALUES($1, $2)
    ''', users_data)

    # With timeout
    await conn.executemany(
        'UPDATE users SET name = $1 WHERE email = $2',
        [('Robert', 'bob@example.com'), ('Chuck', 'charlie@example.com')],
        timeout=30.0
    )

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Transactions

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

Manage database transactions using `async with connection.transaction():` for automatic commit/rollback or manually with `tr.start()`, `tr.commit()`, and `tr.rollback()`.

```APIDOC
## Transactions

### Description

The most common way to use transactions is through an `async with` statement. asyncpg supports nested transactions, which create savepoints.
### Request Example

```python
async with connection.transaction():
    await connection.execute("INSERT INTO mytable VALUES(1, 2, 3)")
```

### Nested Transactions Example

```python
async with connection.transaction():
    await connection.execute('CREATE TABLE mytab (a int)')

    try:
        # Create a nested transaction:
        async with connection.transaction():
            await connection.execute('INSERT INTO mytab (a) VALUES (1), (2)')
            # This nested transaction will be automatically rolled back:
            raise Exception
    except:
        # Ignore exception
        pass

    # Because the nested transaction was rolled back, there
    # will be nothing in `mytab`.
    assert await connection.fetch('SELECT a FROM mytab') == []
```

### Manual Transaction Management Example

```python
tr = connection.transaction()
await tr.start()
try:
    ...
except:
    await tr.rollback()
    raise
else:
    await tr.commit()
```

### See Also

- `Connection.transaction()`
```

--------------------------------

### Fetch Single Value with fetchval()

Source: https://context7.com/magicstack/asyncpg/llms.txt

Use fetchval() to retrieve a single value from the first row of a query result. It returns None if no rows match. You can specify a column index to retrieve a value other than the first column.
```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Get count of users
    count = await conn.fetchval('SELECT count(*) FROM users')
    print(f"Total users: {count}")  # Total users: 42

    # Get a specific column (0-indexed)
    email = await conn.fetchval(
        'SELECT id, name, email FROM users WHERE id = $1',
        1,
        column=2  # Get the email column (index 2)
    )
    print(f"Email: {email}")

    # Returns None if no rows match
    result = await conn.fetchval(
        'SELECT name FROM users WHERE id = $1',
        99999
    )
    print(result)  # None

    await conn.close()

asyncio.run(main())
```

--------------------------------

### Record Objects

Source: https://github.com/magicstack/asyncpg/blob/master/docs/api/index.md

Record objects represent rows returned by fetch methods. They are a hybrid of tuple and dictionary, allowing access by index or field name.

```APIDOC
## Record Objects

Each row (or composite type value) returned by calls to `fetch*` methods is represented by an instance of the [`Record`](#asyncpg.Record) object. `Record` objects are a tuple-/dict-like hybrid, and allow addressing of items either by a numeric index or by a field name.

### *class* Record

A read-only representation of a PostgreSQL row.

### len(r)

Return the number of fields in record *r*.

### r[field]

Return the field of *r* with field name or index *field*.

### name in r

Return `True` if record *r* has a field named *name*.

### iter(r)

Return an iterator over the *values* of the record *r*.

### get(name[, default])

Return the value for *name* if the record has a field named *name*, else return *default*. If *default* is not given, return `None`.

#### Versionadded

Added in version 0.18.

#### values()

Return an iterator over the record values.

#### keys()

Return an iterator over the record field names.

#### items()

Return an iterator over `(field, value)` pairs.
```

--------------------------------

### Handle Schema Changes with reload_schema_state

Source: https://context7.com/magicstack/asyncpg/llms.txt

Force asyncpg to reload its cached type information after schema changes, such as altering ENUM types. This ensures new type values are recognized.

```python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(user='postgres', database='test')

    # Create initial type
    await conn.execute('''
        CREATE TYPE status AS ENUM ('pending', 'active', 'closed')
    ''')

    # Use the type
    await conn.execute('''
        CREATE TABLE orders(id serial, status status)
    ''')
    await conn.execute("INSERT INTO orders(status) VALUES('pending')")

    # Alter the type
    await conn.execute("ALTER TYPE status ADD VALUE 'archived'")

    # Reload schema state to pick up changes
    await conn.reload_schema_state()

    # Now we can use the new enum value
    await conn.execute("INSERT INTO orders(status) VALUES('archived')")

    await conn.close()

asyncio.run(main())
```
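
--------------------------------

### Retry After Reloading Schema State (sketch)

The reload step in the previous snippet can also be wrapped in a small retry helper. This is a minimal sketch and not part of asyncpg: `run_with_schema_reload`, `FakeConnection`, and `StaleSchemaError` are hypothetical names introduced for illustration. `StaleSchemaError` stands in for whichever exception an application treats as a stale-cache signal (asyncpg's `OutdatedSchemaCacheError` may be a candidate, but that mapping is an assumption). The stub connection lets the sketch run without a database.

```python
import asyncio


class StaleSchemaError(Exception):
    """Hypothetical stand-in for a 'cached schema is outdated' error."""


async def run_with_schema_reload(conn, operation, stale_errors=(StaleSchemaError,)):
    """Run operation(conn); on a stale-schema error, reload and retry once."""
    try:
        return await operation(conn)
    except stale_errors:
        # Refresh cached type information, then retry a single time.
        await conn.reload_schema_state()
        return await operation(conn)


class FakeConnection:
    """Hypothetical stub that fails until reload_schema_state() is called."""

    def __init__(self):
        self.reloads = 0

    async def reload_schema_state(self):
        self.reloads += 1

    async def execute(self, query):
        if self.reloads == 0:
            raise StaleSchemaError("schema cache is stale")
        return "INSERT 0 1"


async def insert_archived(conn):
    # The operation to retry; it only needs an object with execute().
    return await conn.execute("INSERT INTO orders(status) VALUES('archived')")


async def demo():
    conn = FakeConnection()
    status = await run_with_schema_reload(conn, insert_archived)
    return status, conn.reloads


result = asyncio.run(demo())
print(result)  # ('INSERT 0 1', 1)
```

With a real connection, the caller would pass the exception classes it wants to treat as stale through `stale_errors`. Retrying only once keeps a genuine failure from looping.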