Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
test-it
Pydantic Schema Registry
Commits
5eaa35fe
Commit
5eaa35fe
authored
Nov 25, 2020
by
ozzeh
Browse files
Blacken
parent
a15c07f0
Changes
2
Hide whitespace changes
Inline
Side-by-side
schema_registry/client.py
View file @
5eaa35fe
...
...
@@ -74,7 +74,9 @@ class Schema:
class
SchemaRegistry
:
standard_resources
=
[
"pydantic-schema-registry"
,
]
standard_resources
=
[
"pydantic-schema-registry"
,
]
def
__init__
(
self
,
registry_name
:
Optional
[
str
]
=
None
,
*
,
prefix
:
str
=
None
,
**
boto_opts
...
...
@@ -142,7 +144,9 @@ class SchemaRegistry:
include
=
set
([
"schema_arn"
,
"schema_name"
,
"schema_version"
])
)
def
send_event
(
self
,
event_bus
,
sender
,
model
:
BaseModel
,
extra_resources
:
List
[
str
]
=
None
):
def
send_event
(
self
,
event_bus
,
sender
,
model
:
BaseModel
,
extra_resources
:
List
[
str
]
=
None
):
cls
=
model
.
__class__
if
cls
not
in
self
.
_model_schemas
:
...
...
@@ -155,10 +159,21 @@ class SchemaRegistry:
resources
+=
extra_resources
event_data
=
{
"schema"
:
schema_info
.
dict
(
include
=
set
([
"schema_arn"
,
"schema_name"
,
"schema_version"
])),
"event"
:
model
.
dict
()
"schema"
:
schema_info
.
dict
(
include
=
set
([
"schema_arn"
,
"schema_name"
,
"schema_version"
])
),
"event"
:
model
.
dict
(),
}
entry
=
dict
(
Source
=
sender
,
Detail
=
json
.
dumps
(
event_data
),
Resources
=
resources
,
DetailType
=
cls
.
__name__
,
EventBusName
=
event_bus
)
response
=
self
.
session
.
client
(
"events"
).
put_events
(
Entries
=
[
entry
,])
entry
=
dict
(
Source
=
sender
,
Detail
=
json
.
dumps
(
event_data
),
Resources
=
resources
,
DetailType
=
cls
.
__name__
,
EventBusName
=
event_bus
,
)
response
=
self
.
session
.
client
(
"events"
).
put_events
(
Entries
=
[
entry
,
]
)
tests/test_schema_registry.py
View file @
5eaa35fe
...
...
@@ -71,4 +71,4 @@ def test_complex_mode(complex_model, named_registry):
def
test_send_simple_message
(
test_model
,
named_registry
):
instance
=
test_model
(
name
=
"Test"
,
description
=
"Hello!"
)
log
.
debug
(
instance
)
named_registry
.
send_event
(
"auth-dev"
,
"com.pleaseignore.tvm.test"
,
instance
)
\ No newline at end of file
named_registry
.
send_event
(
"auth-dev"
,
"com.pleaseignore.tvm.test"
,
instance
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment