add basic sending

import json
import sys
import logging
from typing import List, Optional, Dict, Type
from datetime import datetime
......@@ -72,6 +74,8 @@ class Schema:
class SchemaRegistry:
standard_resources = ["pydantic-schema-registry", ]
def __init__(
self, registry_name: Optional[str] = None, *, prefix: str = None, **boto_opts
......@@ -137,3 +141,24 @@ class SchemaRegistry:
return self._model_schemas[model].dict(
include=set(["schema_arn", "schema_name", "schema_version"])
def send_event(self, event_bus, sender, model: BaseModel, extra_resources: List[str] = None):
cls = model.__class__
if cls not in self._model_schemas:
self.register_model(sender, cls)
schema_info: _SchemaCreateUpdateModel = self._model_schemas[cls]
resources = self.standard_resources
if extra_resources:
resources += extra_resources
event_data = {
"schema": schema_info.dict(include=set(["schema_arn", "schema_name", "schema_version"])),
"event": model.dict()
entry = dict(Source=sender, Detail=json.dumps(event_data), Resources=resources, DetailType=cls.__name__, EventBusName=event_bus)
response = self.session.client("events").put_events(Entries=[entry,])
......@@ -66,3 +66,9 @@ def test_registered_model(test_model, named_registry):
def test_complex_mode(complex_model, named_registry):
named_registry.register_model("com.pleaseignore.tvm.test", complex_model)
def test_send_simple_message(test_model, named_registry):
instance = test_model(name="Test", description="Hello!")
named_registry.send_event("auth-dev", "com.pleaseignore.tvm.test", instance)
\ No newline at end of file
