MCP-NET-02 — Egress Traffic Filtering
Level: L3
Domain: NET
Requirement
MCP servers MUST implement egress traffic filtering that restricts outbound connections to an explicit allowlist of destinations, protocols, and ports, with mandatory DNS resolution control, connection rate limiting, and comprehensive logging of all blocked attempts.
Rationale
Unrestricted egress enables data exfiltration, command-and-control communication, and lateral movement in compromised environments. One study (arXiv:2506.13538, listed under References) reports that 80% of MCP servers allow unrestricted outbound connections, which lets attackers exfiltrate data to arbitrary destinations. Documented incidents include cryptocurrency mining through compromised servers, data theft to attacker-controlled infrastructure, and botnet participation.
Scope
Applies to:
- All outbound network connections (HTTP/HTTPS, TCP, UDP)
- DNS queries and resolutions
- Database and service connections
- Webhook and callback URLs
- File downloads and API calls
- All network interfaces and protocols
Does NOT apply to:
- Loopback connections (127.0.0.1, ::1)
- Unix domain sockets
- Internal service mesh communication (if properly segmented)
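The loopback exemption above can be checked against the whole loopback range rather than two literal addresses. The following is a minimal sketch using Python's standard `ipaddress` module; the function name `is_exempt_destination` is hypothetical and not part of this control.

```python
import ipaddress


def is_exempt_destination(address: str) -> bool:
    """Return True if `address` is a loopback IP literal (exempt from filtering)."""
    try:
        return ipaddress.ip_address(address).is_loopback
    except ValueError:
        # Not an IP literal (for example a hostname): never exempt by name alone,
        # because a name can be made to resolve anywhere.
        return False


print(is_exempt_destination("127.0.0.1"))        # loopback IPv4
print(is_exempt_destination("::1"))              # loopback IPv6
print(is_exempt_destination("api.company.com"))  # hostname, not exempt
```

Treating only IP literals as exempt is a deliberate choice in this sketch: a hostname such as `localhost` depends on local resolver configuration and should be resolved and checked like any other name.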
Required Evidence
Egress filtering configuration: Allowlist of permitted destinations

egress_policy:
  default_action: DENY
  allowed_destinations:
    - host: api.company.com
      ports: [443]
      protocols: [https]
      purpose: "Internal API access"
    - host: "*.amazonaws.com"
      ports: [443]
      protocols: [https]
      purpose: "AWS service access"
      restrictions:
        regions: [us-east-1, eu-west-1]
        services: [s3, dynamodb]
    - host: database.internal
      ports: [5432]
      protocols: [tcp]
      purpose: "PostgreSQL database"
  denied_destinations:
    - host: "*.crypto-mining.com"
    - host: "*.torproject.org"
    - cidr: "10.0.0.0/8"  # Except explicitly allowed
  dns_policy:
    allowed_resolvers:
      - 10.0.0.53
      - 10.0.0.54
    block_doh: true  # Block DNS over HTTPS
    block_dot: true  # Block DNS over TLS
  rate_limits:
    per_destination:
      max_connections: 100
      time_window: 60  # seconds
    global:
      max_new_connections: 1000
      time_window: 60

Enforcement implementation: Code showing egress filtering
Test results:
- Connection to allowed host succeeds
- Connection to non-allowed host blocked
- DNS resolution to unauthorized resolver blocked
- Rate limit exceeded triggers blocking
- Blocked attempts appear in logs
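To make the default-deny evaluation behind these test results concrete, here is a minimal sketch that evaluates a policy shaped like the configuration above. The names `POLICY` and `decide` are hypothetical, the policy is written as a plain dict to avoid a YAML dependency, and CIDR, DNS, and rate-limit checks are left out for brevity.

```python
from fnmatch import fnmatch

# Hypothetical subset of the egress policy, expressed as a dict.
POLICY = {
    "default_action": "DENY",
    "allowed_destinations": [
        {"host": "api.company.com", "ports": [443], "protocols": ["https"]},
        {"host": "database.internal", "ports": [5432], "protocols": ["tcp"]},
    ],
    "denied_destinations": [{"host": "*.torproject.org"}],
}


def decide(policy, host, port, protocol):
    """Return (allowed, reason) for one outbound connection attempt."""
    host = host.lower().rstrip(".")
    # Deny entries take precedence over allow entries.
    for entry in policy.get("denied_destinations", []):
        pattern = entry.get("host")
        if pattern and fnmatch(host, pattern):
            return False, f"denied by pattern {pattern}"
    # A rule matches only if host, port, and protocol all match.
    for entry in policy.get("allowed_destinations", []):
        if (fnmatch(host, entry["host"])
                and port in entry["ports"]
                and protocol in entry["protocols"]):
            return True, f"allowed by rule {entry['host']}"
    # Anything unmatched falls through to the default action.
    if policy.get("default_action", "DENY") == "DENY":
        return False, "no matching allow rule"
    return True, "default allow"


print(decide(POLICY, "api.company.com", 443, "https"))
print(decide(POLICY, "evil.com", 443, "https"))
```

Note that `fnmatch` lets `*` match dots as well, so a pattern such as `*.amazonaws.com` also matches multi-label subdomains; a stricter matcher may be preferable in production.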
Verification Guidance
Static (code/config review)
Look for:
# Python example with egress filtering (requires the dnspython package)
import fnmatch
import ipaddress
import logging
import socket
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union

import dns.resolver


@dataclass
class EgressRule:
    host_pattern: str
    allowed_ports: List[int]
    allowed_protocols: List[str]
    purpose: str
    rate_limit: Optional[int] = None


class EgressFilter:
    def __init__(self, default_deny: bool = True):
        self.default_deny = default_deny
        self.allowed_destinations: List[EgressRule] = []
        self.denied_cidrs: List[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]] = []
        self.allowed_dns_servers = ['10.0.0.53', '10.0.0.54']
        self.connection_counts = defaultdict(lambda: {'count': 0, 'reset_time': 0})

        # Monkey-patch socket to enforce filtering. The replacement must be a
        # plain function so that the socket instance is passed in as `sock`.
        self._original_connect = socket.socket.connect
        egress_filter = self

        def filtered_connect(sock, address):
            return egress_filter._filtered_connect(sock, address)

        socket.socket.connect = filtered_connect

    def add_allowed_destination(self, rule: EgressRule):
        """Add an allowed egress destination."""
        self.allowed_destinations.append(rule)

    def is_connection_allowed(self, host: str, port: int,
                              protocol: str = 'tcp') -> Tuple[bool, str]:
        """Check if outbound connection is allowed."""
        # Resolve hostname to IP
        try:
            ip = self._resolve_host(host)
        except Exception as e:
            return False, f"DNS resolution failed: {e}"

        # Check if IP is in denied ranges
        ip_obj = ipaddress.ip_address(ip)
        for denied_cidr in self.denied_cidrs:
            if ip_obj in denied_cidr:
                return False, f"IP {ip} in denied range {denied_cidr}"

        # Check against allowed rules
        for rule in self.allowed_destinations:
            if self._matches_rule(host, ip, port, protocol, rule):
                # Check rate limit
                if rule.rate_limit and not self._check_rate_limit(host, rule.rate_limit):
                    return False, f"Rate limit exceeded for {host}"
                return True, f"Allowed by rule: {rule.purpose}"

        # Default action
        if self.default_deny:
            return False, f"No matching allow rule for {host}:{port}"
        return True, "Default allow (not recommended)"

    def _filtered_connect(self, sock: socket.socket, address):
        """Wrapper for socket.connect with filtering."""
        # Only IP sockets pass (host, port, ...) tuples; Unix domain sockets
        # pass a path and are out of scope.
        if isinstance(address, tuple):
            host, port = address[0], address[1]

            # Skip filtering for loopback and for the allowed DNS resolvers,
            # otherwise the filter could block its own lookups
            if host in ['127.0.0.1', 'localhost', '::1'] or (
                    host in self.allowed_dns_servers and port == 53):
                return self._original_connect(sock, address)

            # Only the transport protocol is visible at the socket layer, so
            # rules must list 'tcp' or 'udp' to match here. High-level clients
            # usually resolve names before calling connect(), so `host` is
            # often already an IP address at this point.
            protocol = 'udp' if sock.type == socket.SOCK_DGRAM else 'tcp'

            # Check if connection is allowed
            allowed, reason = self.is_connection_allowed(host, port, protocol)
            if not allowed:
                self._log_blocked_connection(host, port, reason)
                raise ConnectionError(f"Egress blocked: {reason}")
            self._log_allowed_connection(host, port, reason)

        return self._original_connect(sock, address)

    def _resolve_host(self, host: str) -> str:
        """Resolve hostname with DNS filtering."""
        # If already an IP literal, return as-is
        try:
            return str(ipaddress.ip_address(host))
        except ValueError:
            pass

        # Block DNS over HTTPS/TLS style targets
        if host.startswith('https://') or host.startswith('dns://'):
            raise ValueError("DNS over HTTPS/TLS not allowed")

        # Use only the allowed DNS servers and ignore the system resolver config.
        # Resolution errors propagate so the caller can deny the connection.
        resolver = dns.resolver.Resolver(configure=False)
        resolver.nameservers = self.allowed_dns_servers
        answers = resolver.resolve(host, 'A')
        return str(answers[0])

    def _matches_rule(self, host: str, ip: str, port: int,
                      protocol: str, rule: EgressRule) -> bool:
        """Check if connection matches allow rule."""
        # Check port
        if port not in rule.allowed_ports:
            return False
        # Check protocol
        if protocol not in rule.allowed_protocols:
            return False
        # Check host pattern; also try the IP for IP-based patterns
        return (fnmatch.fnmatch(host, rule.host_pattern)
                or fnmatch.fnmatch(ip, rule.host_pattern))

    def _check_rate_limit(self, host: str, limit: int) -> bool:
        """Check if connection is within rate limit."""
        current_time = time.time()
        # Reset counter if window expired
        if current_time > self.connection_counts[host]['reset_time']:
            self.connection_counts[host] = {
                'count': 0,
                'reset_time': current_time + 60  # 1-minute window
            }
        # Check limit
        if self.connection_counts[host]['count'] >= limit:
            return False
        self.connection_counts[host]['count'] += 1
        return True

    def _log_blocked_connection(self, host: str, port: int, reason: str):
        """Log blocked egress attempt."""
        logging.warning(f"EGRESS_BLOCKED: {host}:{port} - {reason}")
        # Send to SIEM
        self._send_to_siem({
            'event': 'egress_blocked',
            'host': host,
            'port': port,
            'reason': reason,
            'timestamp': time.time()
        })

    def _log_allowed_connection(self, host: str, port: int, reason: str):
        """Log allowed egress connection."""
        logging.info(f"EGRESS_ALLOWED: {host}:{port} - {reason}")

    def _send_to_siem(self, event: dict):
        """Placeholder: forward the event to your SIEM pipeline."""
        logging.debug("SIEM event: %s", event)

// Node.js example with egress proxy
const net = require('net');
const dns = require('dns').promises;
const { URL } = require('url');

class EgressProxy {
  constructor(config) {
    this.allowedHosts = new Map();
    this.deniedCIDRs = [];
    this.allowedDNS = config.allowedDNS || ['10.0.0.53'];
    this.rateLimits = new Map();
    this.defaultDeny = config.defaultDeny !== false;
    // Intercept outbound connections
    this.interceptConnections();
  }

  // Note: the socket-level hook checks with protocol 'tcp', so rules for
  // HTTP(S) destinations also need 'tcp' (or '*') in `protocols`.
  addAllowedDestination(pattern, ports, protocols, options = {}) {
    this.allowedHosts.set(pattern, {
      ports: new Set(ports),
      protocols: new Set(protocols),
      rateLimit: options.rateLimit,
      purpose: options.purpose
    });
  }

  // Extract { host, port } from the arguments passed to socket.connect()
  extractTarget(args) {
    // Node's internal callers may pass a pre-normalized [options, callback] array
    const first = Array.isArray(args[0]) ? args[0][0] : args[0];
    if (first !== null && typeof first === 'object' && first.host) {
      return { host: first.host, port: Number(first.port) };
    }
    if (typeof first === 'number') {
      // connect(port[, host]) form; host defaults to localhost
      return { host: typeof args[1] === 'string' ? args[1] : 'localhost', port: first };
    }
    return null; // IPC path or no explicit host (defaults to localhost)
  }

  interceptConnections() {
    const proxy = this; // Inside the patched connect(), `this` is the socket
    const originalConnect = net.Socket.prototype.connect;
    net.Socket.prototype.connect = function (...args) {
      const target = proxy.extractTarget(args);
      // Skip localhost
      if (target && !['localhost', '127.0.0.1', '::1'].includes(target.host)) {
        // Check if allowed
        const allowed = proxy.checkEgress(target.host, target.port);
        if (!allowed.permitted) {
          const error = new Error(`Egress blocked: ${allowed.reason}`);
          error.code = 'EGRESS_DENIED';
          throw error;
        }
      }
      return originalConnect.apply(this, args);
    };
    // Also intercept HTTP/HTTPS
    this.interceptHTTP();
  }

  interceptHTTP() {
    const http = require('http');
    const https = require('https');
    const wrapRequest = (module, defaultProtocol) => {
      const original = module.request;
      module.request = (...args) => {
        const target = args[0];
        const url = typeof target === 'string' ? new URL(target) : target;
        // Options objects may omit the protocol; fall back to the module default
        const protocol = (url.protocol || defaultProtocol).slice(0, -1); // Remove trailing colon
        const port = Number(url.port) || (protocol === 'https' ? 443 : 80);
        const check = this.checkEgress(url.hostname || url.host || 'localhost', port, protocol);
        if (!check.permitted) {
          // Throw instead of invoking the callback: the request callback
          // expects a response object, not an error
          const error = new Error(`HTTP egress blocked: ${check.reason}`);
          error.code = 'EGRESS_DENIED';
          throw error;
        }
        return original.apply(module, args);
      };
    };
    wrapRequest(http, 'http:');
    wrapRequest(https, 'https:');
  }

  checkEgress(host, port, protocol = 'tcp') {
    // Check rate limiting first
    if (!this.checkRateLimit(host)) {
      this.logBlocked(host, port, 'Rate limit exceeded');
      return { permitted: false, reason: 'Rate limit exceeded' };
    }
    // Check allowed hosts
    for (const [pattern, config] of this.allowedHosts) {
      if (this.matchesPattern(host, pattern)) {
        if (config.ports.has(port) &&
            (config.protocols.has(protocol) || config.protocols.has('*'))) {
          this.logAllowed(host, port, config.purpose);
          return { permitted: true, reason: config.purpose };
        }
      }
    }
    // Default deny
    if (this.defaultDeny) {
      this.logBlocked(host, port, 'No matching allow rule');
      return { permitted: false, reason: 'Not in allowlist' };
    }
    return { permitted: true, reason: 'Default allow' };
  }

  checkRateLimit(host) {
    const now = Date.now();
    const window = 60000; // 1 minute
    if (!this.rateLimits.has(host)) {
      this.rateLimits.set(host, []);
    }
    const hostLimits = this.rateLimits.get(host);
    // Clean old entries
    const recent = hostLimits.filter(time => now - time < window);
    this.rateLimits.set(host, recent);
    // Check limit (100 connections per minute)
    if (recent.length >= 100) {
      return false;
    }
    recent.push(now);
    return true;
  }

  matchesPattern(host, pattern) {
    // Support wildcards: escape regex metacharacters, then expand '*'
    const regex = pattern
      .replace(/[.+?^${}()|[\]\\]/g, '\\$&')
      .replace(/\*/g, '.*');
    return new RegExp(`^${regex}$`).test(host);
  }

  logBlocked(host, port, reason) {
    console.error(`[EGRESS_BLOCKED] ${host}:${port} - ${reason}`);
    // Send to SIEM/monitoring
  }

  logAllowed(host, port, purpose) {
    console.log(`[EGRESS_ALLOWED] ${host}:${port} - ${purpose}`);
  }
}

Red flags:
- No egress filtering at all
- Overly permissive rules (for example, a bare wildcard such as * or *.* allowed)
- DNS resolution without restrictions
- No rate limiting on connections
- Missing logging of blocked attempts
- Bypass mechanisms in code
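Some of these red flags can be detected mechanically during config review. The sketch below lints a policy dict shaped like the `egress_policy` example; the function name `lint_policy` and the finding messages are hypothetical, and it does not replace a manual review for bypass mechanisms in code.

```python
def lint_policy(policy):
    """Return a list of red-flag findings for an egress policy dict."""
    findings = []
    if policy.get("default_action") != "DENY":
        findings.append("default_action is not DENY")
    for entry in policy.get("allowed_destinations", []):
        host = str(entry.get("host", ""))
        # A pattern made only of '*' and '.' (or an empty one) matches everything.
        if host.strip("*.") == "":
            findings.append(f"overly permissive host pattern: {host!r}")
        if not entry.get("ports"):
            findings.append(f"no port restriction for {host!r}")
    if not policy.get("dns_policy", {}).get("allowed_resolvers"):
        findings.append("no DNS resolver restriction")
    if not policy.get("rate_limits"):
        findings.append("no connection rate limits")
    return findings


weak_policy = {
    "default_action": "ALLOW",
    "allowed_destinations": [{"host": "*.*", "ports": []}],
}
for finding in lint_policy(weak_policy):
    print(finding)
```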
Dynamic (behavioral verification)
Test cases:
- Allowed connection: Connect to api.company.com:443 → Success
- Blocked connection: Connect to evil.com:443 → Connection refused
- DNS filtering: Query unauthorized DNS server → Blocked
- Rate limiting: 101 connections in 60 seconds → Last one blocked
- Data exfiltration: Attempt to POST to pastebin.com → Blocked
- Protocol enforcement: HTTP to HTTPS-only host → Blocked
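The rate-limiting case (101 connections in 60 seconds, last one blocked) can be exercised without waiting in real time if the limiter accepts an explicit clock value. This is a minimal sketch under that assumption; the class name `SlidingWindowLimiter` and its parameters are hypothetical.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Per-destination sliding-window limiter with an injectable clock."""

    def __init__(self, max_connections=100, window_seconds=60.0):
        self.max_connections = max_connections
        self.window_seconds = window_seconds
        self._events = defaultdict(deque)  # destination -> connection timestamps

    def allow(self, destination, now):
        events = self._events[destination]
        # Drop timestamps that have fallen out of the window.
        while events and now - events[0] >= self.window_seconds:
            events.popleft()
        if len(events) >= self.max_connections:
            return False
        events.append(now)
        return True


limiter = SlidingWindowLimiter()
# 101 attempts spaced 0.5 s apart, all inside one 60-second window.
results = [limiter.allow("api.company.com", now=i * 0.5) for i in range(101)]
print(results.count(True), results[-1])
```

Passing `now` explicitly keeps the test deterministic; production code would typically supply a monotonic clock value instead.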
Expected behavior:
- Only allowlisted destinations reachable
- Clear error messages for blocked connections
- All blocked attempts logged with details
- Rate limits enforced per destination
- No DNS leakage to unauthorized resolvers
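For the logging expectation, one option is to emit each blocked attempt as a single structured record so that a SIEM can parse it. The sketch below uses only the standard library; the field names and the `egress` logger name are assumptions, not a format required by this control.

```python
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger("egress")  # hypothetical logger name


def blocked_event(host, port, protocol, reason, now=None):
    """Build a structured record describing one blocked egress attempt."""
    now = now or datetime.now(timezone.utc)
    return {
        "event": "egress_blocked",
        "timestamp": now.isoformat(),
        "host": host,
        "port": port,
        "protocol": protocol,
        "reason": reason,
    }


def log_blocked(host, port, protocol, reason):
    """Log the blocked attempt as one JSON line and return the record."""
    event = blocked_event(host, port, protocol, reason)
    logger.warning(json.dumps(event, sort_keys=True))
    return event


log_blocked("evil.com", 443, "https", "no matching allow rule")
```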
Mappings
- CWE: CWE-829 (Inclusion of Functionality from Untrusted Control Sphere), CWE-923 (Improper Restriction of Communication Channel to Intended Endpoints)
- OWASP Top 10: A05:2021 – Security Misconfiguration, A08:2021 – Software and Data Integrity Failures
- OWASP MCP Top 10: MCP-09 (Unrestricted Egress), MCP-05 (Data Exfiltration)
- OWASP ASVS: V14.4.6 (Verify egress traffic filtering), V9.2.5 (Verify secure communication configuration)
Exceptions & Compensating Controls
NOT_APPLICABLE when:
- Completely air-gapped system (no network)
- Local development with no sensitive data
- Read-only kiosk systems
Compensating controls:
- Network-level firewall rules (less granular)
- Data loss prevention (DLP) systems
- Proxy servers with content inspection
- Network segmentation and VLANs
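As an illustration of the network-level firewall option, an allowlist of IPv4 destinations can be rendered into default-drop `iptables` OUTPUT rules. This is a rough sketch only: the function name `iptables_rules` is hypothetical, the addresses are placeholders, it covers TCP over IPv4 only, and the generated commands should be reviewed before being applied to any host.

```python
import ipaddress


def iptables_rules(allowed):
    """Render (cidr, port) pairs into default-drop iptables OUTPUT commands."""
    rules = [
        "iptables -A OUTPUT -o lo -j ACCEPT",  # loopback is out of scope
        "iptables -A OUTPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT",
    ]
    for cidr, port in allowed:
        network = ipaddress.IPv4Network(cidr)  # raises ValueError on bad input
        if not 1 <= port <= 65535:
            raise ValueError(f"invalid port: {port}")
        rules.append(f"iptables -A OUTPUT -p tcp -d {network} --dport {port} -j ACCEPT")
    # Log whatever is about to be dropped, then drop by default.
    rules.append('iptables -A OUTPUT -j LOG --log-prefix "EGRESS_BLOCKED: "')
    rules.append("iptables -P OUTPUT DROP")
    return rules


for rule in iptables_rules([("10.0.0.5", 5432), ("192.0.2.0/24", 443)]):
    print(rule)
```

Because such rules match IP addresses rather than hostnames, they are less granular than application-level filtering, and a real rule set would also need entries for the approved DNS resolvers.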
Risk acceptance:
- May skip for L1 internal systems
- Recommended for L2 with sensitive data
- MANDATORY for L3 and Internet-facing systems
References
- 80% of MCP servers lack egress filtering (arXiv:2506.13538)
- Data exfiltration via unrestricted egress (Incident Response Report 2024)
- NIST SP 800-41: Guidelines on Firewalls and Firewall Policy
- Zero Trust Architecture principles (NIST SP 800-207)
- Egress filtering best practices (SANS Institute)