Commit 1ab5eed9 authored by ozzeh's avatar ozzeh
Browse files

add reflect_event function

parent e224e9ac
from .client import SchemaRegistry, Schema
from .reflection import SchemaReflector
from .reflection import SchemaReflector, reflect_event
from .models import Event
from .errors import SchemaRegistryError, ModelNotRegisteredError
......@@ -11,7 +11,8 @@ import boto3
from pydantic import BaseModel
from .models import (
from schema_registry.models import (
Event,
_SchemaPageModel,
_SchemaVersionsPageModel,
_SchemaVersionModel,
......@@ -19,9 +20,7 @@ from .models import (
_SchemaCreateUpdateModel,
)
from .reflection import SchemaReflector
from .errors import SchemaRegistryError, ModelNotRegisteredError
from schema_registry.errors import SchemaRegistryError, ModelNotRegisteredError
logger = logging.getLogger("schema_registry")
......@@ -71,9 +70,6 @@ class Schema:
else:
return self._versions[version]
def reflect(self, version=None) -> SchemaReflector:
schema: _SchemaContentModel = self.get(version)
return SchemaReflector(schema.content_dict)
def __repr__(self):
return f"Schema<{self.schema_name}, versions: {len(self._versions)}, default version: {self.default_version}>"
......@@ -108,6 +104,10 @@ class SchemaRegistry:
self.schema_client, self.registry_name, schema.schema_name
)
def get_schema(self, name) -> Schema:
return self._schemas.get(name)
def register_model(
self, namespace, model: Type[BaseModel]
) -> _SchemaCreateUpdateModel:
......@@ -146,10 +146,6 @@ class SchemaRegistry:
return self._model_schemas[model].dict(
include=set(["schema_arn", "schema_name", "schema_version"])
)
def reflect_event(self) -> Dict[str, SchemaReflector]:
pass
def send_event(
......
......@@ -60,11 +60,11 @@ class _SchemaPageModel(_AWSResponseModel):
class Event(BaseModel):
event_version: str
event_version: str = Field(..., alias="version")
id: str
detail_type: str = Field(..., alias="detail-type")
detail: Json
detail: dict
source: str
account: str
......@@ -82,5 +82,5 @@ class Event(BaseModel):
return self.detail_type.split(":")[1]
@property
def model_name(self) -> str:
def schema_name(self) -> str:
return self.detail_type.split("/")[1].split(":")[0]
\ No newline at end of file
......@@ -3,6 +3,21 @@ from pydantic import BaseModel, create_model, Field
from devtools import debug
from jsonpointer import resolve_pointer
from schema_registry.models import Event
from schema_registry.client import SchemaRegistry
def reflect_event(event_dict: dict) -> BaseModel:
event: Event = Event.parse_obj(event_dict)
registry: SchemaRegistry = SchemaRegistry(event.schema_registry)
schema = registry.get_schema(event.schema_name)
version = schema.get(version=event.schema_version)
reflector = SchemaReflector(version.content_dict)
model = reflector.create_model_for_jsonschema()
setattr(model, "__detail_type__", event_dict.get("detail-type"))
return model.parse_obj(event_dict.get("detail"))
class _ReflectedModel(BaseModel):
class Config:
......@@ -10,11 +25,12 @@ class _ReflectedModel(BaseModel):
class SchemaReflector:
def __init__(self, schema):
def __init__(self, schema, registry=None):
self.schema = schema
self.fields = {}
self.references = {}
self.definitions = {}
self.registry = registry
def _resolve_reference(self, ref_value):
if ref_value.startswith("#"):
......
from typing import Optional
import json
import pytest
import boto3
from typing import List
from schema_registry import SchemaRegistry
from schema_registry import SchemaRegistry, Event, reflect_event
from pydantic import BaseModel
import logging
......@@ -109,7 +111,6 @@ def reflected_simple_model(simple_model, named_registry):
from schema_registry.reflection import SchemaReflector
model = SchemaReflector(simple_model).create_model_for_jsonschema()
debug(model.__fields__)
named_registry.register_model("com.pleaseignore.tvm.test.reflection", model)
yield model
......@@ -119,7 +120,6 @@ def reflected_complex_model(complex_model, named_registry):
from schema_registry.reflection import SchemaReflector
model = SchemaReflector(complex_model).create_model_for_jsonschema()
debug(model.__fields__)
named_registry.register_model("com.pleaseignore.tvm.test.reflection", model)
yield model
......@@ -129,7 +129,6 @@ def reflected_complex_referenced_model(complex_referenced_model, named_registry)
from schema_registry.reflection import SchemaReflector
model = SchemaReflector(complex_referenced_model).create_model_for_jsonschema()
debug(model.__fields__)
named_registry.register_model("com.pleaseignore.tvm.test.reflection", model)
yield model
......@@ -147,4 +146,12 @@ def test_complex_referenced_model_reflection(reflected_complex_referenced_model)
def test_simple_reflection_registration(reflected_simple_model, named_registry):
debug(named_registry.schema_for_model(reflected_simple_model))
named_registry.schema_for_model(reflected_simple_model)
def test_event_reflection():
event_json = '{"version": "0", "id": "d944d595-b186-4b86-43fe-b096d7e13bb3", "detail-type": "TAPI-TEST/schema_registry.test.TestingModel:1", "source": "com.pleaseignore.tvm.test", "account": "740218546536", "time": "2020-11-27T16:53:00Z", "region": "eu-west-1", "resources": ["pydantic-schema-registry"], "detail": {"name": "ozzeh", "description": "big willy johnston"}}'
event = Event.parse_raw(event_json)
model = reflect_event(json.loads(event_json))
debug(model)
debug(model.__fields__)
debug(model.__detail_type__)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment