Of late I was troubleshooting a connection issue with AWS Transit Gateway (TGW), and I thought of penning down what I learnt and how the issue was gradually resolved.
AWS Transit Gateway is a peering solution that offers a central hub to manage connectivity between VPCs. It is better than VPC peering when "peering at scale" is required, since VPC peering might hit hard limits and the configuration can become difficult to maintain. The TGW setup is quite straightforward, and put in simple words (without getting into technicalities), the steps below suffice to get a cross-VPC connection working:
Hub Setup (Say in AWS account A – Can be source account as well)
1. Create Transit Gateway (if using AWS console – under VPC -> Transit Gateway)
3. Resource Access Manager (RAM) – Share created TGW and add principals of Account B, C etc..
4. Once attachments from other accounts are created, they will appear in the TGW Attachment section
5. Create TGW Route Tables for each attachment – The routes should have the destination CIDR listed
6. Add routes to the VPC subnet Route Table, setting the destination to the remote VPC CIDR and the target to the TGW
In Destination Account B (Which we want to peer with Account A)
1. Go to RAM and accept the shared TGW
2. The shared TGW should be visible now in TGW section
3. Create Attachment specifying VPC and TGW
Note
Transit GW Route Tables/Associations/Propagations can be only seen in hub and not in other accounts
4. Add routes to the VPC subnet Route Table, setting the destination to the remote VPC CIDR and the target to the shared TGW
Similar configurations can be done in other accounts (B, C.. etc) which we want to peer with.
Troubleshooting
For troubleshooting connectivity, the below should be checked (in Hub, Source and Destination accounts):
1. Check Transit Gateway (TGW) Routes
The destination CIDR -> Attachment route should be present
2. Check VPC Subnet Routes
The destination VPC CIDR -> TGW route must be present
3. Check Subnet NACL
Connections must be allowed (both inbound and outbound) to the destination VPC CIDR
4. Check Instance SG
Connections must be allowed (outbound/inbound depending on connection initiator) to the destination VPC CIDR
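The route checks in steps 1 and 2 come down to one question: does any route's CIDR cover the destination address, and which route is the most specific? Below is a minimal sketch using Python's standard ipaddress module. The route table contents, CIDRs and target names are made up for illustration and are not read from AWS.

```python
import ipaddress


def find_route(routes, destination_ip):
    """Return the most specific route whose CIDR contains destination_ip, or None."""
    ip = ipaddress.ip_address(destination_ip)
    matches = [r for r in routes if ip in ipaddress.ip_network(r["cidr"])]
    if not matches:
        return None
    # The longest prefix (most specific CIDR) wins
    return max(matches, key=lambda r: ipaddress.ip_network(r["cidr"]).prefixlen)


# Hypothetical subnet route table in account A (VPC CIDR 10.0.0.0/16)
subnet_routes = [
    {"cidr": "10.0.0.0/16", "target": "local"},
    {"cidr": "0.0.0.0/0", "target": "igw-example"},
]

# Destination instance in account B (VPC CIDR 10.1.0.0/16)
before = find_route(subnet_routes, "10.1.2.3")

# Add the missing "destination VPC CIDR -> TGW" route and look again
subnet_routes.append({"cidr": "10.1.0.0/16", "target": "tgw-example"})
after = find_route(subnet_routes, "10.1.2.3")

print(before["target"], "->", after["target"])
```

In this made-up table the traffic first falls through to the default route, and only reaches the TGW once the specific route is added.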
It was pretty simple to troubleshoot once the above steps were understood. In my case the issue was with the VPC Subnet Route Table, which was missing the destination VPC CIDR -> TGW route.
How did your troubleshooting go?
Hope this is of some help to those who are stuck with TGW connectivity issues!
Today, I’ll be talking about an interesting combination: running a Kafka consumer as a scheduled Celery task. Read on if you find this interesting.
First of all, there are multiple ways to get a Kafka consumer running. Some of them are:
Having a separate microservice – This is a good approach as it allows independent scaling of the consumers. However there is an overhead of managing, monitoring this microservice, ensuring connectivity is fine etc. Still ideal for large, complex setup where we want to have dedicated consumers capable of scaling in/out on demand.
Running as a daemon task/background task – The consumer can as well run in background or as a daemon process. This will be a forever running process as consumers are meant to be. However troubleshooting/monitoring daemon processes can often be cumbersome.
We did not go with the above approaches, as ours is a lightweight application and we did not want a forever-running process alongside our application just for the consumer. In comes Celery.
Before I dive into the implementation specific details, below is the tech stack we use:
Django (django-2.1.7)
Celery (celery-4.3.0)
Kafka (kafka-python-2.0.2)
Python (python-3.2)
Celery is a distributed task scheduling module which can be easily integrated with applications (in our case, python celery) and provide task scheduling options in a simple yet reliable manner. We decided to use Celery to schedule consumer polls at regular intervals instead of running it forever, however things like graceful message consumption, offset management had to be tuned to make this work. Let’s discuss how we went about this.
We use kafka-python module in our Django app to implement the consumer. Although kafka-python does not support transactional producer/consumer at the moment, it does not cause any issues per se. Some caveats being, there might be a lag in message consumption as the control batch messages (commit/abort markers) will likely not get consumed.
Below are the steps we followed:
Define a Celery task to poll the consumer every 5 mins.
Consumer has a poll timeout of 3 mins (meaning it will stop polling and end task if no messages are available for consumption within this duration).
The Celery task is force killed once the consumer times out, to prevent the task from running infinitely.
A new consumer will run every 5 minutes (in a new Celery task) and even if an old one is running, it will not impact as the consumers will be part of same consumer group.
The consumer has auto commit enabled which will commit the offset for consumed messages every second.
In-case the consumer goes down in between/before/after processing a message, and commit is not done, it will be re-tried once a new consumer is up. Duplicate messages are handled with some explicit checks as well (based on our use-case)
In-case the consumer goes down in between/before/after processing a message and commit is done, it will not be re-tried.
The auto_offset_reset flag is set to latest, meaning in-case the offset is corrupted (due to Kafka crash etc), messages will not be re consumed from beginning, preventing duplicate message consumption.
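The timing described in the steps above can be written down as a Celery beat schedule. This is only a sketch of the settings: the entry name and the dotted task path are assumptions, and it presumes the Celery app loads Django settings with the CELERY namespace.

```python
# Idle timeout of the consumer, matching consumer_timeout_ms=180000
CONSUMER_POLL_TIMEOUT_SECONDS = 180
# Interval between two scheduled consumer runs (5 minutes)
CONSUMER_SCHEDULE_SECONDS = 300.0

CELERY_BEAT_SCHEDULE = {
    "poll-kafka-consumer": {
        # Hypothetical dotted path to the task wrapping the consumer
        "task": "myapp.tasks.kafka_consumer",
        # A plain number is interpreted as an interval in seconds
        "schedule": CONSUMER_SCHEDULE_SECONDS,
    },
}

# Gap between an idle consumer exiting and the next scheduled run
idle_gap = CONSUMER_SCHEDULE_SECONDS - CONSUMER_POLL_TIMEOUT_SECONDS
print(idle_gap)
```

Note that the timeout counts from the last received message, so a busy consumer can run past the next scheduled start. As described above, that overlap is harmless because both consumers join the same consumer group.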
Let’s have a look at the consumer implementation. I’ll not go over the Celery task setup part as that is readily available in the Celery getting-started guide.
import signal
import sys
import time

from kafka import KafkaConsumer


def kafka_consumer():
    # Signal handler to kill Celery task
    def handler(signum, frame):
        print("Hard exiting celery task as consumer poller has timed out...")
        sys.exit(0)

    consumer = None
    try:
        # Initialize Kafka consumer
        consumer = KafkaConsumer(
            'topic_name',
            bootstrap_servers=['kafka_broker:port'],
            api_version=(0, 10),
            # SSL settings required in case your broker has mTLS configured, else skip
            security_protocol='SSL',
            ssl_check_hostname=True,
            ssl_cafile='ca.pem',
            ssl_certfile='cert.pem',
            ssl_keyfile='key.pem',
            # Stop iterating if no message arrives within 3 minutes
            consumer_timeout_ms=180000,
            group_id='consumer_group_name',
            # Start from the latest offset when no valid committed offset exists
            auto_offset_reset='latest',
            enable_auto_commit=True,
            auto_commit_interval_ms=1000)
        print("Polling for topic: {topic}".format(topic='topic_name'))
        for msg in consumer:
            print("Message Key is {k} and Value is {v}".format(k=msg.key, v=msg.value))
            # TODO: Process Message
            # TODO: Handle duplicate consumption explicitly if required
        # Allow time for auto-commit before closing consumer thread
        time.sleep(2)
    except Exception as e:
        print('Exception {err} while consuming message(s) from Kafka'.format(err=str(e)))
    finally:
        # consumer stays None if the constructor itself failed
        if consumer is not None:
            consumer.close()
        # Kill celery task
        signal.signal(signal.SIGALRM, handler)
        signal.alarm(1)
Some points to note in the implementation above:
The Celery task is only killed once the consumer times out and has no new messages to consume. This ensures consumer is not killed in between while consuming/processing messages.
A delay of 2 secs is introduced with auto-commit duration set to 1 sec. This ensures processed message is always committed.
Note the explicit handling of duplicate message processing is important as in-case of consumer/Kafka crash, messages may be re-consumed, in-case the commit has not been done.
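The "handle duplicate consumption" TODO depends on the use case, but the usual shape is an idempotency check keyed on something unique in the message. The sketch below is an assumption about how that could look, not our production code: the message ids are invented, and the in-memory set stands in for a durable store (a database table or cache) that survives restarts.

```python
def process_once(messages, seen_ids, handle):
    """Call handle(payload) only for messages whose id has not been processed yet."""
    processed = []
    for message_id, payload in messages:
        if message_id in seen_ids:
            # Re-delivered message: already processed, so skip it
            continue
        handle(payload)
        seen_ids.add(message_id)
        processed.append(message_id)
    return processed


seen = set()      # stand-in for a durable store
results = []      # stand-in for the real side effect of processing

first_run = process_once([("order-1", "a"), ("order-2", "b")], seen, results.append)
# A new consumer re-reads order-2 because its offset was never committed
second_run = process_once([("order-2", "b"), ("order-3", "c")], seen, results.append)

print(first_run, second_run, results)
```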
The above setup is live for our application, deployed in K8s and works exactly the way we want!
Recently I was interviewed by Glovo, one of Barcelona’s fastest growing Gen 2 startups, for the position of Senior DevOps Engineer. It was a great experience, and I am jotting down the process, my learnings and some tips which should be helpful to people attending interviews with similar companies/roles.
There were 6 rounds and a final feedback session in total:
Introductory HR + Basic technical – 45 mins
Technical round – 1.5 hrs
Codility – Online take home test (a 2 hrs timed test, to be completed within a week)
System architecture & scalability – 1hr
Pair Programming/Scripting – 1.5 hrs
Behavioural round with manager – 1 hr
Final result and feedback with HR – 30 mins
Each round was well planned and structured. Post every round, the next step in the process was explained and feedback was shared over email. Expectation and interview structure/interviewer details were shared with some basic tips before the interview.
Now let’s discuss each round in detail.
Introductory HR + Basic technical – 45 mins
This was a friendly HR round where I was asked basic stuff about myself, my interests, experience etc.
Also knowledge regarding the company was checked and the job profile was discussed in detail.
Some technical questions (basic screening questions) were thrown in towards the end.
Time was given to answer my questions as well.
Overall they checked my interest, communication skills, basic technical knowledge and if I fit the role being offered.
Tips
- Have your CV tailored according to the JD.
- Be sure to read about the company and know basic stats, its core business etc.
Technical round – 1.5 hrs
A document with the expectation and topics to come prepared was given well before the interview.
It was a good technical discussion with one of the engineers whom I would be probably working with.
Questions were mostly situation based and how I would approach the problem and solve it.
Although some direct technical questions were asked as well.
Basically my technical expertise and problem solving approach was checked.
Tips
- Answer all approaches you can think of for a given problem.
- Give real world examples to show your expertise on the topic.
- Ensure to brush up basics before the interview.
Codility – Online take home test
There were 2 questions and Codility was the platform where the code was to be written and submitted on any language of my choice. (I chose Python3)
Some sample questions were shared, and it’s very helpful to solve these to get used to the Codility environment.
It was a 2 hr timed test, so timing was the key.
There were 2 questions – 1 simple (which had to be coded in shell script) and 1 medium to hard level difficulty (IMO)
Codility also runs performance test on the solution submitted and scores accordingly, so taking care of time and space complexity while writing the code was crucial.
Tips
- Get some competitive coding experience (Join Hackerrank, Leetcode, Codility, Codechef, Codeforce, Geeksforgeeks whichever you find good)
- Ensure to solve some sample questions in Codility to get a hang of it.
- You are also allowed to code in your IDE and copy the code in Codility (if that helps)
System architecture & scalability – 1hr
Again the basic expectations and the tools to be used (e.g. draw.io to draw the HLD) were shared beforehand.
A common application was asked to be designed end to end with some constraints/conditions pre-defined.
Any approach/design/tool/technology was allowed to be used, however every selection had to be backed with proper justification. (e.g. monolith vs microservices)
It was a highly interactive session and I was allowed to ask as many questions and justify/explain my design decisions.
The architecture diagram (HLD) was to be designed in draw.io with screen sharing/video turned on.
From DevOps perspective: HA, scalability, DR, CDN, LB, use of DB (CAP theorem) had to be known and explained in detail.
Overall it was a very interesting session.
Tips
- Read and have a good grasp on best infra design patterns, HA-DR, CDN, MessageBrokers, Caching, DB's (when to use what), CAP theorem, common bottlenecks etc
- Subscribe to any website/YouTube channel that provides system design courses/videos. (e.g. Gaurav Sen's channel)
- Ensure to go through the system design of common apps like - Youtube, Facebook, Swiggy, Whatsapp, Uber. Seldom an out of the box app is asked to be designed. It will mostly be one of the commonly used apps.
Pair Programming/Scripting – 1.5 hrs
This was again on Codility, with screen sharing and video turned on.
It was pair programming, and the interviewer coded some lines and asked me to code next in some questions/or if stuck.
There were 2 interviewers, one active and one passive.
Again I felt the questions were given in order of their difficulty, starting with a program to be coded with bash script and the remaining 2 on any language of my choice.
It was again very interactive, and all constraints/edge cases were promptly answered when asked.
Overall my coding skills, problem solving approach, attention to detail and good coding practices were tested.
Tips
- Ensure to ask as many questions as possible to narrow down all edge cases and constraints.
- Use comments, proper naming convention and test cases. (I missed writing test cases and was told in the feedback)
- Write the algo you intend to follow before starting to code.
- Think loud and convey your thought process.
- If you are unable to code the solution, at-least try to provide a high level algo.
Behavioural round with manager – 1 hr
This round was taken by one of the engineering managers and focussed totally on my values/communication skills and thought process.
Questions were basically on how I would react on a given situation, what I think are good engineering practices, how to promote collaboration, how good I can be in a diverse work environment etc.
Also how I handle failure and criticism, my view on feedback coupled with some questions on high level engineering practices/value delivery/scrum processes/CI-CD techniques etc.
Being a senior role, questions related to coaching junior developers, conflict management etc. were asked as well.
Sufficient time was allotted at the end to answer all the questions asked by me.
Tips
- Give examples from your experience wherever possible.
- Prepare well for answering questions like - Your strength, weakness; Your most difficult project; How you handle failure etc.
- Speak slowly and confidently.
- Ask questions you may want, to show your interest and also convey your working style.
The final HR discussion was to give detailed feedback of all the rounds and declare the final outcome. Also a detailed explanation of the company benefits was given.
Fortunately the outcome was positive in my case!
I have always been interested in Terraform and I am happy I could finally take out some time for this certification. I am stating my learning path, the resources I referred to and the experience I had in getting myself HashiCorp certified!
What I feel is that if you are into DevOps but are new to Terraform, max 1 week is sufficient to understand Terraform concepts and clear the certification. Of course, the right learning plan is required.
And if you already have experience/work with Terraform on a day to day basis, 2 days is enough!
I will not cover the basic details such as how much the course costs, how you can register etc, as these things are readily out there and if you just visit the Hashicorp site you will find all info. I will focus more on my learning path.
So here it is! The learning path I followed:
For understanding basic concepts, I went through the below courses:
If you do not want to register with any cloud provider, Katacoda is a good place to practise basic stuff. (Although I would recommend signing up with some provider and experimenting while staying in the free tier.)
However, the real prize is when you actually use Terraform to manage your infra with ease and feel “Where were you all this time?”
All the best to all looking forward to this certification!
Earlier today, I cleared the SAFe 5 Scrum Master certification and am feeling pretty good about it. Am penning down my thoughts and some tips on the same.
SAFe is Scaled Agile Framework, and although I had heard a lot about it before, I didn’t get a chance until recently, to understand and know more about it. I should say, it’s great and has quite changed my perspective on how things are supposed to run in a true agile setup.
Agile/Scrum/Kanban – These have been buzz words but knowing more about it and understanding its true essence and how it actually facilitates quality software development end to end is really the key.
So we need to mandatorily attend the SAFe SM training to appear for the certification. The training itself was very interactive and informative and I should say 75% of it is done if the training is attended and understood. The remaining 25% lies with going over the excellent content again provided by SAFe to refer back and get the basics right.
Not to mislead, but there is ample amount of theory to it, but the training with interactive activities and sessions helped a lot to grasp the concept. I would easily say, the training + 1 day of preparation/revision would suffice to clear the certification.
For me, the training content, the practise exam (provided by SAFe) along with some more practise samples from coursehero worked.
So that’s about it. All the best to all aspiring SSMs.
Response time is of utmost importance for any application, as it directly shapes the user experience. In one of my previous posts we utilized memcache to cache view responses and improve the response time (performance) of the site by almost 10X. This was basically at the server side. However, we can further improve this with some smart client-side (browser) caching. Django’s Conditional View Processing is very apt for this scenario.
Django http decorator (django.views.decorators.http) functions provide an easy way to set cache for conditional requests. Etags, or content-based caching, can be very useful for cases where setting time-based caching is a challenge. When an etag is set, the browser sends an If-None-Match request header carrying the etag value of the last requested version of the resource on all subsequent requests. If the current version has the same etag value, indicating it is the same as the browser’s cached copy, then an HTTP status of 304 is returned and the content is served from the browser cache, boosting the response time as the content does not need to be fetched from the server/server cache.
The advantage with etag is that, caching can be done based on the response content. Although this involves in generating the etag based on the content which can be a hash or any other string identifier. Web servers (like nginx) can also do this nowadays, however we will see in this post how to set this from application end in Django.
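To make the flow concrete, here is a small framework-free sketch of a content-hash etag and the If-None-Match comparison. The function names and page bodies are made up, and the comparison is simplified to a single strong etag. It illustrates the mechanism only and is not how Django implements it internally.

```python
import hashlib


def make_etag(body: bytes) -> str:
    """Build a quoted etag from a hash of the response body."""
    return '"{}"'.format(hashlib.sha256(body).hexdigest())


def conditional_response(body: bytes, if_none_match=None):
    """Return (status, headers, body) roughly the way a server answers a GET."""
    etag = make_etag(body)
    headers = {"ETag": etag}
    if if_none_match is not None and if_none_match == etag:
        # Browser copy is current: no body is sent
        return 304, headers, b""
    return 200, headers, body


page = b"<html>inventory v1</html>"

# First visit: full response, browser stores the etag
status1, headers1, _ = conditional_response(page)
# Second visit: browser sends the stored etag and the content is unchanged
status2, _, body2 = conditional_response(page, if_none_match=headers1["ETag"])
# Content changed on the server: the old etag no longer matches
status3, _, _ = conditional_response(b"<html>inventory v2</html>", if_none_match=headers1["ETag"])

print(status1, status2, status3)
```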
The @etag decorator really makes things simple in Django. You can read more about this decorator in the Django official doc, but here I’ll focus on the implementation.
In my scenario, I am caching the etag as well so that the same etag can be used and also it provides me a manual way to clear the cache and reset the etag (django-clearcache) if required without making any code change/server restarts.
The first thing is to write the get_etag function which returns the etag.
from django.core.cache import cache


# Get etag (for client side caching)
def get_etag(request, **kwargs):
    # Cache key is taken from the request path
    etag_key = request.path.split('/')[2]
    return cache.get(etag_key, None)
The snippet above returns the etag from cache. The cache key is based on the request path, so that the etag decorator can use the same get_etag function for multiple view functions. Once this is done we just need to apply the decorator to our view function, i.e. @etag(get_etag) from django.views.decorators.http.
It’s as simple as that. Now the etag is set along with the view’s response cache as below:
import datetime


# Handle caching (send_api_request is the app's own helper for calling the DRF API)
def cache_response(view, url, cache_name, etag_cache_name):
    # Get response from cache
    response = cache.get(cache_name)
    # Invoke POST call to get data from DRF if cache is not set
    if not response:
        status, response = send_api_request(url, view, None, None)
        if status != 200:
            raise Exception('Error fetching data from API: {}'.format(response.content))
        # Set cache (timeout=None keeps the entries until they are invalidated)
        cache.set(cache_name, response, None)
        cache.set(etag_cache_name, str(datetime.datetime.now()), None)
    return response
This would set the response as well as the etag in cache. Here I am using the datetime stamp as the etag value.
Now, for etag cache invalidation, it should be the same process as followed for the response cache invalidation, i.e. whenever the model is changed the cache should be invalidated. It is not time/user based. Thus once the cache is set, all users would benefit from it and would get invalidated only when the model has been changed. Cache invalidation is a very crucial factor and is totally based on your application/requirements and how static/dynamic your content is. Be sure to spend some time understanding which approach would best fit before finalizing the design.
In my case, the post save signal of the model is where I put the invalidation logic, which would basically be triggered whenever the model is changed.
from django.core.cache import cache
from django.dispatch import receiver
from simple_history.signals import post_create_historical_record


# Signal to handle cache invalidation
@receiver(post_create_historical_record)
def invalidate_cache(sender, **kwargs):
    model_name = kwargs.get('instance').__class__.__name__
    if model_name == 'Server' and cache.has_key('server_cache') and cache.has_key('server_data'):
        cache.delete('server_cache')
        cache.delete('server_data')
    if model_name == 'Uri' and cache.has_key('uri_cache') and cache.has_key('uri_data'):
        cache.delete('uri_cache')
        cache.delete('uri_data')
I used the django_simple_history post save signal as I have history enabled for multiple models. Thus instead of using different signals for different models, the django_simple_history’s post save signal can be used.
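As more models get cached, the handler above could also be written in a table-driven way. The sketch below is one possible shape, not what runs in my app: a plain dict stands in for Django's cache so that it runs on its own. Unlike the handler above, it deletes whichever keys are present instead of requiring both to exist.

```python
# Model name -> cache keys to drop when that model changes
# (key names taken from the handler above)
CACHE_KEYS_BY_MODEL = {
    "Server": ("server_cache", "server_data"),
    "Uri": ("uri_cache", "uri_data"),
}


def invalidate_for_model(cache, model_name):
    """Delete every cache key registered for model_name; unknown models are ignored."""
    keys = CACHE_KEYS_BY_MODEL.get(model_name, ())
    for key in keys:
        # With Django's cache object this line would be cache.delete(key)
        cache.pop(key, None)
    return keys


# Dict standing in for the real cache backend
fake_cache = {"server_cache": "response", "server_data": "etag", "uri_cache": "response"}
invalidate_for_model(fake_cache, "Server")
print(fake_cache)
```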
Note – In case you have deployed your application in Kubernetes and use the Nginx Ingress Controller, make sure gzip is set to off, else Nginx will discard the etag from the response header. This can be done by the below annotation in your Ingress manifest:
Kafka is one of the most popular open source distributed streaming platforms and is used for a wide range of requirements. However, its deployment can get complicated given its architecture, especially for HA setups.
Deployment in Kubernetes is simplified a lot using Helm, however when customizations are required for project-specific needs, it can be tricky if we do not understand how the Helm charts are configured.
So while working on the Kafka setup in one of my current projects, we had some custom requirements around mTLS and exposing the services externally. I have tried to describe the changes made to keep the deployment seamless with these custom requirements.
Let’s start with the basic setup first. It’s pretty simple and well documented (at least for the Bitnami Helm, which we use). So just adding the chart repo and executing:
should get the Kafka up and running as a stateful set. You can check the options that can be overridden in the readme file. However, for further customizations (if required), it’s better to download the chart and use it from a custom repo.
Some customizations that I would be discussing are:
External access setup using LoadBalancers with custom DNS
External access setup using NodePort and external load balancer
mTLS (mutual TLS or 2 way TLS) setup with wildcard cert (i.e. one cert used by multiple brokers)
1. External access with LoadBalancer service type and custom DNS
Now, the Bitnami Helm chart supports setting up LoadBalancers for external access. You can refer to the configuration here. What I would focus on is specifying a custom domain for these LoadBalancers, which is not an option by default, as the chart depends on the LoadBalancer service to provide the DNS as output once created (which does not happen with all providers).
If we see the file – scripts-configmap.yaml in the helm template:
k8s_svc_lb_ip() {
    local namespace=${1:?namespace is missing}
    local service=${2:?service is missing}
    local service_ip=$(kubectl get svc "$service" -n "$namespace" -o jsonpath="{.status.loadBalancer.ingress[0].ip}")
    local service_hostname=$(kubectl get svc "$service" -n "$namespace" -o jsonpath="{.status.loadBalancer.ingress[0].hostname}")
    if [[ -n ${service_ip} ]]; then
        echo "${service_ip}"
    else
        echo "${service_hostname}"
    fi
}
k8s_svc_node_port "{{ $releaseNamespace }}" "$SVC_NAME" | tee "$SHARED_FILE"
...
...
export EXTERNAL_ACCESS_IP=$(echo '{{ .Values.externalAccess.service.loadBalancerIPs }}' | tr -d '[]' | cut -d ' ' -f "$(($ID + 1))")
As you can see above, the hostname is used only when the LoadBalancer IP is not set. This needs to be updated to support our requirement of specifying the domain, as below:
# Change to specify custom LoadBalancer DNS instead of IP
export EXTERNAL_ACCESS_IP="broker${ID}.{{ .Values.externalAccess.service.loadBalancerDomain }}"
This basically allows the domain to be set from the Values file rather than populating it from the LoadBalancer response. Each broker will thus have its external IP set to the DNS name broker{id}.<domain> (where id is the broker id – e.g. broker0.example.com), which is then added to the ADVERTISED_LISTENER property. Note that it is important to set EXTERNAL_ACCESS_IP correctly, as this is what ends up in the ADVERTISED_LISTENER property:
If this is not set correctly, you might still be able to connect to Kafka, but you will get errors while producing/consuming messages.
Once this change is done, you can add the domain entry in values.yaml as below:
externalAccess:
  service:
    ## Ensure the LoadBalancer IP created has a DNS entry added as broker<ID>.domain.com (e.g. broker0.example.com)
    loadBalancerDomain: example.com
This works seamlessly, and you will be able to access Kafka using the DNS name now. However, note that the A record for the DNS pointing to the LoadBalancerIP needs to be set manually for each broker at your DNS provider end.
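Since the A records have to be created by hand, it helps to list up front which names each broker will advertise. The helper below is a hypothetical sketch that only follows the broker{id}.<domain> naming convention described above. The EXTERNAL listener name and port 9094 are assumptions for illustration, so check them against your chart values.

```python
def broker_dns_names(domain, replica_count):
    """DNS names following the broker<ID>.<domain> convention, one per broker."""
    return ["broker{}.{}".format(i, domain) for i in range(replica_count)]


def advertised_external_listeners(domain, replica_count, port):
    """External advertised listener per broker (listener name EXTERNAL is assumed)."""
    return [
        "EXTERNAL://{}:{}".format(name, port)
        for name in broker_dns_names(domain, replica_count)
    ]


names = broker_dns_names("example.com", 3)
listeners = advertised_external_listeners("example.com", 3, 9094)

for name in names:
    # Each of these needs an A record pointing at that broker's LoadBalancer IP
    print(name)
```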
2. External access setup using NodePort and external load balancer
The process is quite similar for the NodePort setup, the only difference being that the LoadBalancer setup needs to be handled entirely at the provider end and will not be taken care of by Kubernetes. So just update the service type to NodePort; there is already a property (externalAccess.service.domain), supported out of the box by the Bitnami Helm chart, specifically to set the domain for NodePort. Just make sure you add the K8s nodes with the correct port to the LoadBalancer pool members for each broker.
3. mTLS setup with wildcard cert (used by multiple brokers)
Ok, now we come to a really interesting part, which can give you headaches and often not work due to incorrect configurations. I’ll divide this section into 2 parts:
How to generate a self signed wildcard cert to be used
Customizations required in the helm to support this wildcard cert
Generate SSL certs
While there are tons of resources on the net to do this, I’ll point out the exact steps to generate a self-signed SSL cert which can be used for enabling mTLS in Kafka. Before that, note that mutual TLS is basically 2-way TLS, which requires a client cert to achieve AuthN. Thus once Kafka has mTLS enabled, a client-side cert will be required to connect to it, which is much safer than using a password.
Steps to generate self-signed SSL cert and import it to keystore and truststore (will be using java keytool and openssl):
1. Create a certificate authority (CA)
openssl req -new -x509 -keyout ca-key -out ca-cert -days 1200 -passin pass:kafkapassword -passout pass:kafkapassword -subj "/CN=*.example.com/OU=<UNIT>/O=<ORG>/L=<LOCATION>/C=<COUNTRY>"
# Client Certs
2. Create client keystore
keytool -noprompt -keystore kafka.client.keystore.jks -genkey -alias localhost -keyalg RSA -keysize 2048 -dname "CN=*.example.com, OU=<UNIT>, O=<ORG>, L=<LOCATION>, C=<COUNTRY>" -storepass kafkapassword -keypass kafkapassword
3. Create client certificate signing request
keytool -noprompt -keystore kafka.client.keystore.jks -alias localhost -certreq -file cert-client-unsigned -storepass kafkapassword
4. Sign the client certificate with the CA created
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-client-unsigned -out cert-client-signed -days 1200 -CAcreateserial
5. Import CA and signed client certificate into client keystore
keytool -import -keystore kafka.client.keystore.jks -file ca-cert -alias theCARoot
keytool -noprompt -keystore kafka.client.keystore.jks -alias localhost -import -file cert-client-signed -storepass kafkapassword
# Server Certs
6. Import CA into server truststore
keytool -noprompt -keystore kafka.server.truststore.jks -alias theCARoot -import -file ca-cert -storepass kafkapassword
7. Create server keystore
keytool -noprompt -keystore kafka.server.keystore.jks -genkey -alias buildserver -keyalg RSA -keysize 2048 -dname "CN=*.example.com, OU=<UNIT>, O=<ORG>, L=<LOCATION>, C=<COUNTRY>" -storepass kafkapassword -keypass kafkapassword
8. Create server certificate signing request
keytool -noprompt -keystore kafka.server.keystore.jks -alias buildserver -certreq -file cert-server-unsigned -storepass kafkapassword
9. Sign the server certificate with the CA created
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-server-unsigned -out cert-server-signed -days 1200 -CAcreateserial
10. Import CA and signed server certificate into server keystore
keytool -import -keystore kafka.server.keystore.jks -file ca-cert -alias theCARoot
keytool -noprompt -keystore kafka.server.keystore.jks -alias buildserver -import -file cert-server-signed -storepass kafkapassword
The above would generate the client and server certs, keystores and truststore required to set up mTLS.
Note, you may additionally specify SANs/IPs in the certs (using keytool's -ext SAN flag) if you have such requirements or want mTLS to work with IP addresses/other subdomains. You can look up this flag to read more.
In case you have a valid CA, you can sign the generated certs using that CA instead of the self-created CA.
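One thing worth keeping in mind with a wildcard cert is what it does and does not cover. Hostname verification treats * as a stand-in for exactly one left-most label, so *.example.com matches broker0.example.com but not a name nested one level deeper. The function below is a simplified illustration of that rule, written for this post. It is not the matcher used by Kafka clients and ignores corner cases such as partial-label wildcards.

```python
def wildcard_matches(pattern, hostname):
    """Match a hostname against a cert name where '*' may replace only the left-most label."""
    pattern_labels = pattern.lower().split(".")
    host_labels = hostname.lower().split(".")
    # The wildcard never spans several labels, so the label counts must agree
    if len(pattern_labels) != len(host_labels):
        return False
    if pattern_labels[0] == "*":
        return pattern_labels[1:] == host_labels[1:]
    return pattern_labels == host_labels


covered = wildcard_matches("*.example.com", "broker0.example.com")
nested = wildcard_matches("*.example.com", "broker0.kafka.example.com")
apex = wildcard_matches("*.example.com", "example.com")

print(covered, nested, apex)
```

If your brokers live under a deeper subdomain or are reached by IP, that is where the SAN entries mentioned above come in.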
2. Customizations required in the helm to support this wildcard cert
Once the certs are generated and in place, we need to make some modifications in the Helm chart, as by default it expects each broker to have an individual cert. Firstly, set the required parameters correctly as specified here. To use the wildcard cert for all brokers, update the file scripts-configmap.yaml as below:
# Support wildcard cert (one keystore for all brokers)
if [[ -f "/certs/kafka.truststore.jks" ]] && [[ -f "/certs/kafka.keystore.jks" ]]; then
    mkdir -p /opt/bitnami/kafka/config/certs
    cp "/certs/kafka.truststore.jks" "/opt/bitnami/kafka/config/certs/kafka.truststore.jks"
    cp "/certs/kafka.keystore.jks" "/opt/bitnami/kafka/config/certs/kafka.keystore.jks"
else
    echo "Couldn't find the expected Java Key Stores (JKS) files! They are mandatory when encryption via TLS is enabled."
    exit 1
fi
The original file will have the {ID} configured, which basically needs to be removed to make it a global cert used by all brokers. The server keystore and truststore need to be copied to the folder: files>jks. And that’s about it.
You should have an HA stateful Kafka cluster up and running in no time…
This post is a continuation of sorts of my last post on the Inventory app with DRF in Django. As an enhancement, an approval workflow to moderate any update/deletion/addition of a model entry was to be put in place. I did look for ready-made solutions and could find a few open source ones, Django Moderation being one of them. However, it had its limitations and was not compatible with the latest Django 3.
So, I decided to come-up with a custom solution utilizing the DRF framework as well. This post basically outlines the approach I took, the challenges I faced and how they were resolved.
To start with, below is the flow diagram of the approval workflow that has been implemented:
The approach followed to implement the above:
Create an abstract model which needs moderation
Create 2 models inheriting the abstract model – One acts as the primary model, and the other an approval model, where changes can be moderated and then saved to the primary model based on actions taken by moderator
Instead of using Django admin site for data entry, create a form (template) to Add/Update data – Used DRF Forms for this as all permission and field serialization is handled by DRF – so effective DRY principle!
Create a separate template for the Approval model, from where entries can be moderated.
Whenever an entry is added/updated, it is first saved in the approval model. On successful moderation, the entry is saved in the primary model. Sounds simple, right? Well, let’s find that out.
Now, a lot of templating work can be avoided in case you go with the out-of-the-box Django admin site. However, I would suggest spending some time on designing templates for these actions, which gives the site a more professional look and also allows granular customizations (especially in the UI).
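To make the flow concrete before getting into Django, here is a minimal, framework-free sketch in plain Python. It is not the implementation used in this post: the names ChangeRequest, ApprovalQueue, submit and moderate are made up for illustration, and plain dicts/lists stand in for the primary and approval models.

```python
from dataclasses import dataclass, field


@dataclass
class ChangeRequest:
    """A staged change waiting for moderation (hypothetical structure)."""
    action: str                      # 'Add', 'Update' or 'Delete'
    name: str                        # server name, used as the lookup key
    data: dict = field(default_factory=dict)
    status: str = 'Pending'


class ApprovalQueue:
    """Holds staged changes and applies them to the primary store on approval."""

    def __init__(self):
        self.primary = {}    # stands in for the primary model
        self.requests = []   # stands in for the approval model

    def submit(self, action, name, data=None):
        # Allow only one open request per server at a time
        for req in self.requests:
            if req.name == name and req.status in ('Pending', 'On Hold'):
                raise ValueError('Existing request found for ' + name)
        req = ChangeRequest(action, name, dict(data or {}))
        self.requests.append(req)
        return req

    def moderate(self, req, status):
        # Record the decision; the primary store is touched only on approval
        req.status = status
        if status != 'Approved':
            return
        if req.action == 'Delete':
            self.primary.pop(req.name, None)
        else:
            # 'Add' creates the entry, 'Update' merges the changed fields
            self.primary.setdefault(req.name, {}).update(req.data)


queue = ApprovalQueue()
req = queue.submit('Add', 'web01', {'ip': '10.0.0.5'})
print(queue.primary)   # still empty, nothing is visible before approval
queue.moderate(req, 'Approved')
print(queue.primary)   # the entry now exists in the primary store
```

The point of the sketch is the separation: user input only ever lands in the queue, and the moderator’s decision is the single place where the primary data changes.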
Let’s have a look at the model:
# Abstract Model
class AbstractServer(models.Model):
    component = models.ForeignKey(Component, blank=False, null=False, on_delete=models.CASCADE)
    name = models.CharField(max_length=50, unique=True, blank=False, null=False)
    ip = models.GenericIPAddressField(unique=True, blank=False, null=False)
    dc = models.ForeignKey(Dc, blank=False, null=False, on_delete=models.CASCADE)
    environment = models.ForeignKey(Environment, blank=False, null=False, on_delete=models.CASCADE)
    type = models.ForeignKey(Type, blank=True, null=True, on_delete=models.CASCADE)
    state = models.CharField(
        max_length=10,
        choices=STATES,
        blank=False
    )
    tags = models.ManyToManyField(Tag, blank=True)
    group = models.ForeignKey(Group, blank=False, null=False, on_delete=models.CASCADE)
    description = models.TextField(blank=True, null=True)
    requestor = models.ForeignKey(User, blank=False, null=False, default=1, on_delete=models.CASCADE)

    class Meta:
        abstract = True

# Primary Model (inherited from abstract model)
class Server(AbstractServer):
    changed_by = models.ForeignKey(User, blank=False, null=False, on_delete=models.CASCADE, related_name='server_changed_by')
    history = HistoricalRecords()

    @property
    def _history_user(self):
        return self.changed_by

    @_history_user.setter
    def _history_user(self, value):
        self.changed_by = value

    def __str__(self):
        return self.name

# Approval Model (inherited from abstract model)
class ServerApproval(AbstractServer):
    action = models.CharField(
        max_length=10,
        choices=APPROVAL_ACTIONS,
        blank=False
    )
    approver = models.ForeignKey(User, blank=False, null=False, on_delete=models.CASCADE, related_name='server_approver')
    status = models.CharField(
        max_length=10,
        choices=APPROVAL_STATES,
        blank=False
    )
    datetime = models.DateTimeField(blank=False)
    comments = models.TextField(blank=True, null=True)

    def __str__(self):
        return self.name
Note the ‘abstract=True‘ flag in the abstract model’s Meta class. This ensures that no table is created in the DB for the abstract model, only for the models inheriting from it. Perfect fit for our requirement. You can read more about model inheritance in the official Django documentation. Some fields are specific to the primary model as it has django-simple-history enabled, while the approval model has fields such as action, status and approver to capture the moderation details, as the intention is to save this information as well.
Now that our model is ready, let’s see how we can use DRF to provide a form to the user to add/update changes. These changes should be read from the primary model, but saved in the approval model first. The same thing can be done using Django formsets, leaving out DRF entirely. However, as we already have DRF configured and things such as permissions and field serialization are already done, I found using DRF’s TemplateHTMLRenderer a much more elegant solution.
Once DRF is set up, meaning you have the necessary serializers and URL routers in place, we need an additional approval model view to render the form and also save the details in the approval model.
# Server edit form backend
class ServerEditForm(APIView):
    permission_classes = [IsAuthenticatedOrStaffUser,]
    renderer_classes = [TemplateHTMLRenderer]
    template_name = 'server_edit_form.html'
    style = {'vertical_style': {'template_pack': 'rest_framework/vertical'},
             'horizontal_style': {'template_pack': 'rest_framework/horizontal'}}

    def get(self, request, **kwargs):
        param = kwargs.get('param')
        # Pass users
        users = User.objects.exclude(id=request.user.id)
        # Render add server form
        if param == 'add':
            serializer = ServerApprovalWriteSerializer()
            return Response({'serializer': serializer, 'action': 'add', 'users': users, 'style': self.style})
        # Render update server form
        elif 'update' in param:
            server = get_object_or_404(Server, pk=kwargs.get('param').replace('update',''))
            serializer = ServerWriteSerializer(server)
            return Response({'serializer': serializer, 'server': server, 'users': users, 'style': self.style})

    def post(self, request, **kwargs):
        serializer = None
        param = kwargs.get('param')
        # Update status and requestor fields
        request.data._mutable = True
        request.data['status'] = 'Pending'
        request.data['approver'] = request.POST.get('approver')
        request.data['requestor'] = request.user.pk
        request.data['datetime'] = datetime.datetime.now()
        # Submit add server request
        if param == 'add':
            request.data['action'] = 'Add'
            name = request.data['name']
            ip = request.data['ip']
            # Validate if server exists in Server model
            if Server.objects.filter(name=name).exists() or Server.objects.filter(ip=ip).exists():
                return JsonResponse({'response': 'ERROR adding server Server/IP already exists!'})
            # Validate if server exists in ServerApproval model
            if ServerApproval.objects.filter(name=name).exists() or ServerApproval.objects.filter(ip=ip).exists():
                server = ServerApproval.objects.get(Q(name=name) | Q(ip=ip))
                if (server.status == 'Pending' or server.status == 'On Hold'):
                    return JsonResponse({'response': 'ERROR adding server Existing request found for server({0})/IP({1})! You may raise a new request once the existing request is Approved or Rejected'.format(name, ip)})
                else:
                    # Updating server approval entry
                    serializer = ServerApprovalWriteSerializer(server, data=request.data)
            else:
                # Add new entry
                serializer = ServerApprovalWriteSerializer(context={'request': request}, data=request.data)
        # Submit update server request
        else:
            try:
                request.data['action'] = request.POST.get('action').capitalize()
                request.data['comments'] = request.POST.get('comments')
                existing_server = get_object_or_404(Server, pk=kwargs.get('param'))
                server = ServerApproval.objects.get(name=existing_server.name)
                serializer = ServerApprovalWriteSerializer(server, data=request.data)
            except ServerApproval.DoesNotExist:
                # Server does not exist in approval queue! Create new entry in approval queue
                serializer = ServerApprovalWriteSerializer(context={'request': request}, data=request.data)
        # Handle serializer error
        if not serializer.is_valid():
            return JsonResponse({'response': 'ERROR submitting server details {0}'.format(str(serializer.errors))})
        # Save changes
        serializer.save()
        # Redirect to response page
        return JsonResponse({'response': 'Change successfully submitted to approval queue!'})
Now, the approval model APIView class shown above might look a little complicated; however, much of it is because of the extra fields and requirement-specific customizations I had. In simpler terms, it implements the GET and POST methods of the APIView. The GET method returns the form fields, which are rendered in the template and shown to the user to request additions/updates. If you notice the serializers used in GET, for additions it uses the ServerApproval serializer and for updates it uses the Server serializer. This is done to get primary model fields for updates and approval model fields for new entries. For POST, which actually performs the save(), only the ServerApproval serializer is used, meaning changes are read from the primary model but saved into the approval model only, which is what we want. Apart from this, there are some checks/validations to handle edge cases (e.g. a user tries to update an entry that already has an open request). Some fields are set explicitly, like requestor/datetime/status, which are not rendered in the form as we do not want the user to specify these.
Now, let’s have a look at the form template and URL patterns.
I have removed css/js from the template to reduce the LOC, however you can see how the DRF form is rendered. For some fields where I want custom data (e.g. the user fields), I do not use the form renderer but plain HTML inputs. If you go back to the APIView class, you will find that the data for these custom fields is fed separately to the serializer. You can find more information about DRF form rendering in the official DRF documentation. One thing I would like to highlight is that customizing the UI (e.g. adding validations/styles) needs a lot of element overrides, as the out-of-the-box styling for DRF forms is pretty basic.
So the majority of the work is done and the only thing pending is handling the approval model template/actions. I additionally enhanced the approval model template to display a comparison table for update requests, which shows the original data vs the changes made to the entry. You may refer to the function in views.py below to achieve this.
Basically, the approval model entry id is returned from the approval template and its corresponding primary model data is matched and returned back to the template to show the comparison.
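As a rough illustration of the comparison idea, here is a plain-Python sketch. It is an assumption-laden stand-in rather than the actual view: diff_fields is a hypothetical helper, and the two dicts stand in for the serialized primary model entry and the approval model entry.

```python
def diff_fields(original, requested, ignore=()):
    """Return {field: (old, new)} for every field whose value differs."""
    changes = {}
    for key in sorted(set(original) | set(requested)):
        if key in ignore:
            continue
        old, new = original.get(key), requested.get(key)
        if old != new:
            changes[key] = (old, new)
    return changes


# Hypothetical data: the current entry and the requested change
current = {'name': 'web01', 'ip': '10.0.0.5', 'state': 'Active'}
pending = {'name': 'web01', 'ip': '10.0.0.6', 'state': 'Active', 'status': 'Pending'}

# Approval-only fields such as 'status' are left out of the comparison
for name, (old, new) in diff_fields(current, pending, ignore=('status',)).items():
    print(f'{name}: {old} -> {new}')
```

The resulting mapping can be handed to the template as-is and rendered as a two-column "original vs requested" table.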
For the final save, a simple model save action is triggered when the approval status is updated from the UI template.
All invocations are done using AJAX and the response is returned as in the functions above to display to the user.
When a model approval change is saved with status approved, a post-save signal is used to save the entry to the primary model as below:
# Signal to handle server approval
@receiver(post_save, sender=ServerApproval)
def create_update_approved_server(sender, instance, **kwargs):
    # Add server entry on approval
    if instance.status == 'Approved' and instance.action == 'Add':
        server_obj = Server.objects.create(
            name=instance.name,
            component=instance.component,
            ip=instance.ip,
            dc=instance.dc,
            environment=instance.environment,
            type=instance.type,
            state=instance.state,
            group=instance.group,
            description=instance.description,
            requestor=instance.requestor,
            changed_by=instance.approver
        )
        server_obj.tags.add(*instance.tags.all())
    # Update server entry on approval
    if instance.status == 'Approved' and instance.action == 'Update':
        server_obj, created = Server.objects.update_or_create(
            name=instance.name,
            defaults={
                'component': instance.component,
                'ip': instance.ip,
                'dc': instance.dc,
                'environment': instance.environment,
                'type': instance.type,
                'state': instance.state,
                'group': instance.group,
                'description': instance.description,
                'requestor': instance.requestor,
                'changed_by': instance.approver
            }
        )
        server_obj.tags.add(*instance.tags.all())
    # Delete server entry on approval
    if instance.status == 'Approved' and instance.action == 'Delete':
        server_obj = Server.objects.get(name=instance.name)
        server_obj.delete()
As I had a many-to-many field, I had to add those entries after the model was saved, using the .add() method as shown above. The signal adds an entry to the primary table if not present (for add requests), updates an existing one if already present (update requests) and removes it for delete requests.
I am using datatables with jquery for the template. Snippet to display the approval form (in jquery dialog)
I hope you have got a basic understanding of how we can achieve moderation in Django models. The solution works seamlessly and also gives us control to customize stuff based on requirements.
I think that’s a pretty long post. Hope you don’t get bored reading it.
Of late, I was creating a dashboard in Django for infra inventory tracking and was looking how best to do it. Basically, the requirements were:
Tabular view of servers and related details
Audit and History views for any change done
RESTful services to expose APIs to get/post/patch/delete data
Fast performance with minimal lag
Scalable/Configurable
I have worked on such requirements before, however the API part was something new. Enter the Django Rest Framework (DRF). I am listing down some of the challenges I faced while developing this.
The base was pretty simple, and it was a not-so-complex model based view that I wrote. Basically, a server model with server host, ip and other details. It had some foreign keys like DC, Environment etc.
Now, with DRF, serializing the fields the way I wanted was the first hurdle. I wanted different serializers for read and write modes, where read required no authentication, better display of API data (especially the foreign keys) and eager loading enabled. I ended up with a mixin to achieve this. Note the _SELECT_FIELDS for single-valued relations (the foreign keys, loaded via select_related) and _PREFETCH_FIELDS for multi-valued relations (the many-to-many tags, loaded via prefetch_related) in the eager loading mixin. This was done to get around the infamous n+1 queries issue.
#serializer.py
# Eager load mixin
class EagerLoadingMixin:
    @classmethod
    def eager_loading(cls, queryset):
        if hasattr(cls, "_SELECT_FIELDS"):
            queryset = queryset.select_related(*cls._SELECT_FIELDS)
        if hasattr(cls, "_PREFETCH_FIELDS"):
            queryset = queryset.prefetch_related(*cls._PREFETCH_FIELDS)
        return queryset

class ServerReadSerializer(serializers.ModelSerializer, EagerLoadingMixin):
    component = ComponentSerializer(read_only=True)
    dc = DcSerializer(read_only=True)
    environment = EnvironmentSerializer(read_only=True)
    type = TypeSerializer(read_only=True)
    tags = TagSerializer(read_only=True, many=True)
    group = GroupSerializer(read_only=True)
    changed_by = UserSerializer(read_only=True)
    _SELECT_FIELDS = ['component','dc', 'environment', 'type', 'group', 'changed_by']
    _PREFETCH_FIELDS = ['tags',]

    class Meta:
        model = Server
        fields = '__all__'

class ServerWriteSerializer(serializers.ModelSerializer):
    class Meta:
        model = Server
        fields = '__all__'
        read_only_fields = ('changed_by',)

    def create(self, validated_data):
        user = self.context['request'].user
        validated_data['changed_by'] = user
        return super().create(validated_data)
Also note the ‘changed_by‘ read-only field, which was added to audit changes done by a user, using the simple-history module. I will come to that in a bit, however the read-only setting for this field was important as we do not want users to set this field while making a POST request to the API. Also see the create method override, which sets the changed_by field to the currently logged-in user.
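To see why eager loading matters, the n+1 issue mentioned earlier can be reproduced without Django at all. The sketch below uses the standard library’s sqlite3 with a made-up two-table schema (server and dc are simplified stand-ins, not the real models) and counts queries through a small hypothetical run() helper. The JOIN is roughly the kind of query select_related produces for a foreign key.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE dc (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE server (id INTEGER PRIMARY KEY, name TEXT, dc_id INTEGER REFERENCES dc(id));
    INSERT INTO dc VALUES (1, 'east'), (2, 'west');
    INSERT INTO server VALUES (1, 'web01', 1), (2, 'web02', 2), (3, 'db01', 1);
""")

executed = []  # every SQL statement sent through run()


def run(sql, params=()):
    """Execute a query and remember that it was issued."""
    executed.append(sql)
    return conn.execute(sql, params).fetchall()


def lazy_listing():
    # One query for the servers, then one more per server for its DC (n+1)
    servers = run('SELECT name, dc_id FROM server ORDER BY id')
    return [
        (name, run('SELECT name FROM dc WHERE id = ?', (dc_id,))[0][0])
        for name, dc_id in servers
    ]


def eager_listing():
    # A single JOIN fetches the servers together with their DC
    return run(
        'SELECT server.name, dc.name FROM server '
        'JOIN dc ON dc.id = server.dc_id ORDER BY server.id'
    )


executed.clear()
lazy = lazy_listing()
lazy_queries = len(executed)

executed.clear()
eager = eager_listing()
eager_queries = len(executed)

print(lazy_queries, eager_queries)
```

With three servers the lazy version issues four queries and the JOIN issues one, while both return the same rows. The gap grows linearly with the number of rows, which is what makes the issue painful on a listing page.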
For permissions, I had a custom permission class, and for filtering the noteworthy django-filter, which I find so much better than the default search field provided by DRF. Check the views.py file below:
#views.py
# Serializer mixins
class ReadWriteSerializerMixin(object):
    read_serializer_class = None
    write_serializer_class = None

    def get_serializer_class(self):
        if self.action in ["create", "update", "partial_update", "destroy"]:
            return self.get_write_serializer_class()
        return self.get_read_serializer_class()

    def get_read_serializer_class(self):
        assert self.read_serializer_class is not None, (
            "'%s' should either include a `read_serializer_class` attribute, "
            "or override the `get_read_serializer_class()` method."
            % self.__class__.__name__
        )
        return self.read_serializer_class

    def get_write_serializer_class(self):
        assert self.write_serializer_class is not None, (
            "'%s' should either include a `write_serializer_class` attribute, "
            "or override the `get_write_serializer_class()` method."
            % self.__class__.__name__
        )
        return self.write_serializer_class

# Custom permission: anyone can read, only staff superusers can write
class IsAuthenticatedOrSuperUser(BasePermission):
    def has_permission(self, request, view):
        return bool(
            request.method in ('GET', 'HEAD', 'OPTIONS') or
            request.user and
            request.user.is_authenticated and
            request.user.is_staff and
            request.user.is_superuser
        )

# Viewsets
class ServerViewSet(ReadWriteSerializerMixin, viewsets.ModelViewSet):
    queryset = Server.objects.all()
    queryset = ServerReadSerializer.eager_loading(queryset) # Eager loading to improve performance
    read_serializer_class = ServerReadSerializer
    write_serializer_class = ServerWriteSerializer
    permission_classes = [IsAuthenticatedOrSuperUser,]
    filter_backends = (filters.DjangoFilterBackend, )
    filterset_fields = {'component__name': ['exact', 'iexact'],
                        'name': ['exact', 'iexact'],
                        'ip': ['exact', 'iregex'],
                        'dc__name': ['exact', 'iexact', 'iregex'],
                        'environment__name': ['exact', 'iexact'],
                        'type__name': ['exact', 'iexact'],
                        'state': ['exact', 'iexact'],
                        'tags__name': ['exact', 'iexact', 'icontains', 'iregex'],
                        'group__name': ['exact', 'iexact'],
                        'description': ['exact', 'icontains', 'iregex']
                        }
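For reference, the lookups declared in filterset_fields surface as query-string parameters: django-filter exposes the exact lookup under the bare field name and every other lookup as field__lookup. The sketch below only builds such URLs with the standard library. The /api/servers/ path and the filter values are assumptions, since the actual prefix depends on your router configuration, and filter_url is a hypothetical helper.

```python
from urllib.parse import urlencode


def filter_url(base, **lookups):
    """Build a list URL with django-filter style query parameters."""
    return base + '?' + urlencode(lookups)


# 'exact' uses the bare field name, other lookups append '__<lookup>'
print(filter_url('/api/servers/', dc__name='east'))
print(filter_url('/api/servers/', dc__name__iexact='EAST', tags__name__icontains='prod'))
```

Parameters that are not declared in filterset_fields are simply not treated as filters, so the dictionary doubles as the list of filters the API supports.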
This worked perfectly, and with the eager loading setup I could see some performance improvement as well. But it was not enough.
Enter memcached. There can be a debate between memcached vs redis. However, I just wanted a simple caching backend which just does caching. Redis is so much more, to be frank, and thus I decided to carry on with memcached. I did not go with the default per-site cache middleware, which caches basically all pages. Instead I had custom caching configured with a preset key, so that invalidation would be easier. Ignore the ‘url filtering‘ logic below as that is specific to my requirement, but otherwise the caching is quite straightforward.
# Handle caching
def cache_response(view, url, cache_name):
    # Do not serve from cache if a filter is passed
    if '&' in url:
        status, response = send_api_request(url, view, None, None)
        if status != 200:
            raise Exception('Error fetching data from API: ' + response.content)
    else:
        # Get response from cache
        response = cache.get(cache_name)
        # Invoke API call to get data from DRF if cache is not set
        if not response:
            status, response = send_api_request(url, view, None, None)
            if status != 200:
                raise Exception('Error fetching data from API: ' + response.content)
            else:
                # Set cache (timeout None: never expires on its own)
                cache.set(cache_name, response, None)
    return response
For invalidation, I used the simple-history post_create_historical_record signal, which I also use to add audit entries whenever a model is updated.
@receiver(post_create_historical_record)
def post_create_record_callback(sender, **kwargs):
    # Define audit action
    # Get model class name
    model_name = kwargs.get('instance').__class__.__name__
    action = 'Unknown'
    history = kwargs.get('history_instance')
    history_type = history.history_type
    if history_type == '+':
        action = 'Added'
    elif history_type == '-':
        action = 'Deleted'
    elif history_type == '~':
        action = 'Updated'
    # Add audit entry
    audit = Audit(name=action,
                  logs='{0} {1}'.format(model_name, history.name),
                  datetime=datetime.datetime.now(),
                  user=kwargs.get('history_user'))
    audit.save()
    # Clear cache
    if cache.has_key('audit_cache'):
        cache.delete('audit_cache')
    if model_name == 'Server' and cache.has_key('server_cache'):
        cache.delete('server_cache')
One thing to highlight here is the ‘.__class__.__name__‘ attribute. It returns the class name of an object, which gives the name of the model that was modified, and I use this information to invalidate the appropriate cache key. I would also like to point out that there is no direct way to clear cache keys by prefix, which I tried before this approach. With this approach I have more control over what to cache and when to invalidate selective cache keys. This signal is specific to the django-simple-history module, so look up its documentation if some of the kwargs options are not clear. The cache was set with a None timeout, meaning an entry never expires on its own and only gets invalidated by a change to the model, which is exactly what I wanted. This led to an almost 10X performance improvement: the response time came down from ~20 seconds to ~2 seconds.
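The caching pattern itself (a fixed key, no expiry, explicit invalidation keyed on the model’s class name) can be sketched without Django or memcached. Everything below is a simplified stand-in: SimpleCache mimics only get/set/delete, fetch_servers pretends to be the slow API call, and CACHE_KEYS and on_model_changed are hypothetical names.

```python
class SimpleCache:
    """Tiny in-memory stand-in for a cache backend; entries never expire."""

    def __init__(self):
        self._store = {}

    def get(self, key):
        return self._store.get(key)

    def set(self, key, value):
        self._store[key] = value

    def delete(self, key):
        self._store.pop(key, None)


cache = SimpleCache()
fetch_calls = []  # records every trip to the slow backend


def fetch_servers():
    # Stands in for the slow API request
    fetch_calls.append('fetch')
    return ['web01', 'web02']


def cached_servers():
    data = cache.get('server_cache')
    if data is None:
        data = fetch_servers()
        cache.set('server_cache', data)
    return data


# Which cache keys to drop when a given model changes
CACHE_KEYS = {'Server': ['server_cache']}


def on_model_changed(instance):
    # __class__.__name__ yields the model name, e.g. 'Server'
    for key in CACHE_KEYS.get(instance.__class__.__name__, []):
        cache.delete(key)


class Server:
    pass


cached_servers()
cached_servers()              # served from cache, no second fetch
print(len(fetch_calls))
on_model_changed(Server())    # a change to the model drops the key
cached_servers()              # fetched again after invalidation
print(len(fetch_calls))
```

Because nothing expires on its own, staleness is bounded only by how reliably the change hook fires, so every write path to the model has to go through it.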
While integrating simple-history found this very useful to have the history enabled in admin site with my custom model fields listed.
The save_model override is again for the changed_by field. The field is in the exclude list as well as we don’t want to show it to anyone even in admin site.
The model reference:
#models.py
class Server(models.Model):
    component = models.ForeignKey(Component, blank=False, null=False, on_delete=models.CASCADE)
    name = models.CharField(max_length=50, unique=True, blank=False, null=False)
    ip = models.GenericIPAddressField(unique=True, blank=False, null=False)
    dc = models.ForeignKey(Dc, blank=False, null=False, on_delete=models.CASCADE)
    environment = models.ForeignKey(Environment, blank=False, null=False, on_delete=models.CASCADE)
    type = models.ForeignKey(Type, blank=True, null=True, on_delete=models.CASCADE)
    state = models.CharField(
        max_length=10,
        choices=STATES,
        blank=False
    )
    tags = models.ManyToManyField(Tag, blank=True, related_name='tags')
    group = models.ForeignKey(Group, blank=False, null=False, on_delete=models.CASCADE)
    description = models.TextField(blank=True, null=True)
    changed_by = models.ForeignKey(User, blank=False, null=False, on_delete=models.CASCADE)
    history = HistoricalRecords()

    @property
    def _history_user(self):
        return self.changed_by

    @_history_user.setter
    def _history_user(self, value):
        self.changed_by = value

    def __str__(self):
        return self.name
You can see the simple-history settings configured and changed_by field introduced.
Hope that clears things up. Let’s get to the template now. I used datatables for the presentation layer, and the djangorestframework-datatables module to fetch the data from the API which my app exposes, rather than querying the models again. While this may look like a roundabout way of achieving things, it definitely removes a lot of lines of code and also enables the filtering capabilities which django-filter provides for DRF. Finally, it also acts as a kind of test for the APIs we write. The approach is definitely arguable, but it worked for me and worked really well.
So I just had to use DRF’s APIRequestFactory to send a GET request to my API with an added ‘?format=datatables‘ param and woohoo, the response was well-formatted and ready for datatables to consume.
The request function using APIRequestFactory is as below:
However, one challenge I faced was to construct the URL. I did not want to use the nasty getpath/host commands and hardcode the context. This is what I found after much detailed lookup of the DRF documentation:
The router configuration for ViewSets is well documented on the DRF site, so I am not going into that detail. But the above worked perfectly. Note the ‘-list‘ suffix appended to the basename. This is something that DRF sets. You can refer to the DRF docs to know more.
So we are pretty much close to the end. Some more enhancements I did was on the UI side (using JQuery/Colvis/Buttons/Menu.js/Intro.js) just to make the UI more appealing.
On the memcached management front, installed 2 modules, django-memcache-status, django-clearcache, which gave me the options to see the memcached stats and also clear the cache from Admin site.
That’s pretty much about it. The end result is something that I am really liking!
I passed the CKAD (Certified Kubernetes Application Developer) exam earlier this month and was since asked by many regarding my experience and tips. So here it goes…
There are so many tips and suggestions on the net regarding vim setup, tackling the easy ones first, time management, using the notepad etc.
IMO the most important thing is you should know to use imperative commands to create/modify resources. (Basically the kubectl create/run/label/annotate/set commands).
Secondly, you should bookmark all important links from the documentation, as we can’t use imperative commands for everything, so copy-pasting from the documentation should be fast and less time should be spent finding what you need. Thus bookmarks.
If you are proficient with these 2 aspects it’s really simple. I attempted all questions sequentially, no custom vim settings, did not use notepad. I was able to complete the test with 15 mins remaining. Although I kept verification for the end.
Another important thing is the namespace. Make sure to set the context before each question without fail. It’s very important, so don’t ignore it.
For me the above worked.
Now regarding the prep courses and materials I used.
I did these 3-4 times till I could complete all of them in time. The lightning labs are really good and if you can solve them within the time I feel you are ready. Even after knowing the answers, I could not complete these initially within time. So the usage of bookmarks, copy-pasting everything was put to test and it really helped a lot.
One good thing about CKAD is the syllabus isn’t vast. There are just 10-15 imperative commands you should know IMO. I also glanced once at the CKAD exercise (just read through once) – https://github.com/dgkanatsios/CKAD-exercises
Again, if you know K8s basics, 1 week of preparation and the labs practise is more than enough to clear this exam.