Commit c4e3e1a6 authored by ozzeh's avatar ozzeh
Browse files

Checkpoint

parent 2bedcf87
from .client import SchemaRegistry
from .client import SchemaRegistry, Schema
from .reflection import SchemaReflector
from .errors import SchemaRegistryError, ModelNotRegisteredError
......@@ -19,6 +19,8 @@ from .models import (
_SchemaCreateUpdateModel,
)
from .reflection import SchemaReflector
from .errors import SchemaRegistryError, ModelNotRegisteredError
logger = logging.getLogger("schema_registry")
......@@ -63,12 +65,16 @@ class Schema:
content: _SchemaContentModel = _SchemaContentModel.parse_obj(response)
return content
def get(self, version=None):
def get(self, version=None) -> _SchemaContentModel:
if not version:
return self._versions[self.default_version]
else:
return self._versions[version]
def reflect(self, version=None) -> SchemaReflector:
schema: _SchemaContentModel = self.get(version)
return SchemaReflector(schema.content_dict)
def __repr__(self):
return f"Schema<{self.schema_name}, versions: {len(self._versions)}, default version: {self.default_version}>"
......@@ -103,9 +109,9 @@ class SchemaRegistry:
)
def register_model(
self, sender: str, model: Type[BaseModel]
self, namespace, model: Type[BaseModel]
) -> _SchemaCreateUpdateModel:
schema_name = "{}@{}".format(sender, model.__name__)
schema_name = "{}.{}".format(namespace, model.__name__)
opts = dict(
Content=model.schema_json(),
......@@ -113,9 +119,6 @@ class SchemaRegistry:
SchemaName=schema_name,
Type="JSONSchemaDraft4",
)
description = model.schema().get("description")
if description:
opts["Description"] = description
if schema_name not in self._schemas:
response = self.schema_client.create_schema(**opts)
......@@ -150,7 +153,9 @@ class SchemaRegistry:
cls = model.__class__
if cls not in self._model_schemas:
self.register_model(sender, cls)
raise ValueError(
f"Model {model.__class__.__name__} must be registered before it can be sent"
)
schema_info: _SchemaCreateUpdateModel = self._model_schemas[cls]
......@@ -165,11 +170,16 @@ class SchemaRegistry:
"event": model.dict(),
}
detail_type_formatted = "{}:{}".format(
event_data["schema"]["schema_arn"].split("/", 1)[1],
event_data["schema"]["schema_version"],
)
entry = dict(
Source=sender,
Detail=json.dumps(event_data),
Detail=json.dumps(model.dict()),
Resources=resources,
DetailType=cls.__name__,
DetailType=detail_type_formatted,
EventBusName=event_bus,
)
response = self.session.client("events").put_events(
......
......@@ -28,6 +28,7 @@ class _SchemaContentModel(_SchemaCreateUpdateModel):
content: str
_content: dict = PrivateAttr({})
@property
def content_dict(self):
if not self._content:
self._content = json.loads(self.content)
......
......@@ -32,28 +32,31 @@ class SchemaReflector:
if item_type == "$ref":
referenced_type = self._resolve_reference(item_value)
return (referenced_type if required else Optional[referenced_type], ... if required else None)
return (referenced_type, ... if required else None)
raise NotImplementedError("Cannot reflect type: {}".format(item_type))
def _resolve_property(self, name, property_info, required=True) -> tuple:
if "type" in property_info:
if "type" in property_info:
type_ = property_info.get("type")
if type_ == "string":
return (str if required else Optional[bool], ... if required else None)
return (str, ... if required else None)
elif type_ == "boolean":
return (bool if required else Optional[bool], ... if required else None)
return (bool, ... if required else None)
elif type_ in ("number", "integer"):
return (int if required else Optional[int], ... if required else None)
return (int, ... if required else None)
elif type_ == "array":
return self._resolve_array(property_info, required)
else:
if "$ref" in property_info:
return (self._resolve_reference(property_info["$ref"]), ... if required else None)
return (
self._resolve_reference(property_info["$ref"]),
... if required else None,
)
raise NotImplementedError(f"Not able to reflect the type: {property_info}")
......
import boto3
from schema_registry import SchemaRegistry, Schema, SchemaReflector
from devtools import debug
def test_loading_schemas():
registry = SchemaRegistry("TAPI-TEST")
schema: Schema = registry._schemas["schema_registry.test.TestingModel"]
debug(schema.get())
reflector: SchemaReflector = schema.reflect()
model = reflector.create_model_for_jsonschema()
debug(model.__fields__)
instance = model(name="ozzeh", description="big willy johnston")
registry.register_model("schema_registry.test", model)
registry.send_event("auth-dev", "com.pleaseignore.tvm.test", instance)
......@@ -89,7 +89,16 @@ def complex_referenced_model():
"name": {"title": "Name", "type": "string"},
},
"required": ["id", "name"],
}
},
"ReferencedGroup2": {
"title": "ReferencedGroup2",
"type": "object",
"properties": {
"id": {"title": "Id", "type": "integer"},
"name": {"title": "Name", "type": "string"},
},
"required": ["id", "name"],
},
},
}
yield item
......@@ -101,7 +110,7 @@ def reflected_simple_model(simple_model, named_registry):
model = SchemaReflector(simple_model).create_model_for_jsonschema()
debug(model.__fields__)
named_registry.register_model("com.pleaseignore.tvm.test", model)
named_registry.register_model("com.pleaseignore.tvm.test.reflection", model)
yield model
......@@ -111,7 +120,7 @@ def reflected_complex_model(complex_model, named_registry):
model = SchemaReflector(complex_model).create_model_for_jsonschema()
debug(model.__fields__)
named_registry.register_model("com.pleaseignore.tvm.test", model)
named_registry.register_model("com.pleaseignore.tvm.test.reflection", model)
yield model
......@@ -121,7 +130,7 @@ def reflected_complex_referenced_model(complex_referenced_model, named_registry)
model = SchemaReflector(complex_referenced_model).create_model_for_jsonschema()
debug(model.__fields__)
named_registry.register_model("com.pleaseignore.tvm.test", model)
named_registry.register_model("com.pleaseignore.tvm.test.reflection", model)
yield model
......@@ -132,10 +141,10 @@ def test_simple_reflection(reflected_simple_model):
def test_complex_reflection(reflected_complex_model):
pass
def test_complex_referenced_model_reflection(reflected_complex_referenced_model):
pass
def test_simple_reflection_registration(reflected_simple_model, named_registry):
debug(named_registry.schema_for_model(reflected_simple_model))
......@@ -52,8 +52,6 @@ def complex_model():
name: str
class ComplexModel(BaseModel):
"""Hi mom"""
name: str
description: Optional[str]
groups: List[Group]
......@@ -68,7 +66,6 @@ def complex_model_with_references():
name: str
class ComplexReferencedModel(BaseModel):
"""Hi mom"""
name: str
description: Optional[str]
......@@ -82,7 +79,7 @@ def test_load_named_schemas(test_model, named_registry):
def test_schema_registration(test_model, named_registry):
schema_info = named_registry.register_model("com.pleaseignore.tvm.test", test_model)
schema_info = named_registry.register_model("schema_registry.test", test_model)
def test_registered_model(test_model, named_registry):
......@@ -90,20 +87,18 @@ def test_registered_model(test_model, named_registry):
def test_complex_model(complex_model, named_registry):
named_registry.register_model("com.pleaseignore.tvm.test", complex_model)
named_registry.register_model("schema_registry.test", complex_model)
def test_nullable_model(nullable_model, named_registry):
named_registry.register_model("com.pleaseignore.tvm.test", nullable_model)
named_registry.register_model("schema_registry.test", nullable_model)
def test_complex_model_with_references(complex_model_with_references, named_registry):
named_registry.register_model(
"com.pleaseignore.tvm.test", complex_model_with_references
)
named_registry.register_model("schema_registry.test", complex_model_with_references)
def test_send_simple_message(test_model, named_registry):
instance = test_model(name="Test", description="Hello!")
log.debug(instance)
named_registry.send_event("auth-dev", "com.pleaseignore.tvm.test", instance)
named_registry.send_event("auth-dev", "schema_registry.test", instance)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment