Building a Zero-Trust Port Knocking Daemon from Scratch
In this post, I will walk through building a custom “Zero-Trust” port knocking daemon in Python from scratch. While enterprise environments rightfully rely on established tools like fwknop or Single Packet Authorization, building this from the ground up was the perfect way to get a deeper grasp of OSI Layer 4 interactions and asynchronous state management.
Here is a breakdown of what I wanted to achieve, the architecture, the logic, and the challenges I faced along the way.
The Challenge
I wanted to be able to connect to a server with a closed SSH port. To achieve this, I needed to dynamically open that SSH port using a specific signal—one that I could trigger without exposing any of my listening ports to the public internet.
The Setup
To simulate a real network interaction, I used a two-machine setup:
- The Client (Machine A): A standard Linux laptop with the IP
192.168.178.13 - The Server (Machine B): A Raspberry Pi 3 B+ with the hostname
sentinel-pi
First, let’s ensure the Pi is up to date and install the necessary firewall and packet-sniffing tools:
sudo apt update -y && sudo apt upgrade -y && sudo apt install iptables python3-scapy -yNote that if you are building this on a personal computer instead of a dedicated Pi, it is best practice to isolate your Python dependencies using a virtual environment.
In case Python is already installed on your system, creating a virtual environment and installing Scapy is very straightforward:
python3 -m venv venv
source venv/bin/activate
pip install scapyThe Firewall
Before I could build a secret knock, I had to lock the front door. I used iptables to manage the Linux kernel’s network filter.
Note that ufw is a fantastic alternative that wraps around iptables:
# iptables:
sudo iptables -A INPUT -p tcp --dport 22 -j DROP
# the same result in ufw:
sudo ufw deny 22/tcpFirst, I explicitly allowed established connections so that we stay connected after closing the SSH port and while setting this up.
sudo iptables -A INPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPTNext, I blocked everything on port 22:
sudo iptables -A INPUT -p tcp --dport 22 -j DROPYou may be asking yourself: Why aren’t we blocking all ports by default? In a hardened production environment, the default INPUT policy should absolutely be set to DROP. But for this proof-of-concept, we are only blocking the SSH port to prevent accidentally cutting off other outbound traffic like DNS.
Python State Management
The core of this daemon is a Python script utilizing Scapy to sniff raw network packets before our firewall drops them. The script tracks state using a dictionary, storing the IP addresses, target ports, and timestamps of incoming TCP SYN packets.
It looks something like this:
{
"192.168.178.13": [
{"value": 7000, "timestamp": 1772553796.9509974},
{"value": 8000, "timestamp": 1772553797.9550185},
{"value": 9000, "timestamp": 1772553799.0455625},
],
"192.168.178.17": [
{"value": 22, "timestamp": 1772553796.9509974}
],
}Here is my first proof-of-concept Scapy loop:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from scapy.all import sniff, TCP, IP
# The secret sequence of ports
SECRET_SEQUENCE = [7000, 8000, 9000]
knocks = {}
def packet_callback(packet):
if packet.haslayer(TCP) and packet[TCP].flags == "S":
source_ip = packet[IP].src
target_port = packet[TCP].dport
print(f"Detected knock from {source_ip} on port {target_port}")
# call future functions here
print("Monitoring for knocks...")
sniff(filter="tcp", prn=packet_callback, store=0)
Testing the Concept
Once the basic script is running:
sudo python3 knocker.pyI can trigger the first knocks from the client laptop to verify it is working. Initially, I used nmap to send the knocks:
nmap -Pn -p 7000,8000,9000 sentinel-piHowever, the order of ports with nmap is not always consistent. A more reliable way to test this was by using a simple bash loop with netcat:
for port in 7000 8000 9000; do nc -z -w 1 sentinel-pi $port; doneExpanding the Script
To turn this into a fully functional daemon, I needed a few extra functions:
# Dynamically punching a hole in our firewall
open_the_door(source_ip)
# Garbage collection and cleanup
delete_older_timestamps(source_ip)
# Detection of the secret knock phrase
detect_close_timestamps(source_ip)To dynamically punch a hole in the firewall, I used Python’s subprocess module to execute an iptables rule, tailored to the current knocker’s IP address:
1
2
3
4
5
6
def open_the_door(source_ip):
subprocess.run(["sudo", "iptables", "-I", "INPUT", "1", "-s", source_ip, "-p", "tcp", "--dport", str(SSH_PORT), "-j", "ACCEPT"], check=True)
print(f"Unlocked Port {SSH_PORT} for {source_ip}")
time.sleep(OPEN_DOOR_TIMER)
subprocess.run(["sudo", "iptables", "-D", "INPUT", "-s", source_ip, "-p", "tcp", "--dport", str(SSH_PORT), "-j", "ACCEPT"], check=True)
print(f"Locked Port {SSH_PORT} for {source_ip}")
Garbage collection is necessary; otherwise, we would run out of memory storing data from botnet scans and anyone else probing the server. That’s why we only keep track of IP addresses that have connected within the last few seconds.
Instead of using a heavy for loop to calculate the time difference between every single knock stored in our dictionary, I optimized the time calculation using a telescoping series. By extracting the timestamp of the very first knock and subtracting it from the timestamp of the final knock, we achieve an O(1) time calculation for the entire duration, preventing unnecessary CPU cycles.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def delta_time(timestamp_1, timestamp_2):
return timestamp_2 - timestamp_1
def delete_older_timestamps(source_ip):
current_time = time.time()
entries = knocks.get(source_ip)
new_list = []
for entry in entries:
if delta_time(entry.get("timestamp"), current_time) < COOLDOWN_TIMER:
new_list.append(entry)
knocks[source_ip] = new_list
# GC: If the list is empty, delete the IP entirely
if len(knocks[source_ip]) == 0:
del knocks[source_ip]
Finally, to detect the secret knock phrase, the script evaluates the time delta and compares the last n recorded ports with our secret sequence. If the phrase matches, it clears the IP address’s knock history and spawns a background thread that asynchronously handles the dynamic firewall punch. This approach even allows for multi-user support without blocking the main sniffing loop.
1
2
3
4
5
6
7
8
9
10
11
12
13
def detect_close_timestamps(source_ip):
entries = knocks.get(source_ip)
if len(entries) != len(SECRET_SEQUENCE):
return
total_delta = delta_time(entries[0].get("timestamp"), entries[-1].get("timestamp"))
if total_delta < COOLDOWN_TIMER:
extract = [x.get("value") for x in entries]
if extract == SECRET_SEQUENCE:
del knocks[source_ip]
threading.Thread(target=open_the_door, args=(source_ip,)).start()
The Final Script
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import time
import threading
import subprocess
from scapy.all import sniff, TCP, IP
# Change to a custom port if needed
SSH_PORT = 22
# How long the configured SSH port will remain open
OPEN_DOOR_TIMER = 5
# How long the garbage collection keeps entries
COOLDOWN_TIMER = 10
# Knock on this port to delete your entire knock history (useful for high COOLDOWN_TIMER values)
EMERGENCY_BACKUP_KNOCK_PORT = 1111
# Your secret sequence of ports
# Make sure to knock in this exact sequence
SECRET_SEQUENCE = [7000, 8000, 9000]
# Dictionary to keep track of knocks per IP address (stores port and timestamp)
knocks = {}
def open_the_door(source_ip):
subprocess.run(["sudo", "iptables", "-I", "INPUT", "1", "-s", source_ip, "-p", "tcp", "--dport", str(SSH_PORT), "-j", "ACCEPT"], check=True)
print(f"Unlocked Port {SSH_PORT} for {source_ip}")
time.sleep(OPEN_DOOR_TIMER)
subprocess.run(["sudo", "iptables", "-D", "INPUT", "-s", source_ip, "-p", "tcp", "--dport", str(SSH_PORT), "-j", "ACCEPT"], check=True)
print(f"Locked Port {SSH_PORT} for {source_ip}")
def delta_time(timestamp_1, timestamp_2):
return timestamp_2 - timestamp_1
def delete_older_timestamps(source_ip):
current_time = time.time()
entries = knocks.get(source_ip)
new_list = []
for entry in entries:
if delta_time(entry.get("timestamp"), current_time) < COOLDOWN_TIMER:
new_list.append(entry)
knocks[source_ip] = new_list
# GC: If the list is empty, delete the IP
if len(knocks[source_ip]) == 0:
del knocks[source_ip]
def detect_close_timestamps(source_ip):
entries = knocks.get(source_ip)
if len(entries) != len(SECRET_SEQUENCE):
return
total_delta = delta_time(entries[0].get("timestamp"), entries[-1].get("timestamp"))
if total_delta < COOLDOWN_TIMER:
extract = [x.get("value") for x in entries]
if extract == SECRET_SEQUENCE:
del knocks[source_ip]
threading.Thread(target=open_the_door, args=(source_ip,)).start()
def packet_callback(packet):
if packet.haslayer(TCP) and packet[TCP].flags == "S":
source_ip = packet[IP].src
target_port = packet[TCP].dport
# Drop all standard requests to the SSH port
if target_port == SSH_PORT:
# Remove this log for production use cases! Helpful for development and debugging.
print(f"Detected SSH knock from {source_ip} on port {target_port}... dropping")
return
# In case you locked yourself out and don't want to wait for the cooldown timer
if target_port == EMERGENCY_BACKUP_KNOCK_PORT:
if source_ip in knocks:
del knocks[source_ip]
print(f"Resetting knocks for {source_ip}")
return
# Create the data entry
entry = {
"value": target_port,
"timestamp": time.time()
}
# Check if source_ip exists, run time-based garbage collection
if source_ip in knocks:
delete_older_timestamps(source_ip)
# If the source_ip doesn't exist in the dictionary anymore, initialize it
if source_ip not in knocks:
knocks[source_ip] = []
# Append the next knock
knocks[source_ip].append(entry)
# Remove this log for production use cases! Helpful for development and debugging.
# If a botnet runs a port scan against your server, this will flood your log.
print(f"Detected knock from {source_ip} on port {target_port}")
detect_close_timestamps(source_ip)
# Start sniffing on the default interface
print("Monitoring for knocks...")
sniff(filter="tcp", prn=packet_callback, store=0)
Final Testing
Now, when we run the script:
sudo python3 knocker.pyWe can start knocking on the Pi’s doors:
for port in 7000 8000 9000; do nc -z -w 1 sentinel-pi $port; doneAnd we’ll monitor the log from our Pi:
Monitoring for knocks...
Detected knock from 192.168.178.13 on port 7000
Detected knock from 192.168.178.13 on port 8000
Detected knock from 192.168.178.13 on port 9000
Unlocked Port 22 for 192.168.178.13Quickly switch shells and send an SSH request:
ssh pi@sentinel-piFinally, the log on the Pi should read:
Locked Port 22 for 192.168.178.13Changing the SSH Port
To further confuse automated attackers, standard practice is to change the default SSH port. This is done by editing the daemon configuration at /etc/ssh/sshd_config.
Let’s change our SSH port to 44423:
# other config...
#Port 22 -> Change to:
Port 44423
# other config...Update the knocker.py script variable:
# Change to a custom port if needed
SSH_PORT = 44423After restarting the SSH service:
sudo systemctl restart sshdUpdate the iptables drop rule to target 44423:
# Delete old rule
sudo iptables -D INPUT -p tcp --dport 22 -j DROP
# New rule for the new port
sudo iptables -A INPUT -p tcp --dport 44423 -j DROPNow, knocking in the right order:
for port in 7000 8000 9000; do nc -z -w 1 sentinel-pi $port; doneGives us a few seconds to securely connect on the new port:
ssh -p 44423 pi@sentinel-piThe logs on the Pi:
Monitoring for knocks...
Detected knock from 192.168.178.13 on port 7000
Detected knock from 192.168.178.13 on port 8000
Detected knock from 192.168.178.13 on port 9000
Unlocked Port 44423 for 192.168.178.13
# A short pause to allow the login
Locked Port 44423 for 192.168.178.13Note: The open timeframe does not need to include the time it takes to type in your SSH password. Once the initial network handshake is established, the connection won’t drop. Even if the firewall closes the port again, we stay connected thanks to the
ESTABLISHED,RELATEDiptables rule we defined at the beginning.
Additional Features
Reset Port
This acts as an extra backup in case of high cooldown times (e.g., for long knock phrases) or an application failure. If you send a packet to the specified emergency backup port, all the recorded knocks for your IP address will automatically be deleted.
if target_port == EMERGENCY_BACKUP_KNOCK_PORT:
if source_ip in knocks:
del knocks[source_ip]
print(f"Resetting knocks for {source_ip}")
returnDropping SSH Connections
I added this feature to prevent an IP’s knock history from getting cluttered with premature SSH port requests. This is particularly useful if you use an SSH GUI client or connection manager that attempts a background connection to test reachability before you even initiate the session. By explicitly dropping these standard requests in the Python daemon, I ensured the secret sequence remains uninterrupted.
# Drop all standard requests to the SSH port
if target_port == SSH_PORT:
# Remove this log for production use cases! Helpful for development and debugging.
print(f"Detected SSH knock from {source_ip} on port {target_port}... dropping")
returnChallenges Faced
While this was an incredibly rewarding project, it was not without its setbacks:
- Race Conditions & Concurrency: When a user successfully knocks, the daemon uses
time.sleep()to keep the door open for a few seconds. Initially, this froze the entire packet sniffer. I implemented Python’sthreadingmodule to handle the firewall modifications in a background thread, ensuring the daemon never drops a concurrent user’s packet. - State Exhaustion (Garbage Collection): If a botnet runs a basic port scan against the server, the state dictionary rapidly fills with random IP addresses, potentially causing a memory leak. That is why I implemented a time-based garbage collector that safely deletes an IP address from memory once its knock history expires.
- The Mutation Bug: Adding the garbage collector introduced a fatal
KeyError. The script would evaluate an IP, trigger the GC to delete it, and instantly try to append new data to the deleted key. I had to strictly decouple the cleanup logic from the initialization logic (defensive programming) to prevent the state from mutating mid-execution. - The “Machine Gun” Effect: When testing with
nmap -Pn -p 7000,8000,9000, the sequence logic kept failing. Becausenmapis optimized for speed, it fires packets in parallel, causing them to arrive at the server out-of-order (e.g., 9000, 7000, 8000). This prompted me to switch to the sequential bashnetcatloop.
Wrapping Up
To turn this experiment into a background process, you could implement a systemd service unit to manage the daemon’s lifecycle. But it’s important to note that because the script sniffs raw network packets and modifies the firewall via iptables, it requires root privileges to read the socket. While this project was a great learning experience, it remains as a proof-of-concept. For production-hardened environments, I recommend sticking to established, community-maintained solutions like fwknop to ensure security and reliability.
Technical Stack
| Category | Technologies & Concepts |
|---|---|
| Languages | Python (Scapy, Subprocess, Threading), Bash Scripting |
| Networking | OSI Layer 4 (TCP/UDP), Port Knocking Architecture, Packet Sniffing |
| Security | Zero-Trust Principles, Linux Kernel Firewall (iptables/ufw), SPA Logic |
| Systems | Linux Administration (Raspberry Pi/Debian), State Management, Concurrency |
| Dev Tools | Nmap, Netcat, Virtual Environments (venv), Git |