Python template
Python DDD Tutorial
Model
There are two Order and Delivery Services in the Model.
Order
- There are two events of Order: OrderPlaced and OrderCancelled. OrderPlaced set trigger with PostPersist, and OrderCancelled set trigger with PreRemove.
- OrderPlaced, that is, an ordered event, communicates with the Delivery Service by issuing an event to the kafka channel through Pub/Sub communication.
- OrderCancelled, that is, order canceled event is communicated with Delivery Service through REST API through Res/Req communication. OrderCancelled logic is executed only after DeliveryCancelled logic in Delivery Service precedes.
Delivery
- The two events of Delivery are DeliveryStarted and DeliveryCancelled. DeliveryStarted sets a trigger with PostPersist, and DeliveryCanclled sets a trigger with PreRemove.
Code
If you click the code on the top right of the EventStorming screen and set the language to python, the code above is generated. If you want to download it, click Download Archive.
Test
Running the application
The python code generated by MSAEZ must have a python version of at least 3.7.
- Go to the downloaded path and move to each service file.
- After checking whether app.py and requirements.txt exist in the current directory, enter the following command. (This command is a command to install the necessary libraries at once.)
pip install -r requirements.txt
- Enter the following command to run the application.
- OSX environment
python3 app.py
- Windows environment
python app.py
python technology stack
- Web Framework : Flask
- ORM : SqlAlchemy
- Kafka: Flask-kafka (for consumer), kafka (general) In Python, the flask-kafka API is used because the kafka consumer is created in a different thread for channel listening.
- DB : SQlite
- REST Api : requests
Before creating the Template Code
Python Template is implemented based on Spring-boot template. The parts that are linguistically different from Java will be described in the detailed description. Functions that are implemented in Spring-boot but not provided in python are implemented separately.
Python template file structure
A source code for Python template is generated like Spring boot based on model driven. The comparison between Spring-boot and Python is as follows.
role | Spring boot | Go |
---|---|---|
Aggregate | Entity.java | Entity.py |
AbstractEvent | AbstractEvent.java | AbstractEvent.py |
Event | Event.java | Event.py |
PolicyHandler | PolicyHandler.java | PolicyHandler.py |
PolicyEvent | PolicyEvent.java | PolicyEvent.py |
ExternalEntity | ExternalEntity.java | ExternalEntity.py |
ExternalService | ExternalService.java | ExternalService.py |
Repository | Repoistory.java | Repository.py |
Controller | Controller.java | Controller.py |
Application | Application.java | app.go |
Kafka | KafkaProcessor.go | |
DB | DB.py | |
utility | Util.py | |
hateoas | Hateoas.py |
Template description for each model
- This section describes the codes that are model-driven and generated based on the models generated through eventstorming.
· Entity.py
- Create aggregate code
forEach: Aggregate
fileName: {{namePascalCase}}.py
path: {{boundedContext.name}}
---
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer, event, Float, Boolean
from datetime import datetime
import util
{{#events}}
from {{namePascalCase}} import {{namePascalCase}}
{{#relationCommandInfo}}
{{#commandValue}}
from external.{{aggregate.namePascalCase}} import {{aggregate.namePascalCase}}
from external.{{aggregate.namePascalCase}}Service import {{namePascalCase}}
{{/commandValue}}
{{/relationCommandInfo}}
{{/events}}
Base = declarative_base()
class {{namePascalCase}}(Base):
__tablename__ = '{{namePascalCase}}_table'
{{#aggregateRoot.fieldDescriptors}}
{{#isKey}}
{{nameCamelCase}} = Column(Integer, primary_key=True)
{{/isKey}}
{{^isKey}}
{{nameCamelCase}} = Column({{#typeCheckinEntity className}}{{/typeCheckinEntity}})
{{/isKey}}
{{/aggregateRoot.fieldDescriptors}}
def __init__(self):
{{#aggregateRoot.fieldDescriptors}}
self.{{nameCamelCase}} = None
{{/aggregateRoot.fieldDescriptors}}
{{#lifeCycles}}
@event.listens_for({{../namePascalCase}}, '{{#triggerCheck trigger}}{{/triggerCheck}}')
def {{trigger}}(mapper, connection, target):
{{#events}}
event = {{namePascalCase}}()
event = util.AutoBinding(target, event)
event.Publish()
{{#relationCommandInfo}}
{{#commandValue}}
{{aggregate.nameCamelCase}} = {{aggregate.namePascalCase}}()
response = {{namePascalCase}}({{aggregate.nameCamelCase}})
print(response)
{{/commandValue}}
{{/relationCommandInfo}}
{{/events}}
{{/lifeCycles}}
<function>
window.$HandleBars.registerHelper('typeCheckinEntity', function (className) {
if(className.endsWith("String")){
return "String(50)"
}
else if(className.endsWith("Integer")){
return "Integer"
}
else if(className.endsWith("Float")){
return "Float"
}
else if(className.endsWith("Long")){
return "Integer"
}
else if(className.ensWith("Boolean")){
return "Boolean"
}
else if(className.ensWith("Double")){
return "Float"
}
});
window.$HandleBars.registerHelper('triggerCheck', function (trigger) {
if(trigger.endsWith("PreRemove")){
return "before_delete"
}
else if(trigger.endsWith("PostRemove")){
return "after_delete"
}
else if(trigger.endsWith("PrePersist")){
return "before_insert"
}
else if(trigger.endsWith("PostPersist")){
return "after_insert"
}
else if(trigger.endsWith("PreUpdate")){
return "before_update"
}
else{
return "after_update"
}
});
</function>
HandlerBar function
- typeCheckinEntity
Converts a Java variable type to a variable type suitable for Python. In Python, the variable types of the entity class exist in a different form from the basic python variable type, but the type of the variable is the same. In Python, long and double do not exist, so they are replaced with int and float.
- triggerCheck
After checking the trigger in the event, it is converted into a trigger suitable for python SqlAlchemy.
Java | Python |
---|---|
PostPersist | after_insert |
PrePersist | before_insert |
PostUpdate | after_update |
PreUpdate | before_update |
PostDelete | after_delete |
PreDelete | before_delete |
Detail
- Create one Entity.py per Aggregate.
- It specifies that the tablename in the entity class is the class mapped with the .db file.
- @event.listen_for() annotation continuously listens to the status of the db and plays the role of trigger.
· AbstractEvent.py
- Creating AbstractEvent code that implements common variables and methods in Event class
from datetime import datetime
import KafkaProcessor
class AbstractEvent:
eventType : str
timeStamp : str
def __init__(self):
self.eventType = self.__class__.__name__
self.timeStamp = str(datetime.now())
def ToJson(self):
return self.__dict__
def Publish(self):
msg = self.ToJson()
processor = KafkaProcessor.streamhandler
processor.produce(msg)
Detail
- AbstractEvent has eventType with event type and timeStamp with event issue time.
- When the class is created, eventType is initialized with the name of the created class, and timeStamp is initialized with the created time.
- ToJson() is a function for serializing a function.
- Publish() is a function that publishes an event to a kafka channel.
· Event.py
- Create event code
forEach: Event
fileName: {{namePascalCase}}.py
path: {{boundedContext.name}}
---
from AbstractEvent import AbstractEvent
import json
from datetime import datetime
class {{namePascalCase}}(AbstractEvent):
{{#fieldDescriptors}}
{{nameCamelCase}} : {{#typeCheck className}}{{/typeCheck}}
{{/fieldDescriptors}}
def __init__(self):
super().__init__()
{{#fieldDescriptors}}
self.{{nameCamelCase}} = None
{{/fieldDescriptors}}
<function>
window.$HandleBars.registerHelper('typeCheck', function (className) {
if(className.endsWith("String")){
return "str"
}
else if(className.endsWith("Integer")){
return "int"
}
else if(className.endsWith("Float")){
return "float"
}
else if(className.endsWith("Long")){
return "int"
}
else if(className.ensWith("Boolean")){
return "bool"
}
else if(className.ensWith("Double")){
return "int"
}
});
</function>
HandlerBar function
- typeCheck
Converts a Java-based variable type to a python variable type. The handlebar function transforms it into a basic python variable type.
Detail
- Inherit AbstractEvent
· PolicyHandler.py
- Create PolicyHandler code
forEach: BoundedContext
fileName: PolicyHandler.py
path: {{name}}
---
import util
{{#aggregates}}
import {{namePascalCase}}DB
from {{namePascalCase}} import {{namePascalCase}}
{{nameCamelCase}}repository = {{namePascalCase}}DB.repository
{{/aggregates}}
{{#policies}}
{{#relationEventInfo}}
from {{eventValue.namePascalCase}} import {{eventValue.namePascalCase}}
def whenever{{eventValue.namePascalCase}}_{{../namePascalCase}}(data):
event = {{eventValue.namePascalCase}}()
event = util.AutoBinding(data, event)
{{#../../aggregates}}
{{nameCamelCase}} = {{namePascalCase}}()
{{nameCamelCase}}repository.save({{nameCamelCase}})
{{/../../aggregates}}
{{/relationEventInfo}}
{{/policies}}
Detail
- When the corresponding event is received from the kafka channel, the serialized message is converted into an event object through AutoBinding of uitl.
- Logic can be additionally implemented using the converted event object.
· PolicyEvent.py
- Implementing event code used in PolicyHandler
forEach: RelationEventInfo
fileName: {{eventValue.namePascalCase}}.py
path: {{boundedContext.name}}
---
from AbstractEvent import AbstractEvent
import json
class {{eventValue.namePascalCase}}(AbstractEvent):
{{#eventValue.fieldDescriptors}}
{{nameCamelCase}} : {{#typeCheck className}}{{/typeCheck}}
{{/eventValue.fieldDescriptors}}
def __init__(self):
super().__init__()
{{#eventValue.fieldDescriptors}}
self.{{nameCamelCase}} = None
{{/eventValue.fieldDescriptors}}
<function>
window.$HandleBars.registerHelper('typeCheck', function (className) {
if(className.endsWith("String")){
return "str"
}
else if(className.endsWith("Integer")){
return "int"
}
else if(className.endsWith("Float")){
return "float"
}
else if(className.endsWith("Long")){
return "int"
}
else if(className.ensWith("Boolean")){
return "bool"
}
else if(className.ensWith("Double")){
return "int"
}
});
</function>
Detail
- Create code for events of other aggregates that are connected to the policyHandler of the current aggregate.
- Same as Event.py code
· ExternalService.py
- Create ExternalService code that implements logic to communicate with external service command and res/req method
forEach: RelationCommandInfo
fileName: {{commandValue.aggregate.namePascalCase}}Service.py
path: {{boundedContext.name}}/external
---
from external.{{commandValue.aggregate.namePascalCase}} import {{commandValue.aggregate.namePascalCase}}
import requests
import json
{{#MethodGet commandValue.restRepositoryInfo.method}}
def {{commandValue.namePascalCase}}(id):
headers = {'Content-Type':'application/json'}
ip = "http://localhost:"
port = "### this should be changed ###"
target = "/{{commandValue.aggregate.namePlural}}/"+str(id)
address = ip+port+target
response = requests.get(address, headers=headers)
response = response.content.decode('utf8').replace("'", '"')
return response
{{/MethodGet}}
{{#MethodPost commandValue.restRepositoryInfo.method}}
def {{commandValue.namePascalCase}}(obj):
headers = {'Content-Type':'application/json'}
ip = "http://localhost:"
port = "### this should be changed ###"
target = "/{{commandValue.aggregate.namePlural}}"
address = ip+port+target
data = obj.__dict__
response = requests.post(address,data=data, headers=headers)
response = response.content.decode('utf8').replace("'", '"')
'''
LOGIC GOES HERE
'''
return response
{{/MethodPost}}
{{#MethodUpdate commandValue.restRepositoryInfo.method}}
def {{commandValue.namePascalCase}}(id, obj):
headers = {'Content-Type':'application/json'}
ip = "http://localhost:"
port = "### this should be changed ###"
target = "/{{commandValue.aggregate.namePlural}}/"+str(id)
data = obj.__dict__
response = requests.put(address,data=data, headers=headers)
response = response.content.decode('utf8').replace("'", '"')
return response
{{/MethodUpdate}}
{{#MethodDelete commandValue.restRepositoryInfo.method}}
def {{commandValue.namePascalCase}}(id):
headers = {'Content-Type':'application/json'}
ip = "http://localhost:"
port = "### this should be changed ###"
target = "/{{commandValue.aggregate.namePlural}}/"+str(id)
response = requests.delete(address, headers=headers)
return response
{{/MethodDelete}}
<function>
window.$HandleBars.registerHelper('MethodGet', function(method, options){
if(method.endsWith('GET')){
return options.fn(this)
}
else{
return options.inverse(this)
}
});
window.$HandleBars.registerHelper('MethodPost', function(method, options){
if(method.endsWith('POST')){
return options.fn(this)
}
else{
return options.inverse(this)
}
});
window.$HandleBars.registerHelper('MethodUpdate', function(method, options){
if(method.endsWith('PUT')){
return options.fn(this)
}
else{
return options.inverse(this)
}
});
window.$HandleBars.registerHelper('MethodDelete', function(method, options){
if(method.endsWith('DELETE')){
return options.fn(this)
}
else{
return options.inverse(this)
}
});
</function>
HandleBar Function
Implement a function such as switch/case that can distinguish methods because it is necessary to generate a code suitable for the method according to the method
- MethodGet
return true for get method
- MethodPost
return true for post method
- MethodUpdate
In case of update method, return true
- MethodDelete
In case of Delete method, return true
Detail
- The requests library, which is a rest api, was used for the req/res communication method.
· ExternalEntity.py
- Create entity code corresponding to aggregate of external service connected by res/req communication
forEach: RelationCommandInfo
fileName: {{commandValue.aggregate.namePascalCase}}.py
path: {{boundedContext.name}}/external
---
{{#commandValue.aggregate}}
class {{namePascalCase}}:
{{#aggregateRoot.fieldDescriptors}}
{{nameCamelCase}} : {{#typeCheck className}}{{/typeCheck}}
{{/aggregateRoot.fieldDescriptors}}
def __init__(self):
{{#aggregateRoot.fieldDescriptors}}
{{nameCamelCase}} = None
{{/aggregateRoot.fieldDescriptors}}
{{/commandValue.aggregate}}
<function>
window.$HandleBars.registerHelper('typeCheck', function (className) {
if(className.endsWith("String")){
return "str"
}
else if(className.endsWith("Integer")){
return "int"
}
else if(className.endsWith("Float")){
return "float"
}
else if(className.endsWith("Long")){
return "int"
}
else if(className.ensWith("Boolean")){
return "bool"
}
else if(className.ensWith("Double")){
return "int"
}
});
</function>
Detail
- It is similar to the aggregate code, but there is no mapping with the db table. It is a simple object that only contains information from an external aggregate.
· Repository.py
- Implementing Repository code with DB and related data logic implemented
forEach: Aggregate
fileName: {{namePascalCase}}Repository.py
path: {{boundedContext.name}}
---
from flask import Response, request
import {{namePascalCase}}DB
import util
from {{namePascalCase}} import {{namePascalCase}}
repository = {{namePascalCase}}DB.repository
def Create(request):
entity = {{namePascalCase}}()
entity = util.AutoBinding(request.json, entity)
repository.save(entity)
return entity
def Read(request):
entity_list = repository.list()
return entity_list
def Read_by_id(id, request):
ele = repository.find_by_id(id)
return ele
def Update(id, request):
ele = repository.update(id, request.json)
return ele
def Delete(id, request):
ele = repository.delete(id)
return ele
Detail
- Basic CRUD related to DB is implemented.
- To maintain persistence, import the repository object created in DB code and execute DB logic.
- The result is returned from the repository to the controller. In this case, if an error occurs, 'error' is returned, and if it works normally, data corresponding to each function is returned.
· Controller.py
- Creating Controller code that implements basic CRUD of REST API
forEach: Aggregate
fileName: {{namePascalCase}}Controller.py
path: {{boundedContext.name}}
---
from flask import Flask, abort, jsonify, request, Blueprint, Response
import {{namePascalCase}}Repository
import Hateoas
bp = Blueprint('{{nameCamelCase}}', __name__, url_prefix='/{{namePlural}}')
@bp.route("", methods=["POST"])
def Post():
response = {{namePascalCase}}Repository.Create(request)
response = Hateoas.POST_response(request.base_url, response)
return response
@bp.route("", methods=["GET"])
def Get():
response = {{namePascalCase}}Repository.Read(request)
response = Hateoas.GET_list_response(request.url_root, request.base_url, response)
return response
@bp.route("/<int:id>", methods=["GET"])
def Get_By_Id(id: int):
response = {{namePascalCase}}Repository.Read_by_id(id, request)
if response != 'error':
response = Hateoas.GET_id_response(request.base_url, response)
else:
response = Response(response, status=404,mimetype='application/json')
return response
@bp.route("/<int:id>", methods=["UPDATE"])
def Update(id: int):
response = {{namePascalCase}}Repository.Update(id, request)
if response != 'error':
response = Hateoas.GET_id_response(request.base_url, response)
else:
response = Response(response, status=403,mimetype='application/json')
return response
@bp.route("/<int:id>", methods=["DELETE"])
def Delete(id: int):
response = {{namePascalCase}}Repository.Delete(id)
if response == None:
response = Response(status=200, mimetype='application/json')
else :
response = Response(response, status=403,mimetype='application/json')
return response
{{#commands}}
{{#isRestRepository}}
{{/isRestRepository}}
{{^isRestRepository}}
@bp.route("/{{controllerInfo.apiPath}}", methods=["{{controllerInfo.method}}"])
def {{nameCamelCase}}(): # -> edit this part
'''
LOGIC GOES HERE
'''
return 'hello world!'
{{/isRestRepository}}
{{/commands}}
@bp.errorhandler(404)
def resource_not_found(e):
return jsonify(error=str(e)), 404
Detail
- Create a controller per aggregate.
- In flask, you need to manage the route function in the file where the Flask() object exists, so I used the blueprint api in flask to manage it in other files as well.
- Because the controller determines the action for the request, it responds by calling the corresponding repository logic according to the request.
- It is the data returned through the repository. If it is a string type error, it responds with an error response, and if it is normal data, it responds with the hateoas method.
- When the command is not in RestRepository, the route and method are set with the received apiPath.
· app.py
- Creating application code that can run the application
forEach: BoundedContext
fileName: app.py
path: {{name}}
---
from flask import Flask
import KafkaProcessor
{{#aggregates}}
import {{namePascalCase}}Controller
{{/aggregates}}
app = Flask(__name__)
sh = KafkaProcessor.streamhandler
{{#aggregates}}
app.register_blueprint({{namePascalCase}}Controller.bp)
{{/aggregates}}
if __name__ == "__main__":
sh.consumer.run()
app.run(debug=True, port={{portGenerated}})
Detail
- Since there can be multiple aggregates in one boundedContext, import all the controllers corresponding to the aggregate.
- The streamhandler created in kafkaprocessor is executed together when the application is executed. (At this time, a new thread is created for the kafka consumer.)
Python-specific templates
- This section describes the code that is abstracted in Java and implemented in python format.
- Since there is no In-Memory DB in python, SQlite was used.
- Kafka API is used for Kafka producer and overall Kafka management, and flask-kafka API is used to listen to the flask thread and other threads.
· DB.py
- Create Sqlite DB (H2 DB is used in the spring boot tutorial.)
- All logic related to DB is executed in this code
forEach: Aggregate
fileName: {{namePascalCase}}DB.py
path: {{boundedContext.name}}
---
from typing import List
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from {{namePascalCase}} import {{namePascalCase}}
class {{namePascalCase}}DB():
def __init__(self):
self.engine = create_engine('sqlite:///{{namePascalCase}}_table.db', echo=True, connect_args={'check_same_thread': False})
{{namePascalCase}}.__table__.create(bind=self.engine, checkfirst=True)
def list(self):
DBsession = sessionmaker(bind=self.engine)
session = DBsession()
query = session.query({{namePascalCase}}).all()
return query
def save(self, entity):
DBsession = sessionmaker(bind=self.engine)
session = DBsession()
session.add(entity)
session.commit()
def find_by_id(self, id:int):
DBsession = sessionmaker(bind=self.engine)
session = DBsession()
try:
query = session.query({{namePascalCase}}).filter_by(id=id).one()
except Exception as err:
print(type(err))
return err
else:
return None
def update(self, id, request):
DBsession = sessionmaker(bind=self.engine)
session = DBsession()
try:
query = session.query({{namePascalCase}}).filter({{namePascalCase}}.id==id).first()
query_dict = query.__dict__
for key, value in request.items():
if key in query_dict:
setattr(query, key, value)
except Exception as err:
print(err)
return err
else:
session.commit()
return query
def delete(self, id):
DBsession = sessionmaker(bind=self.engine)
session = DBsession()
try:
query = session.query({{namePascalCase}}).filter_by(id=id).one()
session.delete(query)
except Exception as err:
print('error:',err)
return err
else:
session.commit()
return None
repository = {{namePascalCase}}DB()
Detail
- Create a .db file with create_engine() and bind it with the corresponding entity class.
- In the create_engine() argument, connect_args configures the db to be accessed by other threads as well. This enables logic to be executed when an event, the corresponding eventType, is received from kafka consumer.
- The function to save to db is save, and the entity class is entered as an argument.
- The function that reads all data from db is a list, and the function that brings id as a key is find_by_id.
- The function that updates a record in the db is update, and it is entered in the form of a dict as an argument.
- The function that deletes a record from the db enters an id as an argument to delete.
- To maintain persistence, the same context can be maintained in other files by initializing the db object in the db file.
· KafkaProcessor.py
forEach: BoundedContext
fileName: KafkaProcessor.py
path: {{name}}
---
from kafka import KafkaProducer
from flask_kafka import FlaskKafka
from threading import Event
import json
{{#policies}}
{{#relationEventInfo}}
from PolicyHandler import whenever{{eventValue.namePascalCase}}_{{../namePascalCase}}
{{/relationEventInfo}}
{{/policies}}
class StreamHandler():
def __init__(self):
INTERRUPT_EVENT = Event()
self.destination = "{{options.package}}"
self.group_id = "{{name}}"
self.producer = KafkaProducer(acks=0, compression_type='gzip', bootstrap_servers=['localhost:9092'], value_serializer=lambda x: json.dumps(x).encode('utf-8'))
self.consumer = FlaskKafka(INTERRUPT_EVENT,
bootstrap_servers=["localhost:9092"],
group_id= self.group_id
)
def produce(self, msg):
self.producer.send(self.destination, value=msg)
streamhandler = StreamHandler()
@streamhandler.consumer.handle(streamhandler.destination)
def consume(msg):
my_json = msg.value.decode('utf8').replace("'", '"')
data = json.loads(my_json)
{{#policies}}
if data['eventType'] == {{#relationEventInfo}}"{{eventValue.namePascalCase}}"{{/relationEventInfo}}:
{{#relationEventInfo}}
whenever{{eventValue.namePascalCase}}_{{../namePascalCase}}(data)
{{/relationEventInfo}}
{{/policies}}
Detail
- The overall config related to kafka is initialized in StreamHandler.
- @streamhandler.consumer.handle annotation executes the function below it in a new thread.
- In the consume() function, deserialize the message received through channel listening, classify the event type, and call the corresponding function.
· util.py
- The autobinding function that exists in Java does not exist in Python. Code that implements this
def AutoBinding(source, target):
if isinstance(source, dict) == True:
source_dict = source
else:
source_dict = source.__dict__
target_dict = target.__dict__
for key, value in source_dict.items():
if key in target_dict:
setattr(target,key,value)
return target
Detail
- AutoBinding is a function that maps dict type or class to class.
- It is used when binding the request coming in the form of a dict to the entity class, binding the message coming through the kafka channel to the event class, and binding the entity class and the event class
· Hateoas.py
- Module to convert Response to Hateoas format
forEach: BoundedContext
fileName: Hateoas.py
path: {{name}}
---
import json
from collections import OrderedDict
def GET_id_response(base_url, entity):
response=OrderedDict()
entity_name = entity.__class__.__name__.lower()
response['_links'] = {
entity_name :{
'href':base_url
},
'self':{
'href':base_url
}
}
entity_dict = entity.__dict__
for key, value in entity_dict.items():
if key != '_sa_instance_state' and key != 'id':
response[key] = value
return response
def POST_response(base_url, entity):
response = OrderedDict()
url = base_url + '/'+ str(entity.id)
entity_name = entity.__class__.__name__.lower()
response['_links'] = {
entity_name: {
'href': url
},
'self':{
'href': url
}
}
entity_dict = entity.__dict__
for key, value in entity_dict.items():
if key != '_sa_instance_state' and key != 'id':
response[key] = value
return response
def GET_list_response(url_root,base_url,entity_list):
totalElements = len(entity_list)
size = 20
number = totalElements//size
response = OrderedDict()
obj_list = []
for ele in entity_list:
obj_dict = OrderedDict()
ele_dict = ele.__dict__
ele_name = ele.__class__.__name__.lower()
url = base_url + '/'+str(ele.id)
obj_dict['_list'] = {
ele_name: {
"href":url
},
"self":{
"href":url
}
}
for key, value in ele_dict.items():
if key != '_sa_instance_state' and key != 'id':
obj_dict[key] = value
obj_list.append(obj_dict)
target = base_url.split("/")[-1]
response['_embedded'] = obj_list
response['_links'] = {
'profile' : {
'href': url_root+"profile/"+target
},
'self':{
'href': base_url+"{?page,size,sort}",
'templated': True
}
}
response['page']={
'number': str(number),
'size': str(size),
'totalElements':str(totalElements),
'totalPages': str(number+1)
}
return response
Detail
- GGET_id_response is a function that creates a response format for the entity that has the id as the key. The entity class is entered as an argument. -POST_response is a function that creates a response format for the post method. As an argument, the entity class received by the post is entered.
- GET_list_response is a function that creates a response format when all records are fetched. As an argument, it is entered in the form of a list of entity classes.