Commit 9586b1f5 authored by ozzeh
Browse files

Refactor the schema registry so that it no longer caches state and uses lookups instead

parent 526983cd
schema_registry.egg-info/
.python-version
.idea/
.vscode/
\ No newline at end of file
.vscode/
.env
.env.sh
\ No newline at end of file
[tool.poetry]
name = "schema-registry"
version = "0.1.3"
version = "0.1.4"
description = ""
authors = ["ozzeh <ozzeh@pleaseignore.com>"]
......
......@@ -10,6 +10,8 @@ import boto3
from pydantic import BaseModel
from devtools import debug
from schema_registry.models import (
Event,
_SchemaPageModel,
......@@ -87,9 +89,8 @@ class SchemaRegistry:
self.prefix = prefix
self._schemas: Dict[str, Schema] = {}
self._model_schemas: Dict[Type[BaseModel], _SchemaCreateUpdateModel] = {}
self._load_schemas()
def _load_schemas(self):
def load_schemas(self):
paginator = self.schema_client.get_paginator("list_schemas")
page_options = dict(RegistryName=self.registry_name)
if self.prefix:
......@@ -103,7 +104,15 @@ class SchemaRegistry:
)
def get_schema(self, name) -> Schema:
    """Return a ``Schema`` handle for ``name`` in this registry.

    The handle is built on every call from this registry's client and
    registry name; ``self._schemas`` is neither read nor written. The
    previous body returned ``self._schemas[name]`` first, which left the
    ``Schema(...)`` construction below it unreachable.

    Args:
        name: Name of the schema to look up.

    Returns:
        A ``Schema`` constructed with ``self.schema_client``,
        ``self.registry_name`` and ``name``.
    """
    return Schema(self.schema_client, self.registry_name, name)
def _get_schema_content_for_model(
    self, schema_name, model: Type[BaseModel]
) -> _SchemaCreateUpdateModel:
    """Describe ``schema_name`` and record the parsed result for ``model``.

    Calls ``describe_schema`` on the registry client, parses the response
    into a ``_SchemaCreateUpdateModel``, stores it in
    ``self._model_schemas`` keyed by ``model``, and returns it.

    Args:
        schema_name: Name of the schema to describe.
        model: Model class used as the key in ``self._model_schemas``.

    Returns:
        The parsed ``_SchemaCreateUpdateModel``.
    """
    described = self.schema_client.describe_schema(
        RegistryName=self.registry_name,
        SchemaName=schema_name,
    )
    info = _SchemaCreateUpdateModel.parse_obj(described)
    self._model_schemas[model] = info
    return info
def _create_schema_for_model(self, schema_name: str, model: Type[BaseModel]):
opts = dict(
......@@ -114,6 +123,7 @@ class SchemaRegistry:
)
response = self.schema_client.create_schema(**opts)
schema_info = _SchemaCreateUpdateModel.parse_obj(response)
self._model_schemas[model] = schema_info
return schema_info
def _update_schema_for_model(self, schema_name: str, model: Type[BaseModel]):
......@@ -126,31 +136,32 @@ class SchemaRegistry:
try:
response = self.schema_client.update_schema(**opts)
schema_info = _SchemaCreateUpdateModel.parse_obj(response)
self._model_schemas[model] = schema_info
return schema_info
except self.schema_client.exceptions.ConflictException:
return None
except self.schema_client.exceptions.ConflictException as e:
return self._get_schema_content_for_model(schema_name, model)
def _schema_exists(self, schema_name: str) -> bool:
    """Report whether ``schema_name`` can be described in this registry.

    Issues a single ``describe_schema`` call for ``schema_name`` exactly as
    passed in. The previous body first rebuilt ``schema_name`` from
    ``namespace`` and ``model``, neither of which is a parameter here, and
    issued the describe call twice.

    Args:
        schema_name: Fully qualified schema name to probe.

    Returns:
        ``True`` when the describe call succeeds, ``False`` when the client
        raises its ``NotFoundException``.

    Raises:
        Any other exception raised by the client propagates unchanged.
    """
    try:
        # Only success vs. NotFoundException matters; the response is unused.
        self.schema_client.describe_schema(
            RegistryName=self.registry_name, SchemaName=schema_name
        )
    except self.schema_client.exceptions.NotFoundException:
        return False
    return True
def register_reflected_model(
    self, namespace, model: Type[BaseModel]
) -> _SchemaCreateUpdateModel:
    """Create or update the registry schema for ``model``.

    The schema name is ``"<namespace>.<model.__name__>"``. When
    ``_schema_exists`` reports the schema as present it is updated,
    otherwise it is created. The result is stored in
    ``self._model_schemas`` keyed by ``model`` and returned.

    Args:
        namespace: Prefix joined to the model's class name with a dot.
        model: Model class to register.

    Returns:
        Whatever the create/update helper returned for this schema.
    """
    schema_name = "{}.{}".format(namespace, model.__name__)
    if self._schema_exists(schema_name):
        schema = self._update_schema_for_model(schema_name, model)
    else:
        schema = self._create_schema_for_model(schema_name, model)
    self._model_schemas[model] = schema
    return schema
def register_model(
......@@ -158,31 +169,10 @@ class SchemaRegistry:
) -> _SchemaCreateUpdateModel:
schema_name = "{}.{}".format(namespace, model.__name__)
opts = dict(
Content=model.schema_json(),
RegistryName=self.registry_name,
SchemaName=schema_name,
Type="JSONSchemaDraft4",
)
if schema_name not in self._schemas:
response = self.schema_client.create_schema(**opts)
schema_info = _SchemaCreateUpdateModel.parse_obj(response)
self._model_schemas[model] = schema_info
return schema_info
else:
try:
response = self.schema_client.update_schema(**opts)
except self.schema_client.exceptions.ConflictException:
schema_info = self._schemas[schema_name].get()
self._model_schemas[model] = schema_info
return schema_info
else:
schema_info = _SchemaCreateUpdateModel.parse_obj(response)
self._model_schemas[model] = schema_info
return schema_info
schema_info = self.register_reflected_model(namespace, model)
self._model_schemas[model] = schema_info
return schema_info
def schema_for_model(self, model: Type[BaseModel]) -> dict:
if model not in self._model_schemas:
......@@ -212,7 +202,7 @@ class SchemaRegistry:
"schema": schema_info.dict(
include={"schema_arn", "schema_name", "schema_version"}
),
"event": model.dict(),
"event": model.dict(by_alias=True),
}
detail_type_formatted = "{}:{}".format(
......@@ -222,7 +212,7 @@ class SchemaRegistry:
entry = dict(
Source=sender,
Detail=model.json(),
Detail=model.json(by_alias=True),
Resources=resources,
DetailType=detail_type_formatted,
EventBusName=event_bus,
......
......@@ -13,14 +13,21 @@ class _AWSResponseModel(BaseModel):
alias_generator = camel_generator
class _SchemaCreateUpdateModel(_AWSResponseModel):
description: Optional[str]
last_modified: datetime
class _SchemaModel(_AWSResponseModel):
last_modified: Optional[datetime]
schema_arn: str
schema_name: str
tags: Optional[Dict[str, str]]
version_count: int = 1
class _SchemaVersionModel(_SchemaModel):
schema_version: str
tags: Dict[str, str]
type_: Literal["OpenApi3", "JSONSchemaDraft4"] = Field(..., alias="Type")
class _SchemaCreateUpdateModel(_SchemaVersionModel):
description: Optional[str]
version_created_date: datetime
......@@ -36,21 +43,6 @@ class _SchemaContentModel(_SchemaCreateUpdateModel):
return self._content
class _SchemaVersionModel(_AWSResponseModel):
schema_arn: str
schema_name: str
schema_version: str
type_: Literal["OpenApi3", "JSONSchemaDraft4"] = Field(..., alias="Type")
class _SchemaModel(_AWSResponseModel):
last_modified: datetime
schema_arn: str
schema_name: str
tags: Dict[str, str]
version_count: int
class _SchemaVersionsPageModel(_AWSResponseModel):
schema_versions: List[_SchemaVersionModel]
......
......@@ -3,9 +3,3 @@ import boto3
from schema_registry import SchemaRegistry, Schema, SchemaReflector
from devtools import debug
def test_loading_schemas():
registry = SchemaRegistry("TAPI-TEST")
schema: Schema = registry._schemas["schema_registry.test.TestingModel"]
debug(schema.get())
reflector: SchemaReflector = SchemaReflector(schema.get().content_dict)
......@@ -19,22 +19,19 @@ log.setLevel(logging.WARNING)
log.propagate = True
@pytest.fixture(scope="session")
def named_registry():
    """Yield a ``SchemaRegistry`` constructed for the ``TAPI-TEST`` registry.

    Decorated exactly once: stacking two ``@pytest.fixture`` decorators on
    the same function is rejected by pytest. Session scope is kept because
    the session-scoped ``reflected_*`` fixtures in this file request this
    fixture.
    """
    _registry = SchemaRegistry(registry_name="TAPI-TEST")
    yield _registry
@pytest.fixture(scope="session")
def empty_model():
    """Yield a JSON-schema dict titled ``RoleAddedEvent`` with no properties.

    Decorated exactly once (stacked ``@pytest.fixture`` decorators are
    rejected by pytest), and the dict is built once rather than being
    assigned twice with the same value.
    """
    item = {"title": "RoleAddedEvent", "type": "object", "properties": {}}
    yield item
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def simple_model():
item = {
"title": "TestingModel",
......@@ -48,7 +45,7 @@ def simple_model():
return item
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def complex_model():
item = {
"title": "ComplexModel",
......@@ -79,7 +76,38 @@ def complex_model():
yield item
@pytest.fixture(scope="session")
def complex_model_blank_array():
    """Yield a ``ComplexModel`` JSON schema whose array has untyped items.

    ``groups`` is declared as an array with an empty ``items`` schema
    (``{}``). A ``Group`` definition is still present under
    ``definitions`` but ``groups`` does not reference it.

    Decorated exactly once: stacked ``@pytest.fixture`` decorators are
    rejected by pytest.
    """
    item = {
        "title": "ComplexModel",
        "description": "Hi mom",
        "type": "object",
        "properties": {
            "name": {"title": "Name", "type": "string"},
            "description": {"title": "Description", "type": "string"},
            "groups": {
                "title": "Groups",
                "type": "array",
                "items": {},
            },
        },
        "required": ["name", "groups"],
        "definitions": {
            "Group": {
                "title": "Group",
                "type": "object",
                "properties": {
                    "id": {"title": "Id", "type": "integer"},
                    "name": {"title": "Name", "type": "string"},
                },
                "required": ["id", "name"],
            }
        },
    }
    yield item
@pytest.fixture(scope="session")
def complex_referenced_model():
item = {
"title": "ComplexReferencedModel",
......@@ -115,7 +143,7 @@ def complex_referenced_model():
yield item
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def reflected_empty_model(empty_model, named_registry):
from schema_registry.reflection import SchemaReflector
......@@ -124,7 +152,7 @@ def reflected_empty_model(empty_model, named_registry):
yield model
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def reflected_simple_model(simple_model, named_registry):
from schema_registry.reflection import SchemaReflector
......@@ -133,7 +161,7 @@ def reflected_simple_model(simple_model, named_registry):
yield model
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def reflected_complex_model(complex_model, named_registry):
from schema_registry.reflection import SchemaReflector
......@@ -142,7 +170,16 @@ def reflected_complex_model(complex_model, named_registry):
yield model
@pytest.fixture(scope="session")
def reflected_complex_model_blank_array(complex_model_blank_array, named_registry):
    """Yield a model reflected from the blank-array schema, after registering it.

    Builds a model from the ``complex_model_blank_array`` schema with
    ``SchemaReflector.create_model_for_jsonschema`` and registers it on
    ``named_registry`` under ``com.pleaseignore.tvm.test.reflection``.

    Decorated exactly once: stacked ``@pytest.fixture`` decorators are
    rejected by pytest.
    """
    from schema_registry.reflection import SchemaReflector

    model = SchemaReflector(complex_model_blank_array).create_model_for_jsonschema()
    named_registry.register_model("com.pleaseignore.tvm.test.reflection", model)
    yield model
@pytest.fixture(scope="session")
def reflected_complex_referenced_model(complex_referenced_model, named_registry):
from schema_registry.reflection import SchemaReflector
......@@ -175,6 +212,42 @@ def test_event_reflection():
event_json = '{"version": "0", "id": "d944d595-b186-4b86-43fe-b096d7e13bb3", "detail-type": "TAPI-TEST/schema_registry.test.TestingModel:1", "source": "com.pleaseignore.tvm.test", "account": "740218546536", "time": "2020-11-27T16:53:00Z", "region": "eu-west-1", "resources": ["pydantic-schema-registry"], "detail": {"name": "ozzeh", "description": "big willy johnston"}}'
event = Event.parse_raw(event_json)
model = reflect_event(json.loads(event_json))
debug(model)
debug(model.__fields__)
debug(model.__detail_type__)
def test_notification_reflection():
    """Reflect a sample ``StructureFuelAlert`` notification event.

    Builds an event envelope whose ``detail`` payload mixes scalars, a
    ``None`` value, a nested list of lists and a heterogeneous list, then
    passes it to ``reflect_event``. Nothing is asserted about the result;
    the test passes as long as the call does not raise.
    """
    detail = {
        "is_read": None,
        "notification_id": 1352212309,
        "sender_id": 1000137,
        "sender_type": "corporation",
        "timestamp": "2020-12-21T19:52:00+00:00",
        "type": "StructureFuelAlert",
        "listOfTypesAndQty": [[308, 4247]],
        "solarsystemID": 30003146,
        "structureID": 1025844785540,
        "structureShowInfoData": ["showinfo", 35835, 1025844785540],
        "structureTypeID": 35835,
    }
    event_data = {
        "version": "0",
        "id": "044c0eb6-c449-c274-b0a6-45474db730f8",
        "detail-type": "TAPI/auth.notifications.StructureFuelAlert:1",
        "source": "com.pleaseignore.auth",
        "account": "740218546536",
        "time": "2020-12-21T20:34:26Z",
        "region": "eu-west-1",
        "resources": ["pydantic-schema-registry"],
        "detail": detail,
    }
    model = reflect_event(event_data)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment