### Pool Initialization Example Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html This code demonstrates the creation of an asyncpg Pool instance with various configuration parameters. It shows how to specify connection limits, query limits, and custom coroutines for connection initialization and setup. ```python return Pool( dsn, connection_class=connection_class, record_class=record_class, min_size=min_size, max_size=max_size, max_queries=max_queries, loop=loop, connect=connect, setup=setup, init=init, reset=reset, max_inactive_connection_lifetime=max_inactive_connection_lifetime, **connect_kwargs, ) ``` -------------------------------- ### Transaction Management Example Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/transaction.html Demonstrates how to start, commit, and rollback a transaction using asyncpg. This is useful for ensuring data integrity by grouping multiple operations into a single atomic unit. ```python async def transaction_example(conn): async with conn.transaction(): # Operations within the transaction await conn.execute("INSERT INTO users (name) VALUES ($1)", "Alice") await conn.execute("UPDATE products SET price = price * 1.1 WHERE id = 1") # If an exception occurs within the 'async with' block, the transaction # will be rolled back automatically. Otherwise, it will be committed. ``` -------------------------------- ### Install asyncpg from source Source: https://magicstack.github.io/asyncpg/current/installation.html Install asyncpg from a Git checkout. Ensure you have a C compiler and CPython header files installed. ```bash $ pip install -e . ``` -------------------------------- ### Install asyncpg with pip Source: https://magicstack.github.io/asyncpg/current/installation.html Use this command for a standard installation of the asyncpg library. ```bash $ pip install asyncpg ``` -------------------------------- ### Install asyncpg with GSSAPI/SSPI authentication Source: https://magicstack.github.io/asyncpg/current/installation.html Install asyncpg with support for GSSAPI/SSPI authentication. This command installs SSPI support on Windows and GSSAPI support on non-Windows platforms. ```bash $ pip install 'asyncpg[gssauth]' ``` -------------------------------- ### Web Service with Asyncpg Connection Pool Source: https://magicstack.github.io/asyncpg/current/usage.html This example demonstrates a simple aiohttp web service that uses an asyncpg connection pool to handle requests. It shows how to create a pool, acquire connections, manage transactions, and execute queries within a web request context. The pool is initialized during application setup and closed upon cleanup. ```python import asyncio import asyncpg from aiohttp import web async def handle(request): """Handle incoming requests.""" pool = request.app['pool'] power = int(request.match_info.get('power', 10)) # Take a connection from the pool. async with pool.acquire() as connection: # Open a transaction. async with connection.transaction(): # Run the query passing the request argument. result = await connection.fetchval('select 2 ^ $1', power) return web.Response( text="2 ^ {} is {}".format(power, result)) async def init_db(app): """Initialize a connection pool.""" app['pool'] = await asyncpg.create_pool(database='postgres', user='postgres') yield await app['pool'].close() def init_app(): """Initialize the application server.""" app = web.Application() # Create a database context app.cleanup_ctx.append(init_db) # Configure service routes app.router.add_route('GET', '/{power:\d+}', handle) app.router.add_route('GET', '/', handle) return app app = init_app() web.run_app(app) ``` -------------------------------- ### Basic Connection and Fetch Example Source: https://magicstack.github.io/asyncpg/current/api/index.html Demonstrates establishing a basic connection to a PostgreSQL server and fetching data from the 'pg_type' table. ```python import asyncpg import asyncio async def run(): con = await asyncpg.connect(user='postgres') types = await con.fetch('SELECT * FROM pg_type') print(types) asyncio.run(run()) ``` -------------------------------- ### Run asyncpg tests Source: https://magicstack.github.io/asyncpg/current/installation.html Execute the asyncpg test suite. This requires PostgreSQL to be installed. ```bash $ python setup.py test ``` -------------------------------- ### Setup Inactive Connection Timer Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Configures and starts a timer for detecting inactive connections. If a connection remains unused for a specified duration, it will be deactivated. ```python def _setup_inactive_callback(self) -> None: if self._inactive_callback is not None: raise exceptions.InternalClientError( 'pool connection inactivity timer already exists') if self._max_inactive_time: self._inactive_callback = self._pool._loop.call_later( self._max_inactive_time, self._deactivate_inactive_connection) ``` -------------------------------- ### Create Pool, Acquire Connection, and Perform Operations Source: https://magicstack.github.io/asyncpg/current/api/index.html This example shows how to create a connection pool and then acquire a connection to perform multiple operations, including table creation and data fetching. ```python async with asyncpg.create_pool(user='postgres', command_timeout=60) as pool: async with pool.acquire() as con: await con.execute(''' CREATE TABLE names ( id serial PRIMARY KEY, name VARCHAR (255) NOT NULL) ''') await con.fetch('SELECT 1') ``` -------------------------------- ### Valid PostgreSQL Connection URI Example Source: https://magicstack.github.io/asyncpg/current/api/index.html Illustrates a correctly formatted libpq connection URI, including proper quoting for IPv6 addresses. ```python postgres://dbuser@[fe80::1ff:fe23:4567:890a%25eth0]/dbname ``` -------------------------------- ### Get Execution Plan with Statistics (EXPLAIN ANALYZE) Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/prepared_stmt.html Retrieve the execution plan for a prepared statement and execute it to gather runtime statistics. Use with caution as the statement will be executed. ```python plan_with_stats = await stmt.explain(analyze=True) # The 'plan_with_stats' variable will contain EXPLAIN ANALYZE output. ``` -------------------------------- ### Get Execution Plan (EXPLAIN) Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/prepared_stmt.html Retrieve the execution plan for a prepared statement without executing it. This is useful for performance analysis. ```python plan = await stmt.explain() # The 'plan' variable will contain a deserialized JSON object of the EXPLAIN output. ``` -------------------------------- ### Programmatic SSL Context Configuration (verify-full) Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html Configure an SSL context programmatically for secure connections, equivalent to DSN parameters like sslmode=verify-full, sslcert, sslkey, and sslrootcert. This example demonstrates loading CA bundles, enabling hostname checking, and providing client certificates and keys. ```pycon >>> import asyncpg >>> import asyncio >>> import ssl >>> async def main(): ... # Load CA bundle for server certificate verification, ... # equivalent to sslrootcert= in DSN. ... sslctx = ssl.create_default_context( ... ssl.Purpose.SERVER_AUTH, ... cafile="path/to/ca_bundle.pem") ... # If True, equivalent to sslmode=verify-full, if False: ... # sslmode=verify-ca. ... sslctx.check_hostname = True ... # Load client certificate and private key for client ... # authentication, equivalent to sslcert= and sslkey= in ... # DSN. ... sslctx.load_cert_chain( ... "path/to/client.cert", ... keyfile="path/to/client.key", ... ) ... con = await asyncpg.connect(user='postgres', ssl=sslctx) ... await con.close() >>> asyncio.run(main()) ``` -------------------------------- ### Query Logger Example Source: https://magicstack.github.io/asyncpg/current/api/index.html Uses a context manager to add a callback for logging queries. The callback receives a LoggedQuery object containing details about the query execution. ```python >>> class QuerySaver: def __init__(self): self.queries = [] def __call__(self, record): self.queries.append(record.query) >>> with con.query_logger(QuerySaver()): >>> await con.execute("SELECT 1") >>> print(log.queries) ['SELECT 1'] ``` -------------------------------- ### Copy Records to Table Example Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html Demonstrates copying a list of records to a table using the copy_records_to_table method. Requires an active asyncpg connection. ```python import asyncpg import asyncio async def run(): con = await asyncpg.connect(user='postgres') result = await con.copy_records_to_table( 'mytable', records=[ (1, 'foo', 'bar'), (2, 'ham', 'spam')]) print(result) asyncio.run(run()) ``` -------------------------------- ### Pool Initialization Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html This section details the parameters available when initializing an asyncpg Pool. These parameters allow for customization of connection behavior, including setup, initialization, and reset routines. ```APIDOC ## Pool() ### Description Initializes a new connection pool. ### Parameters * **dsn** (string) - The database connection string. * **connection_class** (class) - The connection class to use. Defaults to `asyncpg.connection.Connection`. * **record_class** (class) - The record class to use for query results. Defaults to `asyncpg.Record`. * **min_size** (integer) - The minimum number of connections in the pool. * **max_size** (integer) - The maximum number of connections in the pool. * **max_queries** (integer) - The maximum number of queries per connection before it is reset. * **loop** (asyncio.BaseEventLoop) - The asyncio event loop to use. Defaults to the default event loop. * **connect** (coroutine) - A coroutine to run when a connection is established. * **setup** (coroutine) - A coroutine to prepare a connection before it is returned from `Pool.acquire()`. * **init** (coroutine) - A coroutine to initialize a connection when it is created. * **reset** (coroutine) - A coroutine to reset a connection before it is returned to the pool by `Pool.release()`. * **max_inactive_connection_lifetime** (float) - The maximum time in seconds a connection can be inactive before being closed. ### Returns An instance of `asyncpg.pool.Pool`. ``` -------------------------------- ### Build asyncpg with debug checks Source: https://magicstack.github.io/asyncpg/current/installation.html Create a debug build of asyncpg with additional runtime checks by setting the ASYNCPG_DEBUG environment variable during installation. ```bash $ env ASYNCPG_DEBUG=1 pip install -e . ``` -------------------------------- ### PreparedStatement.get_parameters Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/prepared_stmt.html Returns a description of the types of the parameters used in the prepared statement. Includes an example demonstrating its output. ```APIDOC ## get_parameters() ### Description Return a description of statement parameters types. ### Method `get_parameters()` ### Example ```python stmt = await connection.prepare('SELECT ($1::int, $2::text)') print(stmt.get_parameters()) # Will print: # (Type(oid=23, name='int4', kind='scalar', schema='pg_catalog'), # Type(oid=25, name='text', kind='scalar', schema='pg_catalog')) ``` ### Returns - `tuple`: A tuple of :class:`asyncpg.types.Type` objects describing the statement parameters. ``` -------------------------------- ### Starting a Transaction Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/transaction.html Initiates a new transaction block. This method is called automatically when entering an `async with` block with a Transaction object. ```python await self.start() ``` -------------------------------- ### Entering Transaction Context Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/transaction.html Asynchronous context manager entry point for a transaction. It ensures the transaction is started and manages its state. ```python async def __aenter__(self): if self._managed: raise apg_errors.InterfaceError( 'cannot enter context: already in an `async with` block') self._managed = True await self.start() ``` -------------------------------- ### Programmatic SSL Context Configuration (require) Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html Configure an SSL context programmatically for connections requiring SSL, equivalent to sslmode=require. This setup bypasses server certificate and host verification. ```pycon >>> import asyncpg >>> import asyncio >>> import ssl >>> async def main(): ... # sslmode=require (no server certificate or host verification) ... sslctx = ssl.create_default_context() ... con = await asyncpg.connect(user='postgres', ssl=sslctx) ... await con.close() >>> asyncio.run(main()) ``` -------------------------------- ### Reload Schema State Example Source: https://magicstack.github.io/asyncpg/current/api/index.html Demonstrates how to inform asyncpg to reload its cached schema information after database schema changes. This prevents OutdatedSchemaCacheError. ```python >>> import asyncpg >>> import asyncio >>> async def change_type(con): ... result = await con.fetch('SELECT id, info FROM tbl') ... # Change composite's attribute type "int"=>"text" ... await con.execute('ALTER TYPE custom DROP ATTRIBUTE y') ... await con.execute('ALTER TYPE custom ADD ATTRIBUTE y text') ... await con.reload_schema_state() ... for id_, info in result: ... new = (info['x'], str(info['y'])) ... await con.execute( ... 'UPDATE tbl SET info=$2 WHERE id=$1', id_, new) ... >>> async def run(): ... # Initial schema: ... # CREATE TYPE custom AS (x int, y int); ... # CREATE TABLE tbl(id int, info custom); ... con = await asyncpg.connect(user='postgres') ... async with con.transaction(): ... # Prevent concurrent changes in the table ... await con.execute('LOCK TABLE tbl') ... await change_type(con) ... >>> asyncio.run(run()) ``` -------------------------------- ### Get Prepared Statement Parameters Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/prepared_stmt.html Retrieve a description of the data types for the parameters expected by the prepared statement. This returns a tuple of asyncpg.types.Type objects. ```python stmt = await connection.prepare('SELECT ($1::int, $2::text)') print(stmt.get_parameters()) # Will print: # (Type(oid=23, name='int4', kind='scalar', schema='pg_catalog'), # Type(oid=25, name='text', kind='scalar', schema='pg_catalog')) ``` -------------------------------- ### Get Server Version Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html Fetches and returns the version of the connected PostgreSQL server. The version is returned as a named tuple with major, minor, micro, releaselevel, and serial components. ```pycon >>> con.get_server_version() ServerVersion(major=9, minor=6, micro=1, releaselevel='final', serial=0) ``` -------------------------------- ### Create Pool and Acquire Connection with Async With Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Shows how to create a pool and then acquire a single connection using an async with block for executing multiple commands. ```python async with asyncpg.create_pool(user='postgres', command_timeout=60) as pool: async with pool.acquire() as con: await con.execute(''' CREATE TABLE names ( id serial PRIMARY KEY, name VARCHAR (255) NOT NULL) ''') await con.fetch('SELECT 1') ``` -------------------------------- ### Pool Creation and Basic Usage Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Demonstrates how to create a connection pool and acquire a connection to execute a query. ```python import asyncpg import asyncio async def run_pool(): # Create a connection pool pool = await asyncpg.create_pool(user='postgres', password='password', database='database', host='127.0.0.1') # Acquire a connection from the pool async with pool.acquire() as connection: # Execute a query result = await connection.fetchval('SELECT 1') print(f'Query result: {result}') # Close the pool when done await pool.close() if __name__ == '__main__': asyncio.run(run_pool()) ``` -------------------------------- ### Preparing and Executing a Query Source: https://magicstack.github.io/asyncpg/current/api/index.html Demonstrates how to prepare a SQL query and execute it multiple times with different parameters using `Connection.prepare()` and `PreparedStatement.fetchval()`. ```python >>> import asyncpg, asyncio >>> async def run(): ... conn = await asyncpg.connect() ... stmt = await conn.prepare('''SELECT 2 ^ $1''') ... print(await stmt.fetchval(10)) ... print(await stmt.fetchval(20)) ... >>> asyncio.run(run()) 1024.0 1048576.0 ``` -------------------------------- ### get_idle_size Source: https://magicstack.github.io/asyncpg/current/api/index.html Get the number of idle connections currently available in the pool. ```APIDOC ## get_idle_size ### Description Get the number of idle connections currently available in the pool. ### Method GET (or similar, conceptually) ### Endpoint Pool.get_idle_size() ### Parameters None ### Request Example ```python idle_connections = pool.get_idle_size() print(f"Idle connections: {idle_connections}") ``` ### Response #### Success Response (200) - **size** (int) - The number of idle connections. #### Response Example (No specific example provided, but returns an integer) ``` -------------------------------- ### Get Maximum Pool Size Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Returns the maximum number of connections allowed in the pool. ```python def get_max_size(self): """Return the maximum allowed number of connections in this pool. .. versionadded:: 0.25.0 """ return self._maxsize ``` -------------------------------- ### Basic asyncpg Connection and Operations Source: https://magicstack.github.io/asyncpg/current/usage.html Demonstrates establishing a connection, creating a table, inserting data, fetching a record, and closing the connection. Uses PostgreSQL's $n syntax for query arguments. ```python import asyncio import asyncpg import datetime async def main(): # Establish a connection to an existing database named "test" # as a "postgres" user. conn = await asyncpg.connect('postgresql://postgres@localhost/test') # Execute a statement to create a new table. await conn.execute(''' CREATE TABLE users( id serial PRIMARY KEY, name text, dob date ) ''') # Insert a record into the created table. await conn.execute(''' INSERT INTO users(name, dob) VALUES($1, $2) ''', 'Bob', datetime.date(1984, 3, 1)) # Select a row from the table. row = await conn.fetchrow( 'SELECT * FROM users WHERE name = $1', 'Bob') # *row* now contains # asyncpg.Record(id=1, name='Bob', dob=datetime.date(1984, 3, 1)) # Close the connection. await conn.close() asyncio.run(main()) ``` -------------------------------- ### Get Current Pool Size Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Returns the number of connections currently established in the pool. ```python def get_size(self): """Return the current number of connections in this pool. .. versionadded:: 0.25.0 """ return sum(h.is_connected() for h in self._holders) ``` -------------------------------- ### Create and Use Pool with Async With Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Demonstrates creating a connection pool and using it within an async with block for fetching data. ```python async with asyncpg.create_pool(user='postgres', command_timeout=60) as pool: await pool.fetch('SELECT 1') ``` -------------------------------- ### Get Idle Pool Size Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Returns the number of connections in the pool that are currently idle and available. ```python def get_idle_size(self): """Return the current number of idle connections in this pool. .. versionadded:: 0.25.0 """ return sum(h.is_connected() and h.is_idle() for h in self._holders) ``` -------------------------------- ### PreparedStatement.get_query Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/prepared_stmt.html Returns the SQL query text associated with the prepared statement. Includes an example of its usage. ```APIDOC ## get_query() ### Description Return the text of the query for this prepared statement. ### Method `get_query()` ### Example ```python stmt = await connection.prepare('SELECT $1::int') assert stmt.get_query() == "SELECT $1::int" ``` ### Returns - `str`: The SQL query string. ``` -------------------------------- ### Get Server PID Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html Retrieves the process ID (PID) of the PostgreSQL server process to which the connection is established. ```python return self._protocol.get_server_pid() ``` -------------------------------- ### Create Pool and Acquire Connection Manually Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Illustrates the manual creation of a pool and acquisition/release of a connection using await and a try-finally block. This method is not recommended for general use. ```python pool = await asyncpg.create_pool(user='postgres', command_timeout=60) con = await pool.acquire() try: await con.fetch('SELECT 1') finally: await pool.release(con) ``` -------------------------------- ### Get Minimum Pool Size Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Returns the configured minimum number of connections that the pool aims to maintain. ```python def get_min_size(self): """Return the minimum number of connections in this pool. .. versionadded:: 0.25.0 """ return self._minsize ``` -------------------------------- ### PreparedStatement.get_statusmsg Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/prepared_stmt.html Retrieves the status message of the last executed command for this prepared statement. Provides an example for clarity. ```APIDOC ## get_statusmsg() ### Description Return the status of the executed command. ### Method `get_statusmsg()` ### Example ```python stmt = await connection.prepare('CREATE TABLE mytab (a int)') await stmt.fetch() assert stmt.get_statusmsg() == "CREATE TABLE" ``` ### Returns - `str`: The status message of the executed command, or None if not yet executed. ``` -------------------------------- ### BaseCursor Initialization Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/cursor.html Initializes a BaseCursor, setting up connection, query, and state. Attaches the state to the connection if provided. ```python def __init__(self, connection, query, state, args, record_class): super().__init__(connection) self._args = args self._state = state if state is not None: state.attach() self._portal_name = None self._exhausted = False self._query = query self._record_class = record_class ``` -------------------------------- ### Get Connection Settings Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html Retrieves the current connection settings for the PostgreSQL server. Returns an object of type `asyncpg.ConnectionSettings`. ```python return self._protocol.get_settings() ``` -------------------------------- ### Pool Initialization Logic Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Illustrates the internal logic for initializing a connection pool, including creating connection holders and pre-connecting a minimum number of connections. ```python async def _initialize(self): self._queue = asyncio.LifoQueue(maxsize=self._maxsize) for _ in range(self._maxsize): ch = PoolConnectionHolder( self, max_queries=self._max_queries, max_inactive_time=self._max_inactive_connection_lifetime, setup=self._setup) self._holders.append(ch) self._queue.put_nowait(ch) if self._minsize: # Since we use a LIFO queue, the first items in the queue will be # the last ones in `self._holders`. We want to pre-connect the # first few connections in the queue, therefore we want to walk # `self._holders` in reverse. # Connect the first connection holder in the queue so that # any connection issues are visible early. first_ch = self._holders[-1] # type: PoolConnectionHolder await first_ch.connect() if self._minsize > 1: connect_tasks = [] for i, ch in enumerate(reversed(self._holders[:-1])): # `minsize - 1` because we already have first_ch if i >= self._minsize - 1: break connect_tasks.append(ch.connect()) await asyncio.gather(*connect_tasks) ``` -------------------------------- ### PreparedStatement.get_attributes Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/prepared_stmt.html Retrieves a description of the relation attributes (columns) for the prepared statement. An example is provided to illustrate the output format. ```APIDOC ## get_attributes() ### Description Return a description of relation attributes (columns). ### Method `get_attributes()` ### Example ```python st = await self.con.prepare(''' SELECT typname, typnamespace FROM pg_type ''') print(st.get_attributes()) # Will print: # (Attribute( # name='typname', # type=Type(oid=19, name='name', kind='scalar', # schema='pg_catalog')), # Attribute( # name='typnamespace', # type=Type(oid=26, name='oid', kind='scalar', # schema='pg_catalog'))) ``` ### Returns - `tuple`: A tuple of :class:`asyncpg.types.Attribute` objects describing the relation attributes. ``` -------------------------------- ### Get New Connection from Pool Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Internal method to establish and return a new connection, applying user-defined initialization logic. ```python async def _get_new_connection(self): con = await self._connect( *self._connect_args, loop=self._loop, connection_class=self._connection_class, record_class=self._record_class, **self._connect_kwargs, ) if not isinstance(con, self._connection_class): good = self._connection_class good_n = f'{good.__module__}.{good.__name__}' bad = type(con) if bad.__module__ == "builtins": bad_n = bad.__name__ else: bad_n = f'{bad.__module__}.{bad.__name__}' raise exceptions.InterfaceError( "expected pool connect callback to return an instance of " f"'{good_n}', got " f"'{bad_n}'" ) if self._init is not None: try: await self._init(con) except (Exception, asyncio.CancelledError) as ex: # If a user-defined `init` function fails, we don't # know if the connection is safe for re-use, hence # we close it. A new connection will be created # when `acquire` is called again. try: ``` -------------------------------- ### Connection Establishment Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html Establishes a connection to a PostgreSQL database. Ensure the database is accessible and credentials are correct. ```python async def connect(user=None, password=None, database=None, host=None, port=None, *, loop=None, ssl=None, timeout=None, command_timeout=None, max_cached_statement_lifetime=300.0, max_statements=100, max_query_size=10485760, max_param_size=10485760, statement_cache_size=5000, json_loads=json.loads, json_dumps=json.dumps, json_encoder=None, json_decoder=None, server_version=None, connection_class=Connection, kwargs: dict = None): """Connect to a PostgreSQL server. See :ref:`connection_parameters` for a full list of connection parameters. """ if loop is None: loop = asyncio.get_event_loop() if kwargs is None: kwargs = {} if server_version is None: # Try to determine server version from connection parameters # This is a heuristic and might fail if the server does not support # the PG* protocol version we are trying to use. try: server_version = await _get_server_version(host, port, user, password, database, loop, ssl, timeout) except Exception: pass conn = connection_class(user=user, password=password, database=database, host=host, port=port, loop=loop, ssl=ssl, timeout=timeout, command_timeout=command_timeout, max_cached_statement_lifetime=max_cached_statement_lifetime, max_statements=max_statements, max_query_size=max_query_size, max_param_size=max_param_size, statement_cache_size=statement_cache_size, json_loads=json_loads, json_dumps=json_dumps, json_encoder=json_encoder, json_decoder=json_decoder, server_version=server_version, kwargs=kwargs) await conn._do_connect() return conn ``` -------------------------------- ### BaseCursor Bind and Execute Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/cursor.html Binds arguments to a prepared statement and executes it, creating a portal. Ensures the cursor is ready and no portal is already open. Returns the buffer from the protocol. ```python async def _bind_exec(self, n, timeout): self._check_ready() if self._portal_name: raise exceptions.InterfaceError( 'cursor already has an open portal') con = self._connection protocol = con._protocol self._portal_name = con._get_unique_id('portal') buffer, _, self._exhausted = await protocol.bind_execute( self._state, self._args, self._portal_name, n, True, timeout) return buffer ``` -------------------------------- ### Establishing a Connection with asyncpg Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html Demonstrates the `connect` coroutine for establishing a connection to a PostgreSQL server. It supports connection parameters via DSN string or keyword arguments, with keyword arguments overriding DSN values. ```python async def connect(dsn=None, *, host=None, port=None, user=None, password=None, passfile=None, service=None, servicefile=None, database=None, loop=None, timeout=60, statement_cache_size=100, max_cached_statement_lifetime=300, max_cacheable_statement_size=1024 * 15, command_timeout=None, ssl=None, direct_tls=None, connection_class=Connection, record_class=protocol.Record, server_settings=None, target_session_attrs=None, krbsrvname=None, gsslib=None): r"""A coroutine to establish a connection to a PostgreSQL server. The connection parameters may be specified either as a connection URI in *dsn*, or as specific keyword arguments, or both. If both *dsn* and keyword arguments are specified, the latter override the corresponding values parsed from the connection URI. The default values for the majority of arguments can be specified using `environment variables `_. Returns a new :class:`~asyncpg.connection.Connection` object. :param dsn: Connection arguments specified using as a single string in the `libpq connection URI format`_: ``postgres://user:password@host:port/database?option=value``. The following options are recognized by asyncpg: ``host``, ``port``, ``user``, ``database`` (or ``dbname``), ``password``, ``passfile``, ``sslmode``, ``sslcert``, ``sslkey``, ``sslrootcert``, and ``sslcrl``. Unlike libpq, asyncpg will treat unrecognized options as `server settings`_ to be used for the connection. .. note:: The URI must be *valid*, which means that all components must be properly quoted with :py:func:`urllib.parse.quote_plus`, and any literal IPv6 addresses must be enclosed in square brackets. For example: .. code-block:: text postgres://dbuser@[fe80::1ff:fe23:4567:890a%25eth0]/dbname :param host: Database host address as one of the following: - an IP address or a domain name; - an absolute path to the directory containing the database server Unix-domain socket (not supported on Windows); - a sequence of any of the above, in which case the addresses will be tried in order, and the first successful connection will be returned. If not specified, asyncpg will try the following, in order: - host address(es) parsed from the *dsn* argument, - the value of the ``PGHOST`` environment variable, - on Unix, common directories used for PostgreSQL Unix-domain sockets: ``"/run/postgresql"``, ``"/var/run/postgresl"``, ``"/var/pgsql_socket"``, ``"/private/tmp"``, and ``"/tmp"``, - ``"localhost"``. :param port: Port number to connect to at the server host (or Unix-domain socket file extension). If multiple host addresses were specified, this parameter may specify a sequence of port numbers of the same length as the host sequence, or it may specify a single port number to be used for all host addresses. If not specified, the value parsed from the *dsn* argument is used, or the value of the ``PGPORT`` environment variable, or ``5432`` if neither is specified. :param user: The name of the database role used for authentication. If not specified, the value parsed from the *dsn* argument is used, or the value of the ``PGUSER`` environment variable, or the operating system name of the user running the application. ``` -------------------------------- ### Create Pool and Manage Connection Manually Source: https://magicstack.github.io/asyncpg/current/api/index.html This snippet illustrates how to create a connection pool and manually acquire and release connections using await. This method is not recommended for general use. ```python pool = await asyncpg.create_pool(user='postgres', command_timeout=60) con = await pool.acquire() try: await con.fetch('SELECT 1') finally: await pool.release(con) ``` -------------------------------- ### Get Connection Reset Query Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html Retrieves the SQL query used to reset the connection state when it's released back to a pool. ```python def get_reset_query(self): """Return the query sent to server on connection release. The query returned by this method is used by :meth:`Connection.reset`, which is, in turn, used by :class:`~asyncpg.pool.Pool` before making the connection available to another acquirer. .. versionadded:: 0.30.0 """ if self._reset_query is not None: return self._reset_query caps = self._server_caps _reset_query = [] if caps.advisory_locks: _reset_query.append('SELECT pg_advisory_unlock_all();') if caps.sql_close_all: _reset_query.append('CLOSE ALL;') if caps.notifications and caps.plpgsql: _reset_query.append('UNLISTEN *;') if caps.sql_reset: _reset_query.append('RESET ALL;') _reset_query = '\n'.join(_reset_query) self._reset_query = _reset_query return self._reset_query ``` -------------------------------- ### Get Prepared Statement Query Text Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/prepared_stmt.html Retrieve the SQL query string associated with a prepared statement. This is useful for debugging or logging. ```python stmt = await connection.prepare('SELECT $1::int') assert stmt.get_query() == "SELECT $1::int" ``` -------------------------------- ### Programmatic SSL Context Configuration (verify-full) Source: https://magicstack.github.io/asyncpg/current/api/index.html Configures an SSL context for secure connections with server certificate verification and client authentication. Equivalent to sslmode=verify-full with specified certificate paths. ```python import asyncpg import asyncio import ssl async def main(): # Load CA bundle for server certificate verification, # equivalent to sslrootcert= in DSN. sslctx = ssl.create_default_context( ssl.Purpose.SERVER_AUTH, cafile="path/to/ca_bundle.pem") # If True, equivalent to sslmode=verify-full, if False: # sslmode=verify-ca. sslctx.check_hostname = True # Load client certificate and private key for client # authentication, equivalent to sslcert= and sslkey= in # DSN. sslctx.load_cert_chain( "path/to/client.cert", keyfile="path/to/client.key", ) con = await asyncpg.connect(user='postgres', ssl=sslctx) await con.close() asyncio.run(main()) ``` -------------------------------- ### Initialize Cursor State Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/cursor.html Initializes the cursor's internal state by getting the statement from the connection and attaching it. This is a prerequisite for most cursor operations. ```python async def _init(self, timeout): if self._state is None: self._state = await self._connection._get_statement( self._query, timeout, named=True, record_class=self._record_class, ) self._state.attach() self._check_ready() await self._bind(timeout) return self ``` -------------------------------- ### Connect to PostgreSQL with asyncpg Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html Establishes a connection to a PostgreSQL database using asyncpg. Supports various connection parameters like DSN, host, port, user, password, and SSL settings. It also allows for custom connection and record classes, timeouts, and server-specific configurations. ```python async with compat.timeout(timeout): return await connect_utils._connect( loop=loop, connection_class=connection_class, record_class=record_class, dsn=dsn, host=host, port=port, user=user, password=password, passfile=passfile, service=service, servicefile=servicefile, ssl=ssl, direct_tls=direct_tls, database=database, server_settings=server_settings, command_timeout=command_timeout, statement_cache_size=statement_cache_size, max_cached_statement_lifetime=max_cached_statement_lifetime, max_cacheable_statement_size=max_cacheable_statement_size, target_session_attrs=target_session_attrs, krbsrvname=krbsrvname, gsslib=gsslib, ) ``` -------------------------------- ### Connection Pool Configuration Options Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Illustrates various configuration options available when creating a connection pool, such as max connections and timeouts. ```python import asyncpg import asyncio async def run_configured_pool(): # Create a connection pool with specific configurations pool = await asyncpg.create_pool( user='postgres', password='password', database='database', host='127.0.0.1', min_size=5, # Minimum number of connections in the pool max_size=10, # Maximum number of connections in the pool max_queries=50000, # Max queries per connection before it's closed max_inactive_connection_lifetime=300.0 # Max lifetime in seconds ) async with pool.acquire() as connection: result = await connection.fetchval('SELECT 2') print(f'Configured pool query result: {result}') await pool.close() if __name__ == '__main__': asyncio.run(run_configured_pool()) ``` -------------------------------- ### PreparedStatement.explain Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/prepared_stmt.html Generates and returns the execution plan for the prepared statement. Can optionally include detailed run time statistics if 'analyze' is set to True. ```APIDOC ## explain(*args, analyze=False) ### Description Return the execution plan of the statement. ### Method `explain(*args, analyze=False)` ### Parameters - `*args`: Query arguments. - `analyze` (bool): If ``True``, the statement will be executed and the run time statitics added to the return value. ### Returns - `object`: An object representing the execution plan. This value is actually a deserialized JSON output of the SQL ``EXPLAIN`` command. ``` -------------------------------- ### Acquire Connection with async with Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Acquires a database connection from the pool using an async with block for automatic release. ```python async with pool.acquire() as con: await con.execute(...) ``` -------------------------------- ### Manual Transaction Management Source: https://magicstack.github.io/asyncpg/current/api/index.html Manually manage transactions using start(), commit(), and rollback() methods. This provides explicit control over transaction lifecycle. ```python tr = connection.transaction() await tr.start() try: ... except: await tr.rollback() raise else: await tr.commit() ``` -------------------------------- ### asyncpg.connect Source: https://magicstack.github.io/asyncpg/current/api/index.html Establishes a connection to a PostgreSQL server. Connection parameters can be provided via a DSN string or keyword arguments. Returns a new `Connection` object. ```APIDOC ## asyncpg.connect ### Description A coroutine to establish a connection to a PostgreSQL server. The connection parameters may be specified either as a connection URI in _dsn_, or as specific keyword arguments, or both. If both _dsn_ and keyword arguments are specified, the latter override the corresponding values parsed from the connection URI. The default values for the majority of arguments can be specified using environment variables. Returns a new `Connection` object. ### Method Corutine ### Endpoint N/A (SDK method) ### Parameters #### Path Parameters N/A #### Query Parameters N/A #### Request Body N/A ### Parameters - **dsn** (string) - Optional - Connection arguments specified using as a single string in the libpq connection URI format: `postgres://user:password@host:port/database?option=value`. The following options are recognized by asyncpg: `host`, `port`, `user`, `database` (or `dbname`), `password`, `passfile`, `sslmode`, `sslcert`, `sslkey`, `sslrootcert`, and `sslcrl`. Unlike libpq, asyncpg will treat unrecognized options as server settings to be used for the connection. - **host** (string) - Optional - Database host address as one of the following: an IP address or a domain name; an absolute path to the directory containing the database server Unix-domain socket (not supported on Windows); a sequence of any of the above, in which case the addresses will be tried in order, and the first successful connection will be returned. - **port** (integer) - Optional - Port number to connect to at the server host (or Unix-domain socket file extension). - **user** (string) - Optional - The name of the database role used for authentication. - **database** (string) - Optional - The name of the database to connect to. - **password** (string or callable) - Optional - Password to be used for authentication, if the server requires one. - **passfile** (string) - Optional - The name of the file used to store passwords. - **service** (string) - Optional - The name of the postgres connection service stored in the postgres connection service file. - **servicefile** (string) - Optional - The location of the connnection service file used to store connection parameters. - **loop** (asyncio event loop) - Optional - An asyncio event loop instance. If `None`, the default event loop will be used. - **timeout** (float) - Optional - Connection timeout in seconds. Defaults to 60. - **statement_cache_size** (integer) - Optional - The size of prepared statement LRU cache. Pass `0` to disable the cache. Defaults to 100. - **max_cached_statement_lifetime** (integer) - Optional - The maximum time in seconds a prepared statement will stay in the cache. Pass `0` to allow statements be cached indefinitely. Defaults to 300. - **max_cacheable_statement_size** (integer) - Optional - The maximum size in bytes of a statement to be cached. Defaults to 15360. - **command_timeout** (float) - Optional - Timeout in seconds for executing a command. - **ssl** (ssl context or boolean) - Optional - SSL context or boolean to control SSL connection. - **direct_tls** (boolean) - Optional - Whether to use direct TLS connection. - **connection_class** (class) - Optional - The class to use for creating connections. Defaults to `asyncpg.connection.Connection`. - **record_class** (class) - Optional - The class to use for creating records. Defaults to `asyncpg.protocol.record.Record`. - **server_settings** (dict) - Optional - Dictionary of server settings to apply to the connection. - **target_session_attrs** (string) - Optional - Target session attributes for the connection. - **krbsrvname** (string) - Optional - The Kerberos service name. - **gsslib** (string) - Optional - The GSSAPI library to use. ### Request Example N/A ### Response #### Success Response - **Connection object** - A new `Connection` object representing the established connection. ``` -------------------------------- ### Pool Connection Holder Initialization Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html Initializes a holder for a connection within the pool. It stores configuration such as maximum queries, setup callbacks, and inactivity timeouts. ```Python class PoolConnectionHolder: __slots__ = ('_con', '_pool', '_loop', '_proxy', '_max_queries', '_setup', '_max_inactive_time', '_in_use', '_inactive_callback', '_timeout', '_generation') def __init__( self, pool: "Pool", *, max_queries: float, setup: Optional[Callable[[PoolConnectionProxy], Awaitable[None]]], max_inactive_time: float, ) -> None: self._pool = pool self._con: Optional[connection.Connection] = None self._proxy: Optional[PoolConnectionProxy] = None self._max_queries = max_queries self._max_inactive_time = max_inactive_time self._setup = setup self._inactive_callback: Optional[Callable] = None self._in_use: Optional[asyncio.Future] = None self._timeout: Optional[float] = None self._generation: Optional[int] = None ``` -------------------------------- ### Basic Connection and Query Execution Source: https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html Establishes a basic connection to a PostgreSQL database and fetches all records from the 'pg_type' table. This snippet demonstrates a common use case for retrieving data. ```python >>> import asyncpg >>> import asyncio >>> async def run(): ... con = await asyncpg.connect(user='postgres') ... types = await con.fetch('SELECT * FROM pg_type') ... print(types) ... >>> asyncio.run(run()) ```