### Start Server

Source: https://github.com/cloudevents/sdk-python/blob/main/samples/http-image-cloudevents/README.md

Starts the local server to receive CloudEvents.

```sh
python image_sample_server.py
```

--------------------------------

### Install Dependencies

Source: https://github.com/cloudevents/sdk-python/blob/main/samples/http-image-cloudevents/README.md

Installs the necessary Python packages for the sample.

```sh
pip install -r requirements.txt
```

--------------------------------

### Install CloudEvents SDK

Source: https://github.com/cloudevents/sdk-python/blob/main/README.md

Installs the CloudEvents SDK using pip.

```bash
pip install cloudevents
```

--------------------------------

### Start server

Source: https://github.com/cloudevents/sdk-python/blob/main/samples/http-json-cloudevents/README.md

Starts the local HTTP server to receive CloudEvents.

```shell
python json_sample_server.py
```

--------------------------------

### v2 Event Creation

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Example of creating a CloudEvent in v2.

```python
from cloudevents.core.v1.event import CloudEvent

# id, specversion, and time are auto-generated (just like v1)
event = CloudEvent(
    attributes={"type": "com.example.test", "source": "/myapp"},
    data={"message": "Hello"},
)
```

--------------------------------

### v1 Event Creation

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Example of creating a CloudEvent in v1.

```python
from cloudevents.http import CloudEvent

# id, specversion, and time are auto-generated
event = CloudEvent(
    {"type": "com.example.test", "source": "/myapp"},
    data={"message": "Hello"},
)
```

--------------------------------

### CloudEvents Spec v0.3 Event Creation in v2

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Example of creating a v0.3 CloudEvent using the dedicated class in v2.
```python
from cloudevents.core.v03.event import CloudEvent

event = CloudEvent(
    attributes={
        "type": "com.example.test",
        "source": "/myapp",
        "id": "123",
        "specversion": "0.3",
        "schemaurl": "https://example.com/schema",  # v0.3-specific (renamed to dataschema in v1.0)
    },
)
```

--------------------------------

### Run Tests

Source: https://github.com/cloudevents/sdk-python/blob/main/samples/http-image-cloudevents/README.md

Executes the test suite for the sample.

```sh
pytest
```

--------------------------------

### Run Client

Source: https://github.com/cloudevents/sdk-python/blob/main/samples/http-image-cloudevents/README.md

Runs the client code to send a CloudEvent to the local server.

```sh
python client.py http://localhost:3000/
```

--------------------------------

### Sending Binary HTTP CloudEvent

Source: https://github.com/cloudevents/sdk-python/blob/main/README.md

Demonstrates how to create and send a CloudEvent in binary HTTP format using the requests library.

```python
from cloudevents.core.v1.event import CloudEvent
from cloudevents.core.bindings.http import to_binary_event
import requests

# Create a CloudEvent
# - The CloudEvent "id" is generated if omitted. "specversion" defaults to "1.0".
attributes = {
    "type": "com.example.sampletype1",
    "source": "https://example.com/event-producer",
}
data = {"message": "Hello World!"}
event = CloudEvent(attributes, data)

# Creates the HTTP request representation of the CloudEvent in binary content mode
message = to_binary_event(event)

# POST the event (the target URL is left empty in this snippet; supply your receiver's URL)
requests.post("", data=message.body, headers=message.headers)
```

--------------------------------

### v2 Exception Hierarchy

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Illustrates the more granular, typed exceptions introduced in v2.
```python
from cloudevents.core.exceptions import (
    BaseCloudEventException,        # Base for all CloudEvent errors
    CloudEventValidationError,      # Aggregated validation errors (raised on construction)
    MissingRequiredAttributeError,  # Missing required attribute (also a ValueError)
    InvalidAttributeTypeError,      # Wrong attribute type (also a TypeError)
    InvalidAttributeValueError,     # Invalid attribute value (also a ValueError)
    CustomExtensionAttributeError,  # Invalid extension name (also a ValueError)
)
```

--------------------------------

### Sending Structured HTTP CloudEvent

Source: https://github.com/cloudevents/sdk-python/blob/main/README.md

Demonstrates how to create and send a CloudEvent in structured HTTP format using the requests library.

```python
from cloudevents.core.v1.event import CloudEvent
from cloudevents.core.bindings.http import to_structured_event
import requests

# Create a CloudEvent
# - The CloudEvent "id" is generated if omitted. "specversion" defaults to "1.0".
attributes = {
    "type": "com.example.sampletype2",
    "source": "https://example.com/event-producer",
}
data = {"message": "Hello World!"}
event = CloudEvent(attributes, data)

# Creates the HTTP request representation of the CloudEvent in structured content mode
message = to_structured_event(event)

# POST the event (the target URL is left empty in this snippet; supply your receiver's URL)
requests.post("", data=message.body, headers=message.headers)
```

--------------------------------

### Receiving CloudEvents with Flask

Source: https://github.com/cloudevents/sdk-python/blob/main/README.md

Shows how to consume a CloudEvent using the Flask web framework.
```python
from flask import Flask, request

from cloudevents.core.bindings.http import from_http_event, HTTPMessage

app = Flask(__name__)


# create an endpoint at http://localhost:3000/
@app.route("/", methods=["POST"])
def home():
    # create a CloudEvent
    event = from_http_event(HTTPMessage(dict(request.headers), request.get_data()))

    # you can access cloudevent fields as seen below
    print(
        f"Found {event.get_id()} from {event.get_source()} with type "
        f"{event.get_type()} and specversion {event.get_specversion()}"
    )

    return "", 204


if __name__ == "__main__":
    app.run(port=3000)
```

--------------------------------

### Custom Serialization Formats (v2)

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Implementing the `Format` protocol for custom serialization in v2.

```python
from cloudevents.core.formats.base import Format
from cloudevents.core.base import BaseCloudEvent, EventFactory


class YAMLFormat:
    """Example custom format -- implement the Format protocol."""

    def read(
        self,
        event_factory: EventFactory | None,
        data: str | bytes,
    ) -> BaseCloudEvent:
        ...  # Parse YAML into attributes dict, call event_factory(attributes, data)

    def write(self, event: BaseCloudEvent) -> bytes:
        ...  # Serialize entire event to YAML bytes

    def write_data(
        self,
        data: dict | str | bytes | None,
        datacontenttype: str | None,
    ) -> bytes:
        ...  # Serialize just the data payload

    def read_data(
        self,
        body: bytes,
        datacontenttype: str | None,
    ) -> dict | str | bytes | None:
        ...  # Deserialize just the data payload

    def get_content_type(self) -> str:
        return "application/cloudevents+yaml"
```

```python
from cloudevents.core.bindings.http import to_binary

message = to_binary(event, event_format=YAMLFormat())
```

--------------------------------

### v1 Exception Hierarchy

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Shows the exception classes used in v1 for error handling.
```python
from cloudevents.exceptions import (
    GenericException,
    MissingRequiredFields,
    InvalidRequiredFields,
    DataMarshallerError,
    DataUnmarshallerError,
)
```

--------------------------------

### Kafka Binding - Serializing (v1)

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Serializing CloudEvents to Kafka messages using v1 functions.

```python
from cloudevents.kafka import to_binary, KafkaMessage

kafka_msg = to_binary(event)
# kafka_msg is a NamedTuple: .headers, .key, .value
```

--------------------------------

### AMQP Binding (v2)

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Serializing and deserializing CloudEvents to/from AMQP 1.0 messages using v2 functions.

```python
from cloudevents.core.v1.event import CloudEvent
from cloudevents.core.bindings.amqp import (
    AMQPMessage,
    to_binary_event,
    from_amqp_event,
)

# Serialize: attributes go to application_properties with cloudEvents_ prefix
amqp_msg = to_binary_event(event)
# amqp_msg.properties - AMQP message properties (e.g. content-type)
# amqp_msg.application_properties - CloudEvent attributes
# amqp_msg.application_data - event data as bytes

# Deserialize: auto-detects binary vs structured
event = from_amqp_event(amqp_msg)
```

--------------------------------

### Kafka Binding - Serializing (v2)

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Serializing CloudEvents to Kafka messages using v2 functions, including custom key mapping.
```python
from cloudevents.core.bindings.kafka import to_binary_event, KafkaMessage

kafka_msg = to_binary_event(event)
# kafka_msg is a frozen dataclass: .headers, .key, .value

# Custom key mapping
kafka_msg = to_binary_event(
    event,
    key_mapper=lambda e: e.get_extension("partitionkey"),
)
```

--------------------------------

### HTTP Binding - Serializing Events (v1)

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Serializing CloudEvents to binary or structured HTTP payloads using v1 functions.

```python
from cloudevents.conversion import to_binary, to_structured

# Returns a (headers, body) tuple
headers, body = to_binary(event)
headers, body = to_structured(event)
```

--------------------------------

### Custom Serialization Formats (v1)

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Using custom marshaller/unmarshaller callbacks for non-JSON formats in v1.

```python
# v1: pass callbacks directly
headers, body = to_binary(event, data_marshaller=yaml.dump)
event = from_http(headers, body, data_unmarshaller=yaml.safe_load)
```

--------------------------------

### Kafka Binding - Deserializing (v1)

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Deserializing Kafka messages back into CloudEvents using v1 functions.

```python
from cloudevents.kafka import from_binary, KafkaMessage

msg = KafkaMessage(headers=headers, key=key, value=value)
event = from_binary(msg)
```

--------------------------------

### HTTP Binding - Deserializing Events (v1)

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Deserializing HTTP payloads back into CloudEvents using v1 functions.
```python
from cloudevents.http import from_http

# Auto-detects binary vs structured from headers
event = from_http(headers, body)
```

--------------------------------

### Kafka Binding - Deserializing (v2)

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Deserializing Kafka messages back into CloudEvents using v2 functions.

```python
from cloudevents.core.bindings.kafka import from_kafka_event, KafkaMessage

msg = KafkaMessage(headers=headers, key=key, value=value)

# Auto-detects binary vs structured and spec version
event = from_kafka_event(msg)
```

--------------------------------

### HTTP Binding - Serializing Events (v2)

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Serializing CloudEvents to binary or structured HTTP payloads using v2 functions, including custom format usage.

```python
from cloudevents.core.bindings.http import to_binary_event, to_structured_event

# Returns an HTTPMessage dataclass with .headers and .body
message = to_binary_event(event)
message = to_structured_event(event)

# Use in HTTP requests
requests.post(url, headers=message.headers, data=message.body)
```

```python
from cloudevents.core.bindings.http import to_binary, to_structured
from cloudevents.core.formats.json import JSONFormat

message = to_binary(event, event_format=JSONFormat())
message = to_structured(event, event_format=JSONFormat())
```

--------------------------------

### v1 Attribute Access

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Demonstrates dict-like access and modification of attributes in v1 CloudEvents.
```python
# Dict-like access
source = event["source"]
event["subject"] = "my-subject"
del event["subject"]

# Iteration
for attr_name in event:
    print(attr_name, event[attr_name])

# Membership test
if "subject" in event:
    pass
```

--------------------------------

### Sign your work

Source: https://github.com/cloudevents/sdk-python/blob/main/CONTRIBUTING.md

When submitting a pull request, ensure your commits are signed using the --signoff flag.

```console
git commit --signoff
```

--------------------------------

### HTTP Binding - Deserializing Events (v2)

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Deserializing HTTP payloads back into CloudEvents using v2 functions, including explicit content mode selection.

```python
from cloudevents.core.bindings.http import from_http_event, HTTPMessage

# Wrap raw headers/body into an HTTPMessage first
message = HTTPMessage(headers=headers, body=body)

# Auto-detects binary vs structured and spec version (v1.0 / v0.3)
event = from_http_event(message)
```

```python
from cloudevents.core.bindings.http import from_binary_event, from_structured_event

event = from_binary_event(message)
event = from_structured_event(message)
```

--------------------------------

### v2 Attribute Access

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Shows how to access attributes using explicit getter methods in v2 CloudEvents, including extension attributes and retrieving all attributes.
```python
# Explicit getters for required attributes
source = event.get_source()
event_type = event.get_type()
event_id = event.get_id()
specversion = event.get_specversion()

# Explicit getters for optional attributes (return None if absent)
subject = event.get_subject()
time = event.get_time()
datacontenttype = event.get_datacontenttype()
dataschema = event.get_dataschema()

# Extension attributes
custom_value = event.get_extension("myextension")

# All attributes as a dict
attrs = event.get_attributes()
```

--------------------------------

### Handling CloudEventValidationError in v2

Source: https://github.com/cloudevents/sdk-python/blob/main/MIGRATION.md

Demonstrates how to catch and inspect `CloudEventValidationError`, which aggregates multiple validation failures.

```python
from cloudevents.core.exceptions import CloudEventValidationError
from cloudevents.core.v1.event import CloudEvent

try:
    event = CloudEvent(attributes={"source": "/test"})  # missing type
except CloudEventValidationError as e:
    # e.errors is a dict[str, list[BaseCloudEventException]]
    for attr_name, errors in e.errors.items():
        print(f"{attr_name}: {errors}")
```
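--------------------------------

### Content Mode Detection (illustrative sketch)

Several snippets above note that deserialization "auto-detects binary vs structured" from the message. The following is a minimal, standard-library-only sketch of how such detection could work for HTTP, based on the CloudEvents HTTP protocol binding rules (structured mode uses an `application/cloudevents+<format>` content type; binary mode carries attributes in `ce-` prefixed headers). It is not the SDK's implementation: `detect_content_mode` and `extract_attributes` are hypothetical helper names, only JSON is handled for structured mode, and batch mode and header value percent-decoding are left out.

```python
import json


def detect_content_mode(headers: dict[str, str]) -> str:
    """Classify an HTTP message as "structured" or "binary" (hypothetical helper)."""
    # HTTP header names are case-insensitive, so normalize them first.
    normalized = {name.lower(): value for name, value in headers.items()}
    content_type = normalized.get("content-type", "").lower()
    # Structured mode: the whole event travels in the body.
    if content_type.startswith("application/cloudevents+"):
        return "structured"
    # Binary mode: attributes travel as ce-* headers; specversion is required.
    if "ce-specversion" in normalized:
        return "binary"
    raise ValueError("headers carry no CloudEvents markers")


def extract_attributes(headers: dict[str, str], body: bytes) -> dict:
    """Collect context attributes from either content mode (hypothetical helper)."""
    if detect_content_mode(headers) == "structured":
        envelope = json.loads(body)  # assumes the JSON event format
        return {k: v for k, v in envelope.items() if k not in ("data", "data_base64")}
    normalized = {name.lower(): value for name, value in headers.items()}
    attributes = {
        name[3:]: value for name, value in normalized.items() if name.startswith("ce-")
    }
    # In binary mode the HTTP Content-Type header carries datacontenttype.
    if "content-type" in normalized:
        attributes["datacontenttype"] = normalized["content-type"]
    return attributes


if __name__ == "__main__":
    binary_headers = {
        "Ce-Id": "1",
        "Ce-Specversion": "1.0",
        "Ce-Type": "com.example.test",
        "Ce-Source": "/myapp",
        "Content-Type": "application/json",
    }
    print(detect_content_mode(binary_headers))
    print(extract_attributes(binary_headers, b'{"message": "Hello"}'))
```

In real code, prefer the SDK's own deserialization functions shown earlier; the sketch only makes the detection rule concrete.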