Python template

Python DDD Tutorial

Model

model

There are two Order and Delivery Services in the Model.

Order

  • There are two events of Order: OrderPlaced and OrderCancelled. OrderPlaced set trigger with PostPersist, and OrderCancelled set trigger with PreRemove.
  • OrderPlaced, that is, an ordered event, communicates with the Delivery Service by issuing an event to the kafka channel through Pub/Sub communication.
  • OrderCancelled, that is, order canceled event is communicated with Delivery Service through REST API through Res/Req communication. OrderCancelled logic is executed only after DeliveryCancelled logic in Delivery Service precedes.

Delivery

  • The two events of Delivery are DeliveryStarted and DeliveryCancelled. DeliveryStarted sets a trigger with PostPersist, and DeliveryCanclled sets a trigger with PreRemove.

Code

code

If you click the code on the top right of the EventStorming screen and set the language to python, the code above is generated. If you want to download it, click Download Archive.

Test

code

Running the application

The python code generated by MSAEZ must have a python version of at least 3.7.

  1. Go to the downloaded path and move to each service file.
  2. After checking whether app.py and requirements.txt exist in the current directory, enter the following command. (This command is a command to install the necessary libraries at once.)
pip install -r requirements.txt
  1. Enter the following command to run the application.
  • OSX environment
python3 app.py
  • Windows environment
python app.py

python technology stack

  • Web Framework : Flask
  • ORM : SqlAlchemy
  • Kafka: Flask-kafka (for consumer), kafka (general) In Python, the flask-kafka API is used because the kafka consumer is created in a different thread for channel listening.
  • DB : SQlite
  • REST Api : requests

Before creating the Template Code

Python Template is implemented based on Spring-boot template. The parts that are linguistically different from Java will be described in the detailed description. Functions that are implemented in Spring-boot but not provided in python are implemented separately.

Python template file structure

A source code for Python template is generated like Spring boot based on model driven. The comparison between Spring-boot and Python is as follows.

role Spring boot Go
Aggregate Entity.java Entity.py
AbstractEvent AbstractEvent.java AbstractEvent.py
Event Event.java Event.py
PolicyHandler PolicyHandler.java PolicyHandler.py
PolicyEvent PolicyEvent.java PolicyEvent.py
ExternalEntity ExternalEntity.java ExternalEntity.py
ExternalService ExternalService.java ExternalService.py
Repository Repoistory.java Repository.py
Controller Controller.java Controller.py
Application Application.java app.go
Kafka KafkaProcessor.go
DB DB.py
utility Util.py
hateoas Hateoas.py

Template description for each model

  • This section describes the codes that are model-driven and generated based on the models generated through eventstorming.

· Entity.py

  • Create aggregate code
forEach: Aggregate
fileName: {{namePascalCase}}.py
path: {{boundedContext.name}}
---
from sqlalchemy.ext.declarative import declarative_base 
from sqlalchemy import Column, String, Integer, event, Float, Boolean
from datetime import datetime

import util
{{#events}}
from {{namePascalCase}} import {{namePascalCase}}
{{#relationCommandInfo}}
{{#commandValue}}
from external.{{aggregate.namePascalCase}} import {{aggregate.namePascalCase}}
from external.{{aggregate.namePascalCase}}Service import {{namePascalCase}}
{{/commandValue}}
{{/relationCommandInfo}}
{{/events}}

Base = declarative_base()

class {{namePascalCase}}(Base):
    __tablename__ = '{{namePascalCase}}_table'
    {{#aggregateRoot.fieldDescriptors}}
    {{#isKey}}
    {{nameCamelCase}} = Column(Integer, primary_key=True)
    {{/isKey}}
    {{^isKey}}
    {{nameCamelCase}} = Column({{#typeCheckinEntity className}}{{/typeCheckinEntity}})
    {{/isKey}}
    {{/aggregateRoot.fieldDescriptors}}

    def __init__(self):
        {{#aggregateRoot.fieldDescriptors}}
        self.{{nameCamelCase}} = None
        {{/aggregateRoot.fieldDescriptors}}

{{#lifeCycles}}
@event.listens_for({{../namePascalCase}}, '{{#triggerCheck trigger}}{{/triggerCheck}}')
def {{trigger}}(mapper, connection, target):
    {{#events}}
    event = {{namePascalCase}}()
    event = util.AutoBinding(target, event)

    event.Publish()
    
    {{#relationCommandInfo}}
    {{#commandValue}}
    {{aggregate.nameCamelCase}} = {{aggregate.namePascalCase}}()
    response = {{namePascalCase}}({{aggregate.nameCamelCase}})

    print(response)
    {{/commandValue}}
    {{/relationCommandInfo}}
    {{/events}}

    
{{/lifeCycles}}


<function>
    window.$HandleBars.registerHelper('typeCheckinEntity', function (className) {
        if(className.endsWith("String")){
            return "String(50)"
        }
		else if(className.endsWith("Integer")){
			return "Integer"
		}
		else if(className.endsWith("Float")){
			return "Float"
		}
		else if(className.endsWith("Long")){
			return "Integer"
		}
		else if(className.ensWith("Boolean")){
			return "Boolean"
		}
		else if(className.ensWith("Double")){
			return "Float"
		}
		
    });

    window.$HandleBars.registerHelper('triggerCheck', function (trigger) {
        if(trigger.endsWith("PreRemove")){
            return "before_delete"
        }
        else if(trigger.endsWith("PostRemove")){
            return "after_delete"
        }
		else if(trigger.endsWith("PrePersist")){
			return "before_insert"
		}
		else if(trigger.endsWith("PostPersist")){
			return "after_insert"
		}
		else if(trigger.endsWith("PreUpdate")){
			return "before_update"
		}
		else{
			return "after_update"
		}
    });
    
</function>

HandlerBar function

  • typeCheckinEntity

Converts a Java variable type to a variable type suitable for Python. In Python, the variable types of the entity class exist in a different form from the basic python variable type, but the type of the variable is the same. In Python, long and double do not exist, so they are replaced with int and float.

  • triggerCheck

After checking the trigger in the event, it is converted into a trigger suitable for python SqlAlchemy.

Java Python
PostPersist after_insert
PrePersist before_insert
PostUpdate after_update
PreUpdate before_update
PostDelete after_delete
PreDelete before_delete

Detail

  • Create one Entity.py per Aggregate.
  • It specifies that the tablename in the entity class is the class mapped with the .db file.
  • @event.listen_for() annotation continuously listens to the status of the db and plays the role of trigger.

· AbstractEvent.py

  • Creating AbstractEvent code that implements common variables and methods in Event class
from datetime import datetime

import KafkaProcessor

class AbstractEvent:
	eventType : str
	timeStamp : str

	def __init__(self):
		self.eventType = self.__class__.__name__
		self.timeStamp = str(datetime.now())

	def ToJson(self):
		return self.__dict__

	def Publish(self):
		msg = self.ToJson()

		processor = KafkaProcessor.streamhandler
		processor.produce(msg)

Detail

  • AbstractEvent has eventType with event type and timeStamp with event issue time.
  • When the class is created, eventType is initialized with the name of the created class, and timeStamp is initialized with the created time.
  • ToJson() is a function for serializing a function.
  • Publish() is a function that publishes an event to a kafka channel.

· Event.py

  • Create event code
forEach: Event
fileName: {{namePascalCase}}.py
path: {{boundedContext.name}}
---
from AbstractEvent import AbstractEvent
import json
from datetime import datetime

class {{namePascalCase}}(AbstractEvent):
    {{#fieldDescriptors}}
    {{nameCamelCase}} : {{#typeCheck className}}{{/typeCheck}}
    {{/fieldDescriptors}}
    
    def __init__(self):
        super().__init__()
        {{#fieldDescriptors}}
        self.{{nameCamelCase}} = None
        {{/fieldDescriptors}}


<function>
    window.$HandleBars.registerHelper('typeCheck', function (className) {
        if(className.endsWith("String")){
            return "str"
        }
		else if(className.endsWith("Integer")){
			return "int"
		}
		else if(className.endsWith("Float")){
			return "float"
		}
		else if(className.endsWith("Long")){
			return "int"
		}
		else if(className.ensWith("Boolean")){
			return "bool"
		}
		else if(className.ensWith("Double")){
			return "int"
		}
		
    });
</function>

HandlerBar function

  • typeCheck

Converts a Java-based variable type to a python variable type. The handlebar function transforms it into a basic python variable type.

Detail

  • Inherit AbstractEvent

· PolicyHandler.py

  • Create PolicyHandler code
forEach: BoundedContext
fileName: PolicyHandler.py
path: {{name}}
---
import util
{{#aggregates}}
import {{namePascalCase}}DB
from {{namePascalCase}} import {{namePascalCase}}
{{nameCamelCase}}repository = {{namePascalCase}}DB.repository

{{/aggregates}}

{{#policies}}
{{#relationEventInfo}}
from {{eventValue.namePascalCase}} import {{eventValue.namePascalCase}}

def whenever{{eventValue.namePascalCase}}_{{../namePascalCase}}(data):
    event = {{eventValue.namePascalCase}}()
    event = util.AutoBinding(data, event)
    
    {{#../../aggregates}}
    {{nameCamelCase}} = {{namePascalCase}}()
    {{nameCamelCase}}repository.save({{nameCamelCase}})
    {{/../../aggregates}}
    
{{/relationEventInfo}}
{{/policies}}

Detail

  • When the corresponding event is received from the kafka channel, the serialized message is converted into an event object through AutoBinding of uitl.
  • Logic can be additionally implemented using the converted event object.

· PolicyEvent.py

  • Implementing event code used in PolicyHandler
forEach: RelationEventInfo
fileName: {{eventValue.namePascalCase}}.py
path: {{boundedContext.name}}
---

from AbstractEvent import AbstractEvent
import json

class {{eventValue.namePascalCase}}(AbstractEvent):
    {{#eventValue.fieldDescriptors}}
    {{nameCamelCase}} : {{#typeCheck className}}{{/typeCheck}}
    {{/eventValue.fieldDescriptors}}
    
    def __init__(self):
        super().__init__()
        {{#eventValue.fieldDescriptors}}
        self.{{nameCamelCase}} = None
        {{/eventValue.fieldDescriptors}}


<function>
    window.$HandleBars.registerHelper('typeCheck', function (className) {
        if(className.endsWith("String")){
            return "str"
        }
		else if(className.endsWith("Integer")){
			return "int"
		}
		else if(className.endsWith("Float")){
			return "float"
		}
		else if(className.endsWith("Long")){
			return "int"
		}
		else if(className.ensWith("Boolean")){
			return "bool"
		}
		else if(className.ensWith("Double")){
			return "int"
		}
		
    });
</function>

Detail

  • Create code for events of other aggregates that are connected to the policyHandler of the current aggregate.
  • Same as Event.py code

· ExternalService.py

  • Create ExternalService code that implements logic to communicate with external service command and res/req method
forEach: RelationCommandInfo
fileName: {{commandValue.aggregate.namePascalCase}}Service.py
path: {{boundedContext.name}}/external
---
from external.{{commandValue.aggregate.namePascalCase}} import {{commandValue.aggregate.namePascalCase}}
import requests
import json

{{#MethodGet commandValue.restRepositoryInfo.method}}
def {{commandValue.namePascalCase}}(id):
	headers = {'Content-Type':'application/json'}
	ip = "http://localhost:"
	port = "### this should be changed ###"
	target = "/{{commandValue.aggregate.namePlural}}/"+str(id)
	address = ip+port+target

	response = requests.get(address, headers=headers)
	response = response.content.decode('utf8').replace("'", '"')
	
	return response
{{/MethodGet}}

{{#MethodPost commandValue.restRepositoryInfo.method}}
def {{commandValue.namePascalCase}}(obj):
	headers = {'Content-Type':'application/json'}
	ip = "http://localhost:"
	port = "### this should be changed ###"
	target = "/{{commandValue.aggregate.namePlural}}"
	address = ip+port+target
	data = obj.__dict__
	response = requests.post(address,data=data, headers=headers)
	response = response.content.decode('utf8').replace("'", '"')
	'''
    LOGIC GOES HERE
    '''

	return response
{{/MethodPost}}

{{#MethodUpdate commandValue.restRepositoryInfo.method}}
def {{commandValue.namePascalCase}}(id, obj):
	headers = {'Content-Type':'application/json'}
	ip = "http://localhost:"
	port = "### this should be changed ###"
	target = "/{{commandValue.aggregate.namePlural}}/"+str(id)
	data = obj.__dict__
	response = requests.put(address,data=data, headers=headers)
	response = response.content.decode('utf8').replace("'", '"')

	return response
{{/MethodUpdate}}

{{#MethodDelete commandValue.restRepositoryInfo.method}}
def {{commandValue.namePascalCase}}(id):
	headers = {'Content-Type':'application/json'}
	ip = "http://localhost:"
	port = "### this should be changed ###"
	target = "/{{commandValue.aggregate.namePlural}}/"+str(id)

	response = requests.delete(address, headers=headers)
	
	return response
{{/MethodDelete}}	

<function>
	window.$HandleBars.registerHelper('MethodGet', function(method, options){
        if(method.endsWith('GET')){
        	return options.fn(this)
		}
		else{
			return options.inverse(this)
		}
    });
	window.$HandleBars.registerHelper('MethodPost', function(method, options){
        if(method.endsWith('POST')){
        	return options.fn(this)
		}
		else{
			return options.inverse(this)
		}
    });
	window.$HandleBars.registerHelper('MethodUpdate', function(method, options){
        if(method.endsWith('PUT')){
        	return options.fn(this)
		}
		else{
			return options.inverse(this)
		}
    });
	window.$HandleBars.registerHelper('MethodDelete', function(method, options){
        if(method.endsWith('DELETE')){
        	return options.fn(this)
		}
		else{
			return options.inverse(this)
		}
    });
</function>

HandleBar Function

Implement a function such as switch/case that can distinguish methods because it is necessary to generate a code suitable for the method according to the method

  • MethodGet

return true for get method

  • MethodPost

return true for post method

  • MethodUpdate

In case of update method, return true

  • MethodDelete

In case of Delete method, return true

Detail

  • The requests library, which is a rest api, was used for the req/res communication method.

· ExternalEntity.py

  • Create entity code corresponding to aggregate of external service connected by res/req communication
forEach: RelationCommandInfo
fileName: {{commandValue.aggregate.namePascalCase}}.py
path: {{boundedContext.name}}/external
---
{{#commandValue.aggregate}}
class {{namePascalCase}}:
    {{#aggregateRoot.fieldDescriptors}}
    {{nameCamelCase}} : {{#typeCheck className}}{{/typeCheck}}
    {{/aggregateRoot.fieldDescriptors}}

    def __init__(self):
        {{#aggregateRoot.fieldDescriptors}}
        {{nameCamelCase}} = None
        {{/aggregateRoot.fieldDescriptors}}
{{/commandValue.aggregate}}

<function>
    window.$HandleBars.registerHelper('typeCheck', function (className) {
        if(className.endsWith("String")){
            return "str"
        }
		else if(className.endsWith("Integer")){
			return "int"
		}
		else if(className.endsWith("Float")){
			return "float"
		}
		else if(className.endsWith("Long")){
			return "int"
		}
		else if(className.ensWith("Boolean")){
			return "bool"
		}
		else if(className.ensWith("Double")){
			return "int"
		}
		
    });
</function>

Detail

  • It is similar to the aggregate code, but there is no mapping with the db table. It is a simple object that only contains information from an external aggregate.

· Repository.py

  • Implementing Repository code with DB and related data logic implemented
forEach: Aggregate
fileName: {{namePascalCase}}Repository.py
path: {{boundedContext.name}}
---
from flask import Response, request

import {{namePascalCase}}DB
import util
from {{namePascalCase}} import {{namePascalCase}}

repository = {{namePascalCase}}DB.repository

def Create(request):
    entity = {{namePascalCase}}()
    entity = util.AutoBinding(request.json, entity)
    repository.save(entity)

    return entity
    
def Read(request):
    entity_list = repository.list()
    
    return entity_list

def Read_by_id(id, request):
    
    ele = repository.find_by_id(id)

    return ele

def Update(id, request):
    
    ele = repository.update(id, request.json)
    
    return ele

def Delete(id, request):
    ele = repository.delete(id)
    
    return ele

Detail

  • Basic CRUD related to DB is implemented.
  • To maintain persistence, import the repository object created in DB code and execute DB logic.
  • The result is returned from the repository to the controller. In this case, if an error occurs, 'error' is returned, and if it works normally, data corresponding to each function is returned.

· Controller.py

  • Creating Controller code that implements basic CRUD of REST API
forEach: Aggregate
fileName: {{namePascalCase}}Controller.py
path: {{boundedContext.name}}
---
from flask import Flask, abort, jsonify, request, Blueprint, Response
import {{namePascalCase}}Repository 
import Hateoas 

bp = Blueprint('{{nameCamelCase}}', __name__, url_prefix='/{{namePlural}}')

@bp.route("", methods=["POST"])
def Post():
    response = {{namePascalCase}}Repository.Create(request)

    response = Hateoas.POST_response(request.base_url, response)
    return response

@bp.route("", methods=["GET"])
def Get():
    response = {{namePascalCase}}Repository.Read(request)
    response = Hateoas.GET_list_response(request.url_root, request.base_url, response)
    
    return response

@bp.route("/<int:id>", methods=["GET"])
def Get_By_Id(id: int):
    response = {{namePascalCase}}Repository.Read_by_id(id, request)

    if response != 'error':
        response = Hateoas.GET_id_response(request.base_url, response)
    else:
        response = Response(response, status=404,mimetype='application/json')
    return response

@bp.route("/<int:id>", methods=["UPDATE"])
def Update(id: int):
    
    response = {{namePascalCase}}Repository.Update(id, request)
    
    if response != 'error':
        response = Hateoas.GET_id_response(request.base_url, response)
    else:
        response = Response(response, status=403,mimetype='application/json')

    
    return response

@bp.route("/<int:id>", methods=["DELETE"])
def Delete(id: int):
    response = {{namePascalCase}}Repository.Delete(id)
    if response == None:
        response = Response(status=200, mimetype='application/json')
    else :
        response = Response(response, status=403,mimetype='application/json')
    
    return response

{{#commands}}
{{#isRestRepository}}
{{/isRestRepository}}
{{^isRestRepository}}
@bp.route("/{{controllerInfo.apiPath}}", methods=["{{controllerInfo.method}}"])
def {{nameCamelCase}}(): # -> edit this part
    
    '''
    LOGIC GOES HERE
    '''
    return 'hello world!'

{{/isRestRepository}}
{{/commands}}

@bp.errorhandler(404)
def resource_not_found(e):
    return jsonify(error=str(e)), 404

Detail

  • Create a controller per aggregate.
  • In flask, you need to manage the route function in the file where the Flask() object exists, so I used the blueprint api in flask to manage it in other files as well.
  • Because the controller determines the action for the request, it responds by calling the corresponding repository logic according to the request.
  • It is the data returned through the repository. If it is a string type error, it responds with an error response, and if it is normal data, it responds with the hateoas method.
  • When the command is not in RestRepository, the route and method are set with the received apiPath.

· app.py

  • Creating application code that can run the application
forEach: BoundedContext
fileName: app.py
path: {{name}}
---
from flask import Flask
import KafkaProcessor
{{#aggregates}}
import {{namePascalCase}}Controller
{{/aggregates}}

app = Flask(__name__)

sh = KafkaProcessor.streamhandler

{{#aggregates}}
app.register_blueprint({{namePascalCase}}Controller.bp)
{{/aggregates}}
if __name__ == "__main__":
	sh.consumer.run()
	app.run(debug=True, port={{portGenerated}})

Detail

  • Since there can be multiple aggregates in one boundedContext, import all the controllers corresponding to the aggregate.
  • The streamhandler created in kafkaprocessor is executed together when the application is executed. (At this time, a new thread is created for the kafka consumer.)

Python-specific templates

  • This section describes the code that is abstracted in Java and implemented in python format.
  • Since there is no In-Memory DB in python, SQlite was used.
  • Kafka API is used for Kafka producer and overall Kafka management, and flask-kafka API is used to listen to the flask thread and other threads.

· DB.py

  • Create Sqlite DB (H2 DB is used in the spring boot tutorial.)
  • All logic related to DB is executed in this code
forEach: Aggregate
fileName: {{namePascalCase}}DB.py
path: {{boundedContext.name}}
---
from typing import List

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from {{namePascalCase}} import {{namePascalCase}}

class {{namePascalCase}}DB():
    def __init__(self):
        self.engine = create_engine('sqlite:///{{namePascalCase}}_table.db', echo=True, connect_args={'check_same_thread': False})

        {{namePascalCase}}.__table__.create(bind=self.engine, checkfirst=True)

    def list(self):
        DBsession = sessionmaker(bind=self.engine)
        session = DBsession()
        
        query = session.query({{namePascalCase}}).all()
        return query
    
    def save(self, entity):
        DBsession = sessionmaker(bind=self.engine)
        session = DBsession()
        
        session.add(entity)
        session.commit()
    
    def find_by_id(self, id:int):
        DBsession = sessionmaker(bind=self.engine)
        session = DBsession()
        try:
            query = session.query({{namePascalCase}}).filter_by(id=id).one()
        except Exception as err:
            print(type(err))
            return err
        else:
            return None
            
    def update(self, id, request):
        DBsession = sessionmaker(bind=self.engine)
        session = DBsession()
        
        try:
            query = session.query({{namePascalCase}}).filter({{namePascalCase}}.id==id).first()
            query_dict = query.__dict__
            for key, value in request.items():
                if key in query_dict:
                    setattr(query, key, value)
                
        except Exception as err:
            print(err)
            return err
        else:
            session.commit()
            return query
            
    def delete(self, id):
        DBsession = sessionmaker(bind=self.engine)
        session = DBsession()
        
        try:
            query = session.query({{namePascalCase}}).filter_by(id=id).one()
            session.delete(query)
        except Exception as err:
            print('error:',err)
            return err
        else:
            session.commit()
            return None

repository = {{namePascalCase}}DB()

Detail

  • Create a .db file with create_engine() and bind it with the corresponding entity class.
  • In the create_engine() argument, connect_args configures the db to be accessed by other threads as well. This enables logic to be executed when an event, the corresponding eventType, is received from kafka consumer.
  • The function to save to db is save, and the entity class is entered as an argument.
  • The function that reads all data from db is a list, and the function that brings id as a key is find_by_id.
  • The function that updates a record in the db is update, and it is entered in the form of a dict as an argument.
  • The function that deletes a record from the db enters an id as an argument to delete.
  • To maintain persistence, the same context can be maintained in other files by initializing the db object in the db file.

· KafkaProcessor.py

forEach: BoundedContext
fileName: KafkaProcessor.py
path: {{name}}
---
from kafka import KafkaProducer
from flask_kafka import FlaskKafka
from threading import Event
import json
{{#policies}}
{{#relationEventInfo}}
from PolicyHandler import whenever{{eventValue.namePascalCase}}_{{../namePascalCase}}
{{/relationEventInfo}}
{{/policies}}

class StreamHandler():
	
	def __init__(self):
		INTERRUPT_EVENT = Event()
		self.destination = "{{options.package}}"
		self.group_id = "{{name}}"
		self.producer = KafkaProducer(acks=0, compression_type='gzip', bootstrap_servers=['localhost:9092'], value_serializer=lambda x: json.dumps(x).encode('utf-8'))
		self.consumer = FlaskKafka(INTERRUPT_EVENT,
		         bootstrap_servers=["localhost:9092"],
		         group_id= self.group_id
		         )


	def produce(self, msg):
		self.producer.send(self.destination, value=msg)

streamhandler = StreamHandler()

@streamhandler.consumer.handle(streamhandler.destination)
def consume(msg):
    my_json = msg.value.decode('utf8').replace("'", '"')
    data = json.loads(my_json)
    {{#policies}}
    if data['eventType'] == {{#relationEventInfo}}"{{eventValue.namePascalCase}}"{{/relationEventInfo}}:
        {{#relationEventInfo}}
        whenever{{eventValue.namePascalCase}}_{{../namePascalCase}}(data)
        {{/relationEventInfo}}
    {{/policies}}

Detail

  • The overall config related to kafka is initialized in StreamHandler.
  • @streamhandler.consumer.handle annotation executes the function below it in a new thread.
  • In the consume() function, deserialize the message received through channel listening, classify the event type, and call the corresponding function.

· util.py

  • The autobinding function that exists in Java does not exist in Python. Code that implements this
def AutoBinding(source, target):
	
	if isinstance(source, dict) == True:
		source_dict = source
	else:
		source_dict = source.__dict__
	
	target_dict = target.__dict__

	for key, value in source_dict.items():
		if key in target_dict:
			setattr(target,key,value)

	return target

Detail

  • AutoBinding is a function that maps dict type or class to class.
  • It is used when binding the request coming in the form of a dict to the entity class, binding the message coming through the kafka channel to the event class, and binding the entity class and the event class

· Hateoas.py

  • Module to convert Response to Hateoas format
forEach: BoundedContext
fileName: Hateoas.py
path: {{name}}
---
import json
from collections import OrderedDict

def GET_id_response(base_url, entity):
	response=OrderedDict()
	entity_name = entity.__class__.__name__.lower()

	response['_links'] = {
		entity_name :{ 					
			'href':base_url
		},
		'self':{
			'href':base_url
		}
	}

	entity_dict = entity.__dict__
	for key, value in entity_dict.items():
		if key != '_sa_instance_state' and key != 'id':
			response[key] = value
	return response

def POST_response(base_url, entity):
	response = OrderedDict()
	url = base_url + '/'+ str(entity.id)
	entity_name = entity.__class__.__name__.lower()

	response['_links'] = {
		entity_name: {
			'href': url
		},
		'self':{
			'href': url
		}
	}
	
	entity_dict = entity.__dict__
	for key, value in entity_dict.items():
		if key != '_sa_instance_state' and key != 'id':
			response[key] = value
	return response

def GET_list_response(url_root,base_url,entity_list):
	totalElements = len(entity_list)
	size = 20
	number = totalElements//size
	response = OrderedDict()
	obj_list = []

	for ele in entity_list:
		obj_dict = OrderedDict()
		ele_dict = ele.__dict__
		ele_name = ele.__class__.__name__.lower()
		url = base_url + '/'+str(ele.id)
		
		obj_dict['_list'] = {
			ele_name: {
				"href":url
			},
			"self":{
				"href":url
			}
		}
		
		for key, value in ele_dict.items():
			if key != '_sa_instance_state' and key != 'id':
				obj_dict[key] = value

		obj_list.append(obj_dict)

	target = base_url.split("/")[-1]
	response['_embedded'] = obj_list
	response['_links'] = {
		'profile' : {
			'href': url_root+"profile/"+target
		},
		'self':{
			'href': base_url+"{?page,size,sort}",
			'templated': True
		}
	}
	response['page']={
		'number': str(number),
		'size': str(size),
		'totalElements':str(totalElements),
		'totalPages': str(number+1)
	}
	return response

Detail

  • GGET_id_response is a function that creates a response format for the entity that has the id as the key. The entity class is entered as an argument. -POST_response is a function that creates a response format for the post method. As an argument, the entity class received by the post is entered.
  • GET_list_response is a function that creates a response format when all records are fetched. As an argument, it is entered in the form of a list of entity classes.
uEngine has registered trademarks and uses trademarks. and MSA-Ez is licensed under the GNU General Public License version 3.0.